Coverage for src/ptf_tools/views/archive.py: 27%

32 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-23 12:08 +0000

1import os 

2 

3from django.conf import settings 

4from django.http import Http404 

5from django.urls import reverse 

6from django.views.generic import RedirectView 

7from django_celery_results.models import TaskResult 

8from task.views import get_messages_in_task_queue 

9 

10from ptf_tools.tasks import archive_trammel_collection 

11 

12 

class TrammelArchiveView(RedirectView):
    """Queue the archiving of one collection (or all of them), then redirect.

    The collection id comes from the ``colid`` URL keyword argument. The
    special value ``"ALL"`` selects every entry of
    ``settings.MERSENNE_COLLECTIONS``.
    """

    @staticmethod
    def reset_task_results():
        """Delete every stored ``TaskResult`` row."""
        TaskResult.objects.all().delete()

    def get_redirect_url(self, *args, **kwargs):
        """Dispatch the archive tasks when the queue is idle and return the redirect URL.

        Args:
            *args: Unused positional arguments passed by ``RedirectView``.
            **kwargs: URL keyword arguments; ``colid`` is required.

        Returns:
            The ``home`` URL when ``colid`` is ``"ALL"``, otherwise the
            ``collection-detail`` URL of the requested collection.

        Raises:
            Http404: If ``colid`` is neither ``"ALL"`` nor listed in
                ``settings.MERSENNE_COLLECTIONS``.
        """
        # The incoming id may carry a "progress" path segment on either side;
        # strip it first so validation and the redirect both see the bare id.
        self.colid = kwargs["colid"].replace("progress/", "").replace("/progress", "")
        self.mathdoc_archive = settings.MATHDOC_ARCHIVE_FOLDER
        self.binary_files_folder = settings.MERSENNE_PROD_DATA_FOLDER

        archive_all = self.colid == "ALL"
        if not archive_all and self.colid not in settings.MERSENNE_COLLECTIONS:
            raise Http404

        # Make sure archiving is not already running
        if not get_messages_in_task_queue():
            self.reset_task_results()

            colids = settings.MERSENNE_COLLECTIONS if archive_all else [self.colid]

            # The log file is truncated on every run ("w" mode) and starts
            # with the list of collections being archived.
            log_path = os.path.join(settings.LOG_DIR, "archive.log")
            with open(log_path, "w", encoding="utf-8") as file_:
                file_.write("Archive " + " ".join(colids) + "\n")

            # One asynchronous task per collection.
            for colid in colids:
                archive_trammel_collection.delay(
                    colid,
                    self.mathdoc_archive,
                    self.binary_files_folder,
                )

        if archive_all:
            return reverse("home")
        return reverse("collection-detail", kwargs={"pid": self.colid})

48 else: 

49 return reverse("collection-detail", kwargs={"pid": self.colid})