Coverage for src/comments_moderation/utils.py: 98%
135 statements
« prev ^ index » next coverage.py v7.7.0, created at 2025-04-03 12:11 +0000
« prev ^ index » next coverage.py v7.7.0, created at 2025-04-03 12:11 +0000
1from itertools import chain
3from django.contrib import messages
4from django.contrib.auth.models import User
5from django.db.models import Q
6from django.http import HttpRequest
7from django.urls import reverse
9from comments_api.constants import PARAM_ACTION
10from comments_api.constants import PARAM_ACTION_CREATE
11from comments_api.constants import PARAM_COMMENT
12from comments_api.constants import PARAM_MODERATOR
13from comments_api.constants import STATUS_SUBMITTED
14from comments_views.core.rights import AbstractUserRights
15from comments_views.core.utils import comments_credentials
16from comments_views.core.utils import comments_server_url
17from comments_views.core.utils import format_comment
18from comments_views.core.utils import get_comment
19from comments_views.core.utils import get_user_dict
20from comments_views.core.utils import make_api_request
21from ptf.model_helpers import get_article_by_doi
22from ptf.models import Collection
23from ptf.utils import send_email_from_template
24from ptf_tools.models import Invitation
25from ptf_tools.models import InvitationExtraData
27from .rights import ModeratorUserRights
def is_comment_moderator(user: User) -> bool:
    """
    Tell whether the given user currently acts as a comment moderator.

    A user without a related `comment_moderator` object is never a moderator.
    Otherwise the answer is the `is_moderator` value of that related object.
    """
    if not hasattr(user, "comment_moderator"):
        return False
    return user.comment_moderator.is_moderator
def update_moderation_right(
    comment_id: int,
    moderators: int | str,
    rights: AbstractUserRights,
    action: str = PARAM_ACTION_CREATE,
    request_for_message: HttpRequest | None = None,
) -> tuple[bool, dict]:
    """
    Sends a POST request to the "moderators" endpoint of the comment server in order
    to create a moderation entry or to delete an existing one.

    Params:
        - comment_id: the ID of the comment the moderation entry is about
        - moderators: the moderator(s) concerned by the entry
        - rights: the rights used to build the query parameters of the request
        - action: the action to perform, sent as a query parameter
                  (creation by default)
        - request_for_message: optional request forwarded to `make_api_request`

    Returns:
        The result of `make_api_request`, unchanged.
    """
    params = rights.comment_rights_query_params()
    params[PARAM_ACTION] = action
    endpoint = comments_server_url(params, "moderators")
    payload = {PARAM_COMMENT: comment_id, PARAM_MODERATOR: moderators}

    return make_api_request(
        "POST",
        endpoint,
        request_for_message=request_for_message,
        auth=comments_credentials(),
        json=payload,
        timeout=4,
    )
def get_all_moderators(rights: AbstractUserRights, request: HttpRequest):
    """
    Fetches every moderator the given rights have access to.

    The moderation rights are requested from the comment server, then the matching
    users are selected locally.

    Returns:
        - all_moderators: The queryset of the users flagged as moderator that are
                          either attached to one of the admin/staff collections of
                          the rights, or referenced by one of the moderation rights
                          returned by the comment server.
        - moderation_rights: The moderation rights from the comment server
                             (an empty list when the request failed).
    """
    error, moderation_rights = make_api_request(
        "GET",
        comments_server_url(rights.comment_rights_query_params(), "moderators"),
        request_for_message=request,
        auth=comments_credentials(),
    )
    if error:
        moderation_rights = []

    # Moderation right struct:
    # {"moderator_id": x, "comment_id": x, "comment__status": x}
    # Sets are used to get rid of the duplicates.
    moderator_ids = {right["moderator_id"] for right in moderation_rights}
    collection_pids = set(
        rights.get_user_admin_collections() + rights.get_user_staff_collections()
    )

    flagged_as_moderator = Q(comment_moderator__is_moderator=True)
    attached_to_collection = Q(comment_moderator__collections__pid__in=collection_pids)
    known_by_server = Q(pk__in=moderator_ids)

    all_moderators = (
        User.objects.prefetch_related("comment_moderator", "comment_moderator__collections")
        .filter(flagged_as_moderator & (attached_to_collection | known_by_server))
        .distinct()
        .order_by("first_name", "pk")
    )

    return all_moderators, moderation_rights
def update_moderator_collections(
    rights: AbstractUserRights,
    moderator: User,
    collections_to_add: list[str] | None = None,
    collections_to_delete: list[str] | None = None,
):
    """
    Updates a moderator's attached collections.
    The modified collections are limited to the admin collections of the rights.

    Nothing is done when the target user is not a comment moderator or when the
    rights are not those of an admin moderator.
    Deletions are applied before additions, so a PID present in both lists ends up
    attached.

    Params:
        - rights: the rights of the user updating the moderator's
                  collections
        - moderator: the moderator
        - collections_to_add: the PIDs of the collections to add
                              (None or empty: nothing to add)
        - collections_to_delete: the PIDs of the collections to delete
                                 (None or empty: nothing to delete)
    """
    if not is_comment_moderator(moderator) or not rights.is_admin_moderator():
        return

    # Only the collections administrated by the acting user may be touched.
    allowed_pids = set(rights.get_user_admin_collections())
    pids_to_delete = allowed_pids.intersection(collections_to_delete or ())
    pids_to_add = allowed_pids.intersection(collections_to_add or ())
    if not pids_to_delete and not pids_to_add:
        # Nothing permitted to change: skip the database read entirely.
        return

    # Single read of the collections, used to translate the PIDs into primary keys.
    collections = list(Collection.objects.all().values("pk", "pid"))

    pks_to_delete = [c["pk"] for c in collections if c["pid"] in pids_to_delete]
    if pks_to_delete:
        moderator.comment_moderator.collections.remove(*pks_to_delete)

    pks_to_add = [c["pk"] for c in collections if c["pid"] in pids_to_add]
    if pks_to_add:
        moderator.comment_moderator.collections.add(*pks_to_add)
def merge_dict(existing: dict, update: dict):
    """
    Recursively merges `update` into `existing` by creating potential missing keys.
    `existing` is modified in place and returned; `update` is never modified.

    When a key is common between the 2 dicts:
        - If the value is a dict: recursive call to merge_dict
        - If the value is a list: the 2 lists are concatenated
          TODO: what if list of dict/list ? (list items are not copied)
        - If the value is something else: the value of the update overwrites the
          existing one.

    A missing or falsy existing value (None, empty container, 0, ...) is treated as
    absent and is replaced by the update's value.

    Raises:
        TypeError: when a dict or a list would have to be merged with a non-empty
                   value of another kind, or when a scalar would overwrite an
                   existing dict or list.
    """
    for key, update_v in update.items():
        existing_v = existing.get(key)

        if isinstance(update_v, dict):
            if not existing_v:
                existing_v = {}
            elif not isinstance(existing_v, dict):
                raise TypeError("Cannot merge dict and non-dict objects.")
            # Always merge into a dict owned by `existing`. Storing `update_v` itself
            # would share the nested dict between both arguments, and a later merge
            # into `existing` would then silently alter the caller's `update`.
            existing[key] = merge_dict(existing_v, update_v)

        elif isinstance(update_v, list):
            if not existing_v:
                existing_v = []
            elif not isinstance(existing_v, list):
                raise TypeError("Cannot merge list and non-list objects.")
            # The concatenation builds a new list: neither input list is mutated.
            existing[key] = existing_v + update_v

        else:
            if isinstance(existing_v, dict):
                raise TypeError("Cannot merge dict and non-dict objects.")
            if isinstance(existing_v, list):
                raise TypeError("Cannot merge list and non-list objects.")
            existing[key] = update_v

    return existing
def get_comment_and_can_manage_moderators(
    request: HttpRequest, rights: AbstractUserRights, comment_id: int
) -> tuple[bool, dict]:
    """
    GET the requested comment and check that the given rights allow the management
    of its moderators. The comment must also be in the submitted status.

    The request is handed over to `get_comment` for its own messages, and an error
    message is added to it when the rights are insufficient.

    Returns:
        - `error` True when the comment could not be retrieved or cannot be managed
        - `comment` The formatted comment, or an empty dict on error
    """
    query_params = rights.comment_rights_query_params()
    error, comment = get_comment(query_params, comment_id, request_for_message=request)
    if error:
        return True, {}

    format_comment(comment, rights, get_user_dict())

    manageable = (
        rights.comment_can_manage_moderators(comment)
        and comment["status"] == STATUS_SUBMITTED
    )
    if manageable:
        return False, comment

    messages.error(
        request,
        "Error: You don't have enough rights to manage the moderators of "
        "the given comment.",
    )
    return True, {}
def email_moderator_assigned(request: HttpRequest, moderator: User, comment: dict):
    """
    Notify a moderator, with the "Comment moderation request" mail, that a comment
    has been assigned to them.

    A success message about the assignment is always added to the request.
    When the article of the comment cannot be found from its DOI, a warning message
    is added instead of sending the mail.
    """
    moderator_name = f"{moderator.first_name} {moderator.last_name}"
    messages.success(
        request,
        f"The moderator {moderator_name} has been assigned to the comment.",
    )

    # TODO: Maybe the e-mail should be sent a few times a day by a cron
    # to avoid too many e-mails?
    article = get_article_by_doi(comment["doi"])
    if not article:
        messages.warning(
            request,
            "The comment's article was not found on Trammel. "
            "No e-mail was sent to the moderator.",
        )
        return

    inviter = request.user
    context_data = {
        "full_name": moderator_name,
        "inviter_name": f"{inviter.first_name} {inviter.last_name}",
        "article_url": comment.get("base_url"),
        "article_title": article.title_tex,
        "comment_dashboard_url": request.build_absolute_uri(reverse("comment_list")),
        "email_signature": "The editorial team",
    }
    site_name = comment["site_name"]

    send_email_from_template(
        "mail/comment_moderator_assigned.html",
        context_data,
        f"[{site_name.upper()}] Comment moderation request",
        to=[moderator.email],
        from_collection=site_name,
    )
def get_comments_for_home(user: User) -> tuple[bool, dict]:
    """
    Ask the comments server for the per-collection summary of the comments the user
    has access to as a moderator.

    Returns:
        - `error` True when the HTTP query failed
        - `data` The processed comments summary (empty on error):
            `
            {
                "COLID": submitted_comments_nb
            }
            `
          A collection present in the summary without any submitted comment is
          mapped to 0.
    """
    query_params = ModeratorUserRights(user).comment_rights_query_params()
    summary_url = comments_server_url(query_params, "comments-summary")

    error, comments_summary = make_api_request("GET", summary_url, auth=comments_credentials())
    if error:
        return True, {}

    submitted_per_collection = {
        col_id: col_data.get(STATUS_SUBMITTED, 0)
        for col_id, col_data in comments_summary.items()
    }
    return False, submitted_per_collection
def get_pending_invitations(rights: AbstractUserRights) -> list:
    """
    Returns the list of pending invitations to comment moderators available to the
    given user rights.

    An invitation is kept only when at least one of its collections or comments is
    visible to the rights:
        - a superuser sees every collection and every comment of the invitation
        - an admin moderator sees the collections belonging to their admin collections
        - comments are visible when their collection belongs to the admin collections
          of the rights or, if there are none, to their staff collections

    Returns:
        A list of dicts with the keys "name", "email", "sent", "date_expired",
        "key_expired", "collections" (list of PIDs) and "comments" (list of IDs).
    """
    pending_invites = Invitation.objects.filter(accepted=False, extra_data__has_key="moderator")

    # None of the rights lookups depend on the invitation: resolve them once.
    is_superuser = rights.user.is_superuser
    is_admin = not is_superuser and rights.is_admin_moderator()
    admin_collections = rights.get_user_admin_collections()
    user_collections = admin_collections or rights.get_user_staff_collections()

    invites = []
    for invite in pending_invites:
        moderator_data = InvitationExtraData(**invite.extra_data).moderator

        # Get all collections matching the user's collections.
        # NOTE(review): the flattening assumes each collection entry's `pid` is itself
        # an iterable of PIDs - TODO confirm against InvitationExtraData.
        # The result is materialized as a list: a bare `chain` object is always
        # truthy (which would defeat the emptiness check below) and can only be
        # iterated once by the consumer of the returned data.
        invited_pids = list(chain.from_iterable(c.pid for c in moderator_data.collections))
        if is_superuser:
            collections_to_add = invited_pids
        elif is_admin:
            collections_to_add = [pid for pid in invited_pids if pid in admin_collections]
        else:
            collections_to_add = []

        # Get all comments whose collection matches the user's collections.
        comments_to_add = [
            comment.id
            for comment in moderator_data.comments
            if is_superuser or comment.pid in user_collections
        ]

        if not collections_to_add and not comments_to_add:
            continue

        invites.append(
            {
                "name": f"{invite.first_name} {invite.last_name}",
                "email": invite.email,
                "sent": invite.sent,
                "date_expired": invite.date_expired(),
                "key_expired": invite.key_expired(),
                "collections": collections_to_add,
                "comments": comments_to_add,
            }
        )

    return invites
def get_comment_pending_invitations(comment_id: int) -> list:
    """
    Returns the pending (not accepted) invitations whose moderator data references
    the given comment ID.
    """
    # The filtering on the comment ID is done in Python: the equivalent JSON lookup
    #     extra_data__moderator__comments__contains=[{"id": comment_id}]
    # is not working on CI with SQLite DB backend.
    candidates = Invitation.objects.filter(
        accepted=False, extra_data__moderator__has_key="comments"
    )

    def targets_comment(invite) -> bool:
        """Whether one of the comments of the invitation is the requested one."""
        extra_data = InvitationExtraData(**invite.extra_data)
        return any(c.id == comment_id for c in extra_data.moderator.comments)

    return [invite for invite in candidates if targets_comment(invite)]