Coverage for src/ptf_tools/tasks.py: 32%

66 statements  

« prev     ^ index     » next       coverage.py v7.9.0, created at 2026-01-21 15:26 +0000

1import logging 

2import traceback 

3 

4from celery import group, shared_task 

5from celery.result import AsyncResult 

6from django.conf import settings 

7from ptf.cmds.ptf_cmds import ( 

8 archiveNumdamResourcePtfCmd, 

9 get_numdam_collection_list, 

10 get_numdam_issues_list, 

11) 

12from ptf.models.classes.collection import Collection 

13from task.custom_task import TaskWithProgress 

14from task.tasks import increment_progress 

15from task.tasks.archiving_tasks import ( 

16 _archive_collection_common, 

17 _archive_collections_common, 

18 check_nfs_directories, 

19) 

20 

21from history.model_data import HistoryEventDict, HistoryEventStatus 

22from history.views import insert_history_event 

23 

24logger = logging.getLogger(__name__) 

25 

26 

@shared_task(
    name="ptf_tools.tasks.archiving_tasks.archive_numdam_collections",
    queue="coordinator",
    bind=True,
    base=TaskWithProgress,
)
def archive_numdam_collections(self: "TaskWithProgress"):
    """Archive every Numdam collection that is not a Mersenne collection.

    The collection ids come from ``get_numdam_collection_list()``; ids listed
    in ``settings.MERSENNE_COLLECTIONS`` are dropped. The remaining ids are
    handed to ``_archive_collections_common`` with
    ``archive_numdam_collection`` as the per-collection task.
    """
    all_colids = get_numdam_collection_list()
    numdam_only_colids = [
        colid for colid in all_colids if colid not in settings.MERSENNE_COLLECTIONS
    ]
    _archive_collections_common(
        self,
        numdam_only_colids,
        archive_col_task=archive_numdam_collection,
    )

37 

38 

@shared_task(
    name="ptf_tools.tasks.archiving_tasks.archive_numdam_collection",
    queue="coordinator",
    bind=True,
    base=TaskWithProgress,
)
def archive_numdam_collection(
    self: "TaskWithProgress",
    colid: str,
    username: str | None,
    mathdoc_archive: str = settings.MATHDOC_ARCHIVE_FOLDER,
    binary_files_folder: str | None = None,
    xml_only=False,
):
    """Archive one Numdam collection and record the outcome as a history event.

    The issues of the collection are archived in chunks of 10 by
    ``archive_numdam_resource`` sub-tasks, plus one extra sub-task called with
    the collection id only. The task blocks until all sub-tasks finish.

    Args:
        colid: pid of the collection to archive.
        username: not used by this function.
        mathdoc_archive: not used by this function.
        binary_files_folder: not used by this function.
        xml_only: not used by this function.
            (NOTE(review): the unused parameters are presumably kept for
            signature parity with the other archive tasks -- confirm.)

    Raises:
        ValueError: if the current task has no request id.
        ExceptionGroup: if at least one sub-task returned an exception.
        Exception: any other error is logged, stored in the history event and
            re-raised.
    """
    # Start from ERROR so that any exit path that does not reach the explicit
    # success assignment below is recorded as a failure.
    event_dict: "HistoryEventDict" = {
        "type": "archive",
        "col": None,
        "status": HistoryEventStatus.ERROR,
        "pid": colid,
    }
    try:
        collection = Collection.objects.get(pid=colid)
        event_dict["col"] = collection
        check_nfs_directories(
            [
                settings.NUMDAM_ISSUE_SRC_FOLDER,
                settings.NUMDAM_ARTICLE_SRC_FOLDER,
                settings.CEDRAM_TEX_FOLDER,
            ]
        )

        pids = sorted(get_numdam_issues_list(colid))

        task_id = self.request.id
        if not task_id:
            raise ValueError("Couldn't find current task id")

        # One chunked signature for the issues (10 per chunk); progress is
        # incremented through the linked callback.
        signatures = (
            archive_numdam_resource.chunks([(colid, pid) for pid in pids], 10)
            .set(queue="coordinator")
            .link(increment_progress.si(task_id))
        )
        promise: "AsyncResult" = group(signatures, archive_numdam_resource.si(colid))()

        # propagate=False: collect sub-task exceptions as results instead of
        # raising on the first one, so they can all be reported together.
        results = promise.get(disable_sync_subtasks=False, propagate=False)

        exceptions = [result for result in results if isinstance(result, Exception)]
        if exceptions:
            raise ExceptionGroup("Encountered errors while processing subtasks", exceptions)

        # Everything succeeded: without this the event would be stored with
        # its initial ERROR status even on success.
        event_dict["status"] = HistoryEventStatus.OK
    except Exception:
        event_dict["status"] = HistoryEventStatus.ERROR
        event_dict["message"] = traceback.format_exc()
        logger.error(event_dict["message"])
        # TODO: when the error is an ExceptionGroup, attach one
        # "archive_resource_error" child entry per sub-exception
        # (event_dict["children"]).
        raise
    finally:
        insert_history_event(event_dict)

110 

111 

@shared_task(
    name="ptf_tools.tasks.archiving_tasks.archive_numdam_resource",
    queue="executor",
)
def archive_numdam_resource(colid, pid=None):
    """Run ``archiveNumdamResourcePtfCmd`` for a collection or one of its resources.

    Args:
        colid: pid of the collection.
        pid: optional pid of a resource inside the collection; when falsy,
            only ``colid`` is passed to the command.
    """
    # NOTE: this task must not be declared with ``bind=True``. A bound task
    # receives the task instance as first positional argument; since this
    # function has no ``self`` parameter, ``colid`` would receive the task
    # object and ``pid`` the collection id (and the two-argument form would
    # raise TypeError).
    params = {"colid": colid}
    if pid:
        params["pid"] = pid
    cmd = archiveNumdamResourcePtfCmd(params)
    cmd.do()

123 

124 

@shared_task(
    name="ptf_tools.tasks.archiving_tasks.archive_trammel_collection",
    queue="coordinator",
    bind=True,
    base=TaskWithProgress,
)
def archive_trammel_collection(
    self: "TaskWithProgress",
    colid: str,
    username: str | None = None,
    mathdoc_archive: str = settings.MATHDOC_ARCHIVE_FOLDER,
    binary_files_folder: str | None = None,
    xml_only=False,
):
    """Archive one collection through ``_archive_collection_common`` and log a history event.

    Args:
        colid: pid of the collection to archive.
        username: not used by this function.
        mathdoc_archive: forwarded to ``_archive_collection_common``.
        binary_files_folder: forwarded to ``_archive_collection_common``.
        xml_only: forwarded to ``_archive_collection_common``.

    Raises:
        Exception: any error (including a missing collection) is logged,
            stored in the history event and re-raised.
    """
    event_dict: "HistoryEventDict" = {
        "type": "archive",
        "pid": f"archive-trammel-{colid}",
        "col": None,
        "title": "",
        "status": HistoryEventStatus.PENDING,
    }

    try:
        # The lookup lives inside the try block so that a missing collection
        # is recorded as an ERROR history event instead of escaping silently.
        collection = Collection.objects.get(pid=colid)
        event_dict["col"] = collection
        event_dict["title"] = collection.title_html

        check_nfs_directories(
            [
                settings.NUMDAM_ISSUE_SRC_FOLDER,
                settings.NUMDAM_ARTICLE_SRC_FOLDER,
                settings.CEDRAM_TEX_FOLDER,
            ]
        )

        _archive_collection_common(
            self,
            collection,
            binary_files_folder,
            mathdoc_archive,
            xml_only,
            batch_size=1,
            needs_publication_date=True,
        )
        event_dict["status"] = HistoryEventStatus.OK
    except Exception:
        event_dict["status"] = HistoryEventStatus.ERROR
        event_dict["message"] = traceback.format_exc()
        logger.error(event_dict["message"])
        # TODO: when the error is an ExceptionGroup, attach one
        # "archive_resource_error" child entry per sub-exception
        # (event_dict["children"]).
        raise
    finally:
        insert_history_event(event_dict)

190 

191 

@shared_task(
    name="ptf_tools.tasks.archiving_tasks.archive_trammel_collections",
    queue="coordinator",
    bind=True,
    base=TaskWithProgress,
)
def archive_trammel_collections(self: "TaskWithProgress", *args, **kwargs):
    """Archive several collections, using ``archive_trammel_collection`` for each one.

    All positional and keyword arguments are forwarded unchanged to
    ``_archive_collections_common``.
    """
    _archive_collections_common(
        self, *args, archive_col_task=archive_trammel_collection, **kwargs
    )

208 )