Coverage for src/ptf_tools/tasks.py: 32%
66 statements
« prev ^ index » next coverage.py v7.9.0, created at 2025-12-09 15:04 +0000
« prev ^ index » next coverage.py v7.9.0, created at 2025-12-09 15:04 +0000
1import logging
2import traceback
4from celery import group, shared_task
5from celery.result import AsyncResult
6from django.conf import settings
7from ptf.cmds.ptf_cmds import (
8 archiveNumdamResourcePtfCmd,
9 get_numdam_collection_list,
10 get_numdam_issues_list,
11)
12from ptf.models.classes.collection import Collection
13from task.custom_task import TaskWithProgress
14from task.tasks import increment_progress
15from task.tasks.archiving_tasks import (
16 _archive_collection_common,
17 _archive_collections_common,
18 check_nfs_directories,
19)
21from history.model_data import HistoryEventDict, HistoryEventStatus
22from history.views import insert_history_event
24logger = logging.getLogger(__name__)
@shared_task(
    name="ptf_tools.tasks.archiving_tasks.archive_numdam_collections",
    queue="coordinator",
    bind=True,
    base=TaskWithProgress,
)
def archive_numdam_collections(self: "TaskWithProgress"):
    """Archive the Numdam collections that are not Mersenne collections.

    The collection ids are read from ``get_numdam_collection_list()``; every
    id that also appears in ``settings.MERSENNE_COLLECTIONS`` is dropped.
    The remaining ids are handed to ``_archive_collections_common`` with
    ``archive_numdam_collection`` as the per-collection task.
    """
    numdam_only_colids = []
    for candidate in get_numdam_collection_list():
        if candidate in settings.MERSENNE_COLLECTIONS:
            # Listed as a Mersenne collection: not archived by this task.
            continue
        numdam_only_colids.append(candidate)

    _archive_collections_common(
        self,
        numdam_only_colids,
        archive_col_task=archive_numdam_collection,
    )
@shared_task(
    name="ptf_tools.tasks.archiving_tasks.archive_numdam_collection",
    queue="coordinator",
    bind=True,
    base=TaskWithProgress,
)
def archive_numdam_collection(
    self: "TaskWithProgress",
    colid: str,
    username: str | None = None,
    mathdoc_archive: str = settings.MATHDOC_ARCHIVE_FOLDER,
    binary_files_folder: str | None = None,
    xml_only=False,
):
    """Archive one Numdam collection and record the outcome as a history event.

    The issues of the collection are archived by ``archive_numdam_resource``
    subtasks, and one extra ``archive_numdam_resource`` call is made with the
    collection id only. This task waits for all of them and always inserts a
    history event, whatever the outcome.

    Args:
        colid: pid of the collection to archive.
        username: not used by this task body.
        mathdoc_archive: not used by this task body.
        binary_files_folder: not used by this task body.
        xml_only: not used by this task body.
            NOTE(review): the unused parameters presumably exist for signature
            parity with the other per-collection archive tasks; confirm.

    Raises:
        ValueError: if the current task has no request id.
        ExceptionGroup: if one or more subtasks returned an exception.
        Exception: anything raised by the collection lookup or the NFS
            directory check is logged, recorded and re-raised.
    """
    # Start from ERROR so that any exit path that does not reach the explicit
    # success assignment below is recorded as a failure.
    event_dict: "HistoryEventDict" = {
        "type": "archive",
        "col": None,
        "status": HistoryEventStatus.ERROR,
        "pid": colid,
    }
    try:
        collection = Collection.objects.get(pid=colid)
        event_dict["col"] = collection
        check_nfs_directories(
            [
                settings.NUMDAM_ISSUE_SRC_FOLDER,
                settings.NUMDAM_ARTICLE_SRC_FOLDER,
                settings.CEDRAM_TEX_FOLDER,
            ]
        )

        pids = sorted(get_numdam_issues_list(colid))

        task_id = self.request.id
        if not task_id:
            raise ValueError("Couldn't find current task id")

        # Issues are processed in chunks of 10 (colid, pid) calls; the linked
        # signature reports progress against this task's id.
        signatures = (
            archive_numdam_resource.chunks([(colid, pid) for pid in pids], 10)
            .set(queue="coordinator")
            .link(increment_progress.si(task_id))
        )
        promise: "AsyncResult" = group(signatures, archive_numdam_resource.si(colid))()

        # propagate=False: failures come back as exception objects in the
        # result list instead of being raised here, so all of them can be
        # collected and reported together.
        results = promise.get(disable_sync_subtasks=False, propagate=False)

        exceptions = [result for result in results if isinstance(result, Exception)]
        if exceptions:
            raise ExceptionGroup("Encountered errors while processing subtasks", exceptions)

        # Every subtask finished without error: record the success.
        event_dict["status"] = HistoryEventStatus.OK
    except Exception:
        event_dict["status"] = HistoryEventStatus.ERROR
        event_dict["message"] = traceback.format_exc()
        logger.error(event_dict["message"])
        # TODO: when the error is an ExceptionGroup, attach one child entry
        # per sub-exception (with its own traceback) to the history event.
        raise
    finally:
        insert_history_event(event_dict)
@shared_task(
    name="ptf_tools.tasks.archiving_tasks.archive_numdam_resource",
    queue="executor",
)
def archive_numdam_resource(colid, pid=None):
    """Run ``archiveNumdamResourcePtfCmd`` for a collection or one of its resources.

    The task is deliberately not bound: the function takes no ``self``
    argument, so declaring it with ``bind=True`` would make Celery pass the
    task instance as ``colid`` and shift every caller-supplied argument by
    one position.

    Args:
        colid: pid of the collection; always passed to the command.
        pid: optional resource pid; only added to the command parameters
            when it is truthy.
    """
    params = {"colid": colid}
    if pid:
        params["pid"] = pid
    cmd = archiveNumdamResourcePtfCmd(params)
    cmd.do()
@shared_task(
    name="ptf_tools.tasks.archiving_tasks.archive_trammel_collection",
    queue="coordinator",
    bind=True,
    base=TaskWithProgress,
)
def archive_trammel_collection(
    self: "TaskWithProgress",
    colid: str,
    username: str | None = None,
    mathdoc_archive: str = settings.MATHDOC_ARCHIVE_FOLDER,
    binary_files_folder: str | None = None,
    xml_only=False,
):
    """Archive one collection through ``_archive_collection_common`` and record a history event.

    A history event is always inserted: OK when the archive completes, ERROR
    (with the formatted traceback as message) when anything fails, including
    the collection lookup itself.

    Args:
        colid: pid of the collection to archive.
        username: not used by this task body.
        mathdoc_archive: forwarded to ``_archive_collection_common``.
        binary_files_folder: forwarded to ``_archive_collection_common``.
        xml_only: forwarded to ``_archive_collection_common``.

    Raises:
        Exception: any error raised by the lookup, the NFS directory check or
            the archive itself is logged, recorded and re-raised.
    """
    event_dict: "HistoryEventDict" = {
        "type": "archive",
        "pid": f"archive-trammel-{colid}",
        "col": None,
        "title": "",
        "status": HistoryEventStatus.PENDING,
    }

    try:
        # The lookup is inside the try block so that an unknown colid is
        # recorded as an ERROR event instead of failing without any history.
        collection = Collection.objects.get(pid=colid)
        event_dict["col"] = collection
        event_dict["title"] = collection.title_html

        check_nfs_directories(
            [
                settings.NUMDAM_ISSUE_SRC_FOLDER,
                settings.NUMDAM_ARTICLE_SRC_FOLDER,
                settings.CEDRAM_TEX_FOLDER,
            ]
        )

        _archive_collection_common(
            self, collection, binary_files_folder, mathdoc_archive, xml_only, batch_size=1
        )
        event_dict["status"] = HistoryEventStatus.OK
    except Exception:
        event_dict["status"] = HistoryEventStatus.ERROR
        event_dict["message"] = traceback.format_exc()
        logger.error(event_dict["message"])
        # TODO: when the error is an ExceptionGroup, attach one child entry
        # per sub-exception (with its own traceback) to the history event.
        raise
    finally:
        insert_history_event(event_dict)
@shared_task(
    name="ptf_tools.tasks.archiving_tasks.archive_trammel_collections",
    queue="coordinator",
    bind=True,
    base=TaskWithProgress,
)
def archive_trammel_collections(self: "TaskWithProgress", *args, **kwargs):
    """Delegate to ``_archive_collections_common`` with the trammel per-collection task.

    All positional and keyword arguments are forwarded unchanged;
    ``archive_trammel_collection`` is supplied as ``archive_col_task``.
    """
    per_collection_task = archive_trammel_collection
    _archive_collections_common(self, *args, archive_col_task=per_collection_task, **kwargs)