Coverage for src/comments_moderation/utils.py: 98%
125 statements
« prev ^ index » next coverage.py v7.7.0, created at 2025-04-09 14:54 +0000
« prev ^ index » next coverage.py v7.7.0, created at 2025-04-09 14:54 +0000
1from itertools import chain
3from comments_api.constants import (
4 PARAM_ACTION,
5 PARAM_ACTION_CREATE,
6 PARAM_COMMENT,
7 PARAM_MODERATOR,
8 STATUS_SUBMITTED,
9)
10from comments_views.core.rights import AbstractUserRights
11from comments_views.core.utils import (
12 comments_credentials,
13 comments_server_url,
14 format_comment,
15 get_comment,
16 get_user_dict,
17 make_api_request,
18)
19from django.contrib import messages
20from django.contrib.auth.models import User
21from django.db.models import Q
22from django.http import HttpRequest
23from django.urls import reverse
24from ptf.model_helpers import get_article_by_doi
25from ptf.models import Collection
26from ptf.utils import send_email_from_template
28from ptf_tools.models import Invitation, InvitationExtraData
30from .rights import ModeratorUserRights
33def is_comment_moderator(user: User) -> bool:
34 """Whether the user is a comment moderator."""
35 return hasattr(user, "comment_moderator") and user.comment_moderator.is_moderator
38def update_moderation_right(
39 comment_id: int,
40 moderators: int | str,
41 rights: AbstractUserRights,
42 action: str = PARAM_ACTION_CREATE,
43 request_for_message: HttpRequest | None = None,
44) -> tuple[bool, dict]:
45 """
46 Makes a POST request to the comment server to create a new moderation entry or
47 delete an existing one.
48 """
49 post_data = {PARAM_COMMENT: comment_id, PARAM_MODERATOR: moderators}
50 query_params = rights.comment_rights_query_params()
51 query_params[PARAM_ACTION] = action
52 return make_api_request(
53 "POST",
54 comments_server_url(query_params, "moderators"),
55 request_for_message=request_for_message,
56 auth=comments_credentials(),
57 json=post_data,
58 timeout=4,
59 )
62def get_all_moderators(rights: AbstractUserRights, request: HttpRequest):
63 """
64 Gets all the existing moderators available to the given rights.
66 Returns:
67 - all_moderators: The queryset of all available moderators.
68 - moderation_rights: The moderation rights from the comment server.
69 """
70 query_params = rights.comment_rights_query_params()
72 error, moderation_rights = make_api_request(
73 "GET",
74 comments_server_url(query_params, "moderators"),
75 request_for_message=request,
76 auth=comments_credentials(),
77 )
79 base_moderators = []
80 if error:
81 moderation_rights = []
82 else:
83 # mod_right struct: {"moderator_id": x, "comment_id": x, "comment__status": x}
84 base_moderators = [m["moderator_id"] for m in moderation_rights]
86 user_collections = rights.get_user_admin_collections() + rights.get_user_staff_collections()
88 # Remove duplicates
89 user_collections = set(user_collections)
90 base_moderators = set(base_moderators)
92 moderator_filter = Q(comment_moderator__is_moderator=True) & (
93 Q(comment_moderator__collections__pid__in=user_collections) | Q(pk__in=base_moderators)
94 )
96 all_moderators = (
97 User.objects.prefetch_related("comment_moderator", "comment_moderator__collections")
98 .filter(moderator_filter)
99 .distinct()
100 .order_by("first_name", "pk")
101 )
103 return all_moderators, moderation_rights
106def update_moderator_collections(
107 rights: AbstractUserRights,
108 moderator: User,
109 collections_to_add: list[str] = [],
110 collections_to_delete: list[str] = [],
111):
112 """
113 Updates a moderator's attached collections.
114 The modified collections are limited to the rights's admin collections.
116 Params:
117 - rights: the rights of the user updating the moderator's
118 collections
119 - moderator: the moderator
120 - collections_to_add: the PIDs of the collections to add
121 - collections_to_delete: the PIDs of the collections to delete
123 """
124 if not is_comment_moderator(moderator) or not rights.is_admin_moderator():
125 return
127 user_collections = rights.get_user_admin_collections()
128 collections = Collection.objects.all().values("pk", "pid")
129 collections_to_delete = [
130 c["pk"]
131 for c in collections
132 if (c["pid"] in collections_to_delete and c["pid"] in user_collections)
133 ]
134 if collections_to_delete:
135 moderator.comment_moderator.collections.remove(*collections_to_delete)
137 collections_to_add = [
138 c["pk"]
139 for c in collections
140 if (c["pid"] in collections_to_add and c["pid"] in user_collections)
141 ]
142 if collections_to_add:
143 moderator.comment_moderator.collections.add(*collections_to_add)
146def merge_dict(existing: dict, update: dict):
147 """
148 Recursively merges 2 dicts by creating potential missing keys.
149 When a key is common between the 2 dicts:
150 - If the value is a dict: recursive call to merge_dict
151 - If the value is a list: the 2 lists are concatenated
152 TODO: what if list of dict/list ?
153 - If the value is something else: the value of the update overwrites the
154 existing one.
155 """
156 for k, update_v in update.items():
157 existing_v = existing.get(k)
159 if isinstance(update_v, dict):
160 if not existing_v:
161 existing_v = {}
162 elif existing_v and not isinstance(existing_v, dict):
163 raise TypeError("Cannot merge dict and non-dict objects.")
164 existing[k] = merge_dict(existing_v, update_v) if existing_v else update_v
166 elif isinstance(update_v, list):
167 if not existing_v:
168 existing_v = []
169 elif existing_v and not isinstance(existing_v, list):
170 raise TypeError("Cannot merge list and non-list objects.")
171 existing[k] = existing_v + update_v
173 else:
174 if isinstance(existing_v, dict):
175 raise TypeError("Cannot merge dict and non-dict objects.")
176 if isinstance(existing_v, list):
177 raise TypeError("Cannot merge list and non-list objects.")
178 existing[k] = update_v
180 return existing
183def get_comment_and_can_manage_moderators(
184 request: HttpRequest, rights: AbstractUserRights, comment_id: int
185) -> tuple[bool, dict]:
186 """
187 GET the requested comment and check the given rights allow moderator management.
188 Adds a message to the request if an error occurs of if the rights are insufficient.
189 """
190 error, comment = get_comment(
191 rights.comment_rights_query_params(), comment_id, request_for_message=request
192 )
193 if error:
194 return True, {}
196 users = get_user_dict()
198 format_comment(comment, rights, users)
199 if not rights.comment_can_manage_moderators(comment) or comment["status"] != STATUS_SUBMITTED:
200 messages.error(
201 request,
202 "Error: You don't have enough rights to manage the moderators of the given comment.",
203 )
204 return True, {}
206 return False, comment
209def email_moderator_assigned(request: HttpRequest, moderator: User, comment: dict):
210 """
211 Send the "Comment moderation request" mail to the provided moderator.
212 Add messages to the provided request.
213 """
214 messages.success(
215 request,
216 f"The moderator {moderator.first_name} {moderator.last_name} "
217 "has been assigned to the comment.",
218 )
219 # Send an e-mail to the moderator assigned to the comment -
220 # TODO: Maybe this should be done few times a day by a cron
221 # to avoid too many e-mails?
222 article = get_article_by_doi(comment["doi"])
223 if not article:
224 messages.warning(
225 request,
226 "The comment's article was not found on Trammel. "
227 + "No e-mail was sent to the moderator.",
228 )
229 return
231 article_title = article.title_tex if article else ""
232 context_data = {
233 "full_name": f"{moderator.first_name} {moderator.last_name}",
234 "inviter_name": f"{request.user.first_name} {request.user.last_name}",
235 "article_url": comment.get("base_url"),
236 "article_title": article_title,
237 "comment_dashboard_url": request.build_absolute_uri(reverse("comment_list")),
238 "email_signature": "The editorial team",
239 }
240 subject = f"[{comment['site_name'].upper()}] Comment moderation request"
241 send_email_from_template(
242 "mail/comment_moderator_assigned.html",
243 context_data,
244 subject,
245 to=[moderator.email],
246 from_collection=comment["site_name"],
247 )
250def get_comments_for_home(user: User) -> tuple[bool, dict]:
251 """
252 Query the comments server to get the summary of the comments data per collection.
254 Returns:
255 - `error` Whether the HTTP query was successful
256 - `data` The processed comments summary:
257 `
258 {
259 "COLID": submitted_comments_nb
260 }
261 `
263 There is no entry for a collection without any comments.
264 """
265 rights = ModeratorUserRights(user)
266 query_params = rights.comment_rights_query_params()
268 error, comments_summary = make_api_request(
269 "GET", comments_server_url(query_params, "comments-summary"), auth=comments_credentials()
270 )
271 if error:
272 return True, {}
274 data = {}
275 for col_id, col_data in comments_summary.items():
276 data[col_id] = col_data.get(STATUS_SUBMITTED, 0)
278 return False, data
281def get_pending_invitations(rights: AbstractUserRights) -> list:
282 """
283 Returns the list of pending invitations to comment moderators available to the
284 given user rights.
285 """
286 pending_invites = Invitation.objects.filter(accepted=False, extra_data__has_key="moderator")
287 invites = []
288 for invite in pending_invites:
289 collections_to_add = []
290 extra_data = InvitationExtraData(**invite.extra_data)
292 # Get all collections matching the user_collections
293 if rights.user.is_superuser: 293 ↛ 294line 293 didn't jump to line 294 because the condition on line 293 was never true
294 collections_to_add = chain.from_iterable(
295 c.pid for c in extra_data.moderator.collections
296 )
297 elif rights.is_admin_moderator():
298 collections = chain.from_iterable(c.pid for c in extra_data.moderator.collections)
299 collections_to_add = [
300 c for c in collections if c in rights.get_user_admin_collections()
301 ]
303 # Get all comments whose collection matches the user_collections
304 comments_to_add = []
305 user_collections = (
306 rights.get_user_admin_collections() or rights.get_user_staff_collections()
307 )
308 for comment in extra_data.moderator.comments:
309 if rights.user.is_superuser: 309 ↛ 310line 309 didn't jump to line 310 because the condition on line 309 was never true
310 comments_to_add.append(comment.id)
311 elif comment.pid in user_collections:
312 comments_to_add.append(comment.id)
314 if collections_to_add or comments_to_add:
315 invites.append(
316 {
317 "name": f"{invite.first_name} {invite.last_name}",
318 "email": invite.email,
319 "sent": invite.sent,
320 "date_expired": invite.date_expired(),
321 "key_expired": invite.key_expired(),
322 "collections": collections_to_add,
323 "comments": comments_to_add,
324 }
325 )
327 return invites
330def get_comment_pending_invitations(comment_id: int) -> list:
331 """
332 Returns the invitations linked to the given comment ID.
333 """
334 # Not working on CI with SQLite DB backend.
335 # return Invitation.objects.filter(
336 # accepted=False,
337 # extra_data__moderator__comments__contains=[{"id": comment_id}]
338 # )
339 base_invites = Invitation.objects.filter(
340 accepted=False, extra_data__moderator__has_key="comments"
341 )
342 invites = []
343 for invite in base_invites:
344 extra_data = InvitationExtraData(**invite.extra_data)
345 if any(c.id == comment_id for c in extra_data.moderator.comments):
346 invites.append(invite)
348 return invites