Coverage for src/comments_moderation/utils.py: 98%

125 statements  

« prev     ^ index     » next       coverage.py v7.7.0, created at 2025-04-09 14:54 +0000

1from itertools import chain 

2 

3from comments_api.constants import ( 

4 PARAM_ACTION, 

5 PARAM_ACTION_CREATE, 

6 PARAM_COMMENT, 

7 PARAM_MODERATOR, 

8 STATUS_SUBMITTED, 

9) 

10from comments_views.core.rights import AbstractUserRights 

11from comments_views.core.utils import ( 

12 comments_credentials, 

13 comments_server_url, 

14 format_comment, 

15 get_comment, 

16 get_user_dict, 

17 make_api_request, 

18) 

19from django.contrib import messages 

20from django.contrib.auth.models import User 

21from django.db.models import Q 

22from django.http import HttpRequest 

23from django.urls import reverse 

24from ptf.model_helpers import get_article_by_doi 

25from ptf.models import Collection 

26from ptf.utils import send_email_from_template 

27 

28from ptf_tools.models import Invitation, InvitationExtraData 

29 

30from .rights import ModeratorUserRights 

31 

32 

33def is_comment_moderator(user: User) -> bool: 

34 """Whether the user is a comment moderator.""" 

35 return hasattr(user, "comment_moderator") and user.comment_moderator.is_moderator 

36 

37 

38def update_moderation_right( 

39 comment_id: int, 

40 moderators: int | str, 

41 rights: AbstractUserRights, 

42 action: str = PARAM_ACTION_CREATE, 

43 request_for_message: HttpRequest | None = None, 

44) -> tuple[bool, dict]: 

45 """ 

46 Makes a POST request to the comment server to create a new moderation entry or 

47 delete an existing one. 

48 """ 

49 post_data = {PARAM_COMMENT: comment_id, PARAM_MODERATOR: moderators} 

50 query_params = rights.comment_rights_query_params() 

51 query_params[PARAM_ACTION] = action 

52 return make_api_request( 

53 "POST", 

54 comments_server_url(query_params, "moderators"), 

55 request_for_message=request_for_message, 

56 auth=comments_credentials(), 

57 json=post_data, 

58 timeout=4, 

59 ) 

60 

61 

62def get_all_moderators(rights: AbstractUserRights, request: HttpRequest): 

63 """ 

64 Gets all the existing moderators available to the given rights. 

65 

66 Returns: 

67 - all_moderators: The queryset of all available moderators. 

68 - moderation_rights: The moderation rights from the comment server. 

69 """ 

70 query_params = rights.comment_rights_query_params() 

71 

72 error, moderation_rights = make_api_request( 

73 "GET", 

74 comments_server_url(query_params, "moderators"), 

75 request_for_message=request, 

76 auth=comments_credentials(), 

77 ) 

78 

79 base_moderators = [] 

80 if error: 

81 moderation_rights = [] 

82 else: 

83 # mod_right struct: {"moderator_id": x, "comment_id": x, "comment__status": x} 

84 base_moderators = [m["moderator_id"] for m in moderation_rights] 

85 

86 user_collections = rights.get_user_admin_collections() + rights.get_user_staff_collections() 

87 

88 # Remove duplicates 

89 user_collections = set(user_collections) 

90 base_moderators = set(base_moderators) 

91 

92 moderator_filter = Q(comment_moderator__is_moderator=True) & ( 

93 Q(comment_moderator__collections__pid__in=user_collections) | Q(pk__in=base_moderators) 

94 ) 

95 

96 all_moderators = ( 

97 User.objects.prefetch_related("comment_moderator", "comment_moderator__collections") 

98 .filter(moderator_filter) 

99 .distinct() 

100 .order_by("first_name", "pk") 

101 ) 

102 

103 return all_moderators, moderation_rights 

104 

105 

106def update_moderator_collections( 

107 rights: AbstractUserRights, 

108 moderator: User, 

109 collections_to_add: list[str] = [], 

110 collections_to_delete: list[str] = [], 

111): 

112 """ 

113 Updates a moderator's attached collections. 

114 The modified collections are limited to the rights's admin collections. 

115 

116 Params: 

117 - rights: the rights of the user updating the moderator's 

118 collections 

119 - moderator: the moderator 

120 - collections_to_add: the PIDs of the collections to add 

121 - collections_to_delete: the PIDs of the collections to delete 

122 

123 """ 

124 if not is_comment_moderator(moderator) or not rights.is_admin_moderator(): 

125 return 

126 

127 user_collections = rights.get_user_admin_collections() 

128 collections = Collection.objects.all().values("pk", "pid") 

129 collections_to_delete = [ 

130 c["pk"] 

131 for c in collections 

132 if (c["pid"] in collections_to_delete and c["pid"] in user_collections) 

133 ] 

134 if collections_to_delete: 

135 moderator.comment_moderator.collections.remove(*collections_to_delete) 

136 

137 collections_to_add = [ 

138 c["pk"] 

139 for c in collections 

140 if (c["pid"] in collections_to_add and c["pid"] in user_collections) 

141 ] 

142 if collections_to_add: 

143 moderator.comment_moderator.collections.add(*collections_to_add) 

144 

145 

146def merge_dict(existing: dict, update: dict): 

147 """ 

148 Recursively merges 2 dicts by creating potential missing keys. 

149 When a key is common between the 2 dicts: 

150 - If the value is a dict: recursive call to merge_dict 

151 - If the value is a list: the 2 lists are concatenated 

152 TODO: what if list of dict/list ? 

153 - If the value is something else: the value of the update overwrites the 

154 existing one. 

155 """ 

156 for k, update_v in update.items(): 

157 existing_v = existing.get(k) 

158 

159 if isinstance(update_v, dict): 

160 if not existing_v: 

161 existing_v = {} 

162 elif existing_v and not isinstance(existing_v, dict): 

163 raise TypeError("Cannot merge dict and non-dict objects.") 

164 existing[k] = merge_dict(existing_v, update_v) if existing_v else update_v 

165 

166 elif isinstance(update_v, list): 

167 if not existing_v: 

168 existing_v = [] 

169 elif existing_v and not isinstance(existing_v, list): 

170 raise TypeError("Cannot merge list and non-list objects.") 

171 existing[k] = existing_v + update_v 

172 

173 else: 

174 if isinstance(existing_v, dict): 

175 raise TypeError("Cannot merge dict and non-dict objects.") 

176 if isinstance(existing_v, list): 

177 raise TypeError("Cannot merge list and non-list objects.") 

178 existing[k] = update_v 

179 

180 return existing 

181 

182 

183def get_comment_and_can_manage_moderators( 

184 request: HttpRequest, rights: AbstractUserRights, comment_id: int 

185) -> tuple[bool, dict]: 

186 """ 

187 GET the requested comment and check the given rights allow moderator management. 

188 Adds a message to the request if an error occurs of if the rights are insufficient. 

189 """ 

190 error, comment = get_comment( 

191 rights.comment_rights_query_params(), comment_id, request_for_message=request 

192 ) 

193 if error: 

194 return True, {} 

195 

196 users = get_user_dict() 

197 

198 format_comment(comment, rights, users) 

199 if not rights.comment_can_manage_moderators(comment) or comment["status"] != STATUS_SUBMITTED: 

200 messages.error( 

201 request, 

202 "Error: You don't have enough rights to manage the moderators of the given comment.", 

203 ) 

204 return True, {} 

205 

206 return False, comment 

207 

208 

209def email_moderator_assigned(request: HttpRequest, moderator: User, comment: dict): 

210 """ 

211 Send the "Comment moderation request" mail to the provided moderator. 

212 Add messages to the provided request. 

213 """ 

214 messages.success( 

215 request, 

216 f"The moderator {moderator.first_name} {moderator.last_name} " 

217 "has been assigned to the comment.", 

218 ) 

219 # Send an e-mail to the moderator assigned to the comment - 

220 # TODO: Maybe this should be done few times a day by a cron 

221 # to avoid too many e-mails? 

222 article = get_article_by_doi(comment["doi"]) 

223 if not article: 

224 messages.warning( 

225 request, 

226 "The comment's article was not found on Trammel. " 

227 + "No e-mail was sent to the moderator.", 

228 ) 

229 return 

230 

231 article_title = article.title_tex if article else "" 

232 context_data = { 

233 "full_name": f"{moderator.first_name} {moderator.last_name}", 

234 "inviter_name": f"{request.user.first_name} {request.user.last_name}", 

235 "article_url": comment.get("base_url"), 

236 "article_title": article_title, 

237 "comment_dashboard_url": request.build_absolute_uri(reverse("comment_list")), 

238 "email_signature": "The editorial team", 

239 } 

240 subject = f"[{comment['site_name'].upper()}] Comment moderation request" 

241 send_email_from_template( 

242 "mail/comment_moderator_assigned.html", 

243 context_data, 

244 subject, 

245 to=[moderator.email], 

246 from_collection=comment["site_name"], 

247 ) 

248 

249 

250def get_comments_for_home(user: User) -> tuple[bool, dict]: 

251 """ 

252 Query the comments server to get the summary of the comments data per collection. 

253 

254 Returns: 

255 - `error` Whether the HTTP query was successful 

256 - `data` The processed comments summary: 

257 ` 

258 { 

259 "COLID": submitted_comments_nb 

260 } 

261 ` 

262 

263 There is no entry for a collection without any comments. 

264 """ 

265 rights = ModeratorUserRights(user) 

266 query_params = rights.comment_rights_query_params() 

267 

268 error, comments_summary = make_api_request( 

269 "GET", comments_server_url(query_params, "comments-summary"), auth=comments_credentials() 

270 ) 

271 if error: 

272 return True, {} 

273 

274 data = {} 

275 for col_id, col_data in comments_summary.items(): 

276 data[col_id] = col_data.get(STATUS_SUBMITTED, 0) 

277 

278 return False, data 

279 

280 

281def get_pending_invitations(rights: AbstractUserRights) -> list: 

282 """ 

283 Returns the list of pending invitations to comment moderators available to the 

284 given user rights. 

285 """ 

286 pending_invites = Invitation.objects.filter(accepted=False, extra_data__has_key="moderator") 

287 invites = [] 

288 for invite in pending_invites: 

289 collections_to_add = [] 

290 extra_data = InvitationExtraData(**invite.extra_data) 

291 

292 # Get all collections matching the user_collections 

293 if rights.user.is_superuser: 293 ↛ 294line 293 didn't jump to line 294 because the condition on line 293 was never true

294 collections_to_add = chain.from_iterable( 

295 c.pid for c in extra_data.moderator.collections 

296 ) 

297 elif rights.is_admin_moderator(): 

298 collections = chain.from_iterable(c.pid for c in extra_data.moderator.collections) 

299 collections_to_add = [ 

300 c for c in collections if c in rights.get_user_admin_collections() 

301 ] 

302 

303 # Get all comments whose collection matches the user_collections 

304 comments_to_add = [] 

305 user_collections = ( 

306 rights.get_user_admin_collections() or rights.get_user_staff_collections() 

307 ) 

308 for comment in extra_data.moderator.comments: 

309 if rights.user.is_superuser: 309 ↛ 310line 309 didn't jump to line 310 because the condition on line 309 was never true

310 comments_to_add.append(comment.id) 

311 elif comment.pid in user_collections: 

312 comments_to_add.append(comment.id) 

313 

314 if collections_to_add or comments_to_add: 

315 invites.append( 

316 { 

317 "name": f"{invite.first_name} {invite.last_name}", 

318 "email": invite.email, 

319 "sent": invite.sent, 

320 "date_expired": invite.date_expired(), 

321 "key_expired": invite.key_expired(), 

322 "collections": collections_to_add, 

323 "comments": comments_to_add, 

324 } 

325 ) 

326 

327 return invites 

328 

329 

330def get_comment_pending_invitations(comment_id: int) -> list: 

331 """ 

332 Returns the invitations linked to the given comment ID. 

333 """ 

334 # Not working on CI with SQLite DB backend. 

335 # return Invitation.objects.filter( 

336 # accepted=False, 

337 # extra_data__moderator__comments__contains=[{"id": comment_id}] 

338 # ) 

339 base_invites = Invitation.objects.filter( 

340 accepted=False, extra_data__moderator__has_key="comments" 

341 ) 

342 invites = [] 

343 for invite in base_invites: 

344 extra_data = InvitationExtraData(**invite.extra_data) 

345 if any(c.id == comment_id for c in extra_data.moderator.comments): 

346 invites.append(invite) 

347 

348 return invites