Coverage for src / ptf_tools / tasks.py: 40%

51 statements  

« prev     ^ index     » next       coverage.py v7.13.2, created at 2026-02-19 12:55 +0000

1import logging 

2import traceback 

3 

4from celery import shared_task 

5from celery.utils.functional import chunks 

6from django.conf import settings 

7from ptf.cmds.ptf_cmds import ( 

8 archiveNumdamResourcePtfCmd, 

9 get_numdam_collection_list, 

10 get_numdam_issues_list, 

11) 

12from ptf.models.classes.collection import Collection 

13from task.custom_task import TaskWithProgress 

14from task.tasks.archiving_tasks import ( 

15 _archive_collection_common, 

16 _archive_collections_common, 

17 _execute_tasks_in_chunks, 

18 check_nfs_directories, 

19) 

20 

21from history.model_data import HistoryEventDict, HistoryEventStatus 

22from history.views import insert_history_event 

23 

24logger = logging.getLogger(__name__) 

25 

26 

@shared_task(
    name="ptf_tools.tasks.archiving_tasks.archive_numdam_collections",
    queue="coordinator",
    bind=True,
    base=TaskWithProgress,
)
def archive_numdam_collections(self: "TaskWithProgress"):
    """Archive every Numdam collection that is not a Mersenne collection.

    The candidate ids come from ``get_numdam_collection_list()``. Any id that
    appears in ``settings.MERSENNE_COLLECTIONS`` is filtered out, and the
    remaining ids are handed to ``_archive_collections_common`` with
    ``archive_numdam_collection`` as the per-collection task.
    """
    listed = get_numdam_collection_list()
    numdam_only = [
        candidate for candidate in listed if candidate not in settings.MERSENNE_COLLECTIONS
    ]
    _archive_collections_common(self, numdam_only, archive_col_task=archive_numdam_collection)

37 

38 

@shared_task(
    name="ptf_tools.tasks.archiving_tasks.archive_numdam_collection",
    queue="coordinator",
    bind=True,
    base=TaskWithProgress,
)
def archive_numdam_collection(
    self: "TaskWithProgress",
    colid: str,
    username: str | None = None,
    mathdoc_archive: str = settings.MATHDOC_ARCHIVE_FOLDER,
    binary_files_folder: str | None = None,
    xml_only=False,
    needs_publication_date=False,  # Unused but needed because Celery apply_async checks args
):
    """Archive one Numdam collection, issue by issue.

    The issue pids returned by ``get_numdam_issues_list(colid)`` are sorted,
    paired with ``colid`` and grouped into chunks of 10 argument tuples. The
    chunks are passed to ``_execute_tasks_in_chunks`` together with
    ``archive_numdam_resource`` (presumably the task run for each tuple —
    confirm against the helper).

    Args:
        colid: Identifier of the collection to archive.
        username: Not read by this function. It now defaults to ``None`` so
            the signature lines up with ``archive_trammel_collection``.
        mathdoc_archive: Forwarded to ``_execute_tasks_in_chunks``.
        binary_files_folder: Not read by this function.
        xml_only: Forwarded to ``_execute_tasks_in_chunks``.
        needs_publication_date: Not read by this function (see inline comment).

    Raises:
        ValueError: If the current task request carries no id.
    """
    check_nfs_directories(
        [
            settings.NUMDAM_ISSUE_SRC_FOLDER,
            settings.NUMDAM_ARTICLE_SRC_FOLDER,
            settings.CEDRAM_TEX_FOLDER,
        ]
    )

    pids = sorted(get_numdam_issues_list(colid))

    # The id itself is not used below; the check only rejects a run whose
    # request has no task id.
    if not self.request.id:
        raise ValueError("Couldn't find current task id")

    # Lazily yields lists of up to 10 (colid, pid) tuples.
    arg_chunks = chunks(((colid, pid) for pid in pids), 10)
    _execute_tasks_in_chunks(
        self, mathdoc_archive, colid, xml_only, arg_chunks, archive_numdam_resource
    )

72 

73 

@shared_task(
    name="ptf_tools.tasks.archiving_tasks.archive_numdam_resource",
    queue="executor",
)
def archive_numdam_resource(colid, pid=None):
    """Run ``archiveNumdamResourcePtfCmd`` for a collection, optionally for one pid.

    The command always receives ``colid``; ``pid`` is included in its
    parameters only when it is truthy.
    """
    cmd_params = {"colid": colid, "pid": pid} if pid else {"colid": colid}
    archiveNumdamResourcePtfCmd(cmd_params).do()

84 

85 

@shared_task(
    name="ptf_tools.tasks.archiving_tasks.archive_trammel_collection",
    queue="coordinator",
    bind=True,
    base=TaskWithProgress,
)
def archive_trammel_collection(
    self: "TaskWithProgress",
    colid: str,
    username: str | None = None,
    mathdoc_archive: str = settings.MATHDOC_ARCHIVE_FOLDER,
    binary_files_folder: str | None = None,
    xml_only=False,
    needs_publication_date=False,  # Unused but needed because Celery apply_async checks args
):
    """Archive one collection and record the outcome as a history event.

    The collection is looked up by ``pid=colid``. A history event of type
    ``"archive"`` starts as PENDING, becomes OK when
    ``_archive_collection_common`` returns, and becomes ERROR (with the
    formatted traceback as its message) when anything inside the ``try``
    raises. The event is inserted in every case and the exception is re-raised.

    Args:
        colid: ``pid`` of the collection to archive.
        username: Not read by this function.
        mathdoc_archive: Forwarded to ``_archive_collection_common``.
        binary_files_folder: Forwarded to ``_archive_collection_common``.
        xml_only: Forwarded to ``_archive_collection_common``.
        needs_publication_date: Ignored; ``True`` is always passed to
            ``_archive_collection_common``.
    """
    collection = Collection.objects.get(pid=colid)
    # NOTE(review): if ``objects`` is a standard Django manager, ``get`` raises
    # instead of returning None, so the None branch would be unreachable and a
    # missing collection would fail before any history event exists — confirm.
    if collection is None:
        title = ""
    else:
        title = collection.title_html

    event_dict: "HistoryEventDict" = {
        "type": "archive",
        "pid": f"archive-trammel-{colid}",
        "col": collection,
        "title": title,
        "status": HistoryEventStatus.PENDING,
    }

    try:
        # Built inside the try so that a missing setting is also recorded as a failure.
        required_dirs = [
            settings.NUMDAM_ISSUE_SRC_FOLDER,
            settings.NUMDAM_ARTICLE_SRC_FOLDER,
            settings.CEDRAM_TEX_FOLDER,
        ]
        check_nfs_directories(required_dirs)

        _archive_collection_common(
            self,
            collection,
            binary_files_folder,
            mathdoc_archive,
            xml_only,
            batch_size=1,
            needs_publication_date=True,
        )
        event_dict["status"] = HistoryEventStatus.OK
    except Exception:
        event_dict["status"] = HistoryEventStatus.ERROR
        failure_trace = traceback.format_exc()
        event_dict["message"] = failure_trace
        logger.error(failure_trace)
        # TODO(review): when the failure is an ExceptionGroup, consider storing
        # one child entry per sub-exception (type "archive_resource_error",
        # ERROR status, formatted traceback) under event_dict["children"].
        raise
    finally:
        # Runs on success and on failure, so the event is always persisted.
        insert_history_event(event_dict)

152 

153 

@shared_task(
    name="ptf_tools.tasks.archiving_tasks.archive_trammel_collections",
    queue="coordinator",
    bind=True,
    base=TaskWithProgress,
)
def archive_trammel_collections(self: "TaskWithProgress", *args, **kwargs):
    """Forward to ``_archive_collections_common`` using the Trammel per-collection task.

    All positional and keyword arguments are passed through unchanged;
    ``archive_col_task`` is always set to ``archive_trammel_collection``.
    """
    per_collection_task = archive_trammel_collection
    _archive_collections_common(self, *args, archive_col_task=per_collection_task, **kwargs)