Coverage for src/ptf_tools/tasks.py: 47%

53 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-10 13:49 +0000

1import subprocess 

2 

3from celery import shared_task 

4from django.conf import settings 

5from ptf.cmds.ptf_cmds import ( 

6 archiveNumdamResourcePtfCmd, 

7 get_numdam_issues_list, 

8) 

9from ptf.models.classes.collection import Collection 

10from task.custom_task import CustomTask 

11from task.runner import with_progress 

12from task.tasks.archiving_tasks import ( 

13 ArchiveCollectionTask, 

14) 

15 

16from history.models import HistoryEvent 

17from history.views import insert_history_event, manage_exceptions 

18 

19 

class ArchiveNumdamIssuesTask(CustomTask):
    """
    Fan-out task: produces one ArchiveNumdamIssueTask per issue pid of a collection.
    """

    def __init__(self, colid, pids):
        # colid is handed unchanged to every per-issue subtask;
        # pids is the iterable of issue pids to archive.
        self.colid = colid
        self.pids = pids

    def _make_subtasks(self):
        """Return the per-issue subtasks, one for each pid and in the order of self.pids."""
        subtasks = []
        for issue_pid in self.pids:
            subtasks.append(ArchiveNumdamIssueTask(self.colid, issue_pid))
        return subtasks

27 

28 

class ArchiveNumdamIssueTask(CustomTask):
    """
    Archive the files of a single issue. The list of files is obtained from numdam.org
    """

    def __init__(self, colid, pid):
        self.colid = colid
        self.pid = pid

    def do(self):
        """Run the archive command for this issue (identified by colid + pid)."""
        print("1 task (issue)")
        params = {"colid": self.colid, "pid": self.pid}
        archive_cmd = archiveNumdamResourcePtfCmd(params)
        archive_cmd.do()

    def on_error(self, error: Exception):
        """
        Record the failure of this issue's archiving through manage_exceptions.

        Always returns False.
        NOTE(review): the meaning of the False return is defined by the task runner - confirm.
        """
        # The history event is attached to the collection object, not to the issue.
        owning_collection = Collection.objects.get(pid=self.colid)
        error_event = {
            "type": "archive",
            "status": HistoryEvent.EventStatusEnum.ERROR,
            "col": owning_collection,
            "pid": self.pid,
        }
        manage_exceptions(error_event, error)
        return False

54 

55 

class ArchiveNumdamCollectionTask(CustomTask):
    """
    Archive the top-level files of a collection: col.xml and the collection images.
    The files of the issues themselves are not archived by this task.
    """

    def do(self, colid: str):
        """
        Check the source folders, archive the collection-level files and list its issues.

        Returns a (colid, pids) tuple where pids is the sorted list of issue pids.
        Raises subprocess.CalledProcessError if a folder is not a directory, and
        subprocess.TimeoutExpired if a check takes longer than 0.5 second.
        """
        # if colid in settings.MERSENNE_COLLECTIONS:
        #     return

        # Remembered on the instance: then() and on_error() read it later.
        self.colid = colid

        def require_directory(path):
            # Probe through an external `test -d` so that the check can be given a timeout.
            subprocess.check_call(["test", "-d", path], timeout=0.5)

        require_directory(settings.NUMDAM_ISSUE_SRC_FOLDER)
        require_directory(settings.NUMDAM_ARTICLE_SRC_FOLDER)
        require_directory(settings.CEDRAM_TEX_FOLDER)

        collection_cmd = archiveNumdamResourcePtfCmd({"colid": colid})
        collection_cmd.do()

        issue_pids = sorted(get_numdam_issues_list(colid))
        return colid, issue_pids

    def then(self):
        """Insert an OK "archive" history event for the collection."""
        archived_collection = Collection.objects.get(pid=self.colid)
        ok_event = {
            "type": "archive",
            "pid": self.colid,
            "col": archived_collection,
            "status": HistoryEvent.EventStatusEnum.OK,
            "data": {},
        }
        insert_history_event(ok_event)

    def _make_subtasks(self):
        """
        Return the follow-up steps: the ArchiveNumdamIssuesTask class, then self.then.

        NOTE(review): presumably the runner instantiates ArchiveNumdamIssuesTask with
        the (colid, pids) returned by do() - confirm against the task runner.
        """
        follow_ups = [ArchiveNumdamIssuesTask, self.then]
        return follow_ups

    def on_error(self, error: Exception):
        """
        Record the failure through manage_exceptions when the collection is known.

        Always returns False.
        NOTE(review): the meaning of the False return is defined by the task runner - confirm.
        """
        # colid is only set once do() has started; without it there is nothing to report on.
        if hasattr(self, "colid"):
            failed_collection = Collection.objects.get(pid=self.colid)
            error_event = {
                "type": "archive",
                "status": HistoryEvent.EventStatusEnum.ERROR,
                "col": failed_collection,
                "pid": self.colid,
            }
            manage_exceptions(error_event, error)
        return False

105 

106 

class ArchiveTrammelCollectionTask(ArchiveCollectionTask):
    """
    ArchiveCollectionTask variant whose NFS directory check uses a fixed set of folders.
    """

    def check_nfs_directories(self, directories: list[str]):
        """
        Delegate to the parent check with the three source folders taken from settings.

        The `directories` argument is ignored: the list below is always used instead.
        """
        trammel_directories = [
            settings.NUMDAM_ISSUE_SRC_FOLDER,
            settings.NUMDAM_ARTICLE_SRC_FOLDER,
            settings.CEDRAM_TEX_FOLDER,
        ]
        return super().check_nfs_directories(trammel_directories)

116 

117 

@shared_task(bind=True)
def archive_trammel_collection(self, *args, **kwargs):
    """
    Celery entry point: run ArchiveTrammelCollectionTask through with_progress.

    `self` is the bound Celery task (bind=True); every other argument is forwarded
    unchanged to with_progress, whose result is returned.
    """
    outcome = with_progress(ArchiveTrammelCollectionTask, self, *args, **kwargs)
    return outcome