From 07c4c47beadae42b955070c9007ae52c070eefab Mon Sep 17 00:00:00 2001 From: chase mateusiak Date: Tue, 19 Dec 2023 09:40:49 -0600 Subject: [PATCH] adding RankResponse model --- config/settings/base.py | 8 + docs/diagram_exclude_models.txt | 2 + local.yml | 1 + production.yml | 1 + yeastregulatorydb/conftest.py | 39 +++- .../api/filters/RankResponseFilter.py | 37 ++++ .../serializers/PromoterSetSigSerializer.py | 6 +- .../api/serializers/RankResponseSerializer.py | 14 ++ .../api/serializers/__init__.py | 3 +- .../api/views/BindingViewSet.py | 30 ++- .../api/views/ExpressionViewSet.py | 28 ++- .../api/views/PromoterSetViewSet.py | 25 ++- .../api/views/RankResponseViewSet.py | 57 ++++++ .../regulatory_data/models/FileFormat.py | 14 ++ .../regulatory_data/models/PromoterSetSig.py | 2 +- .../regulatory_data/models/RankResponse.py | 87 +++++++++ .../regulatory_data/models/__init__.py | 2 + .../regulatory_data/tasks/__init__.py | 2 + .../chipexo_pugh_allevents_promoter_sig.py | 105 ---------- .../tasks/promoter_significance_task.py | 166 ++++++++++++++++ .../tasks/rank_response_task.py | 180 ++++++++++++++++++ .../chipexo/28366_yiming_promoter_sig.csv.gz | Bin 0 -> 221 bytes .../regulatory_data/tests/test_tasks.py | 60 +++++- 23 files changed, 739 insertions(+), 130 deletions(-) create mode 100644 docs/diagram_exclude_models.txt create mode 100644 yeastregulatorydb/regulatory_data/api/filters/RankResponseFilter.py create mode 100644 yeastregulatorydb/regulatory_data/api/serializers/RankResponseSerializer.py create mode 100644 yeastregulatorydb/regulatory_data/api/views/RankResponseViewSet.py create mode 100644 yeastregulatorydb/regulatory_data/models/RankResponse.py delete mode 100644 yeastregulatorydb/regulatory_data/tasks/chipexo_pugh_allevents_promoter_sig.py create mode 100644 yeastregulatorydb/regulatory_data/tasks/promoter_significance_task.py create mode 100644 yeastregulatorydb/regulatory_data/tasks/rank_response_task.py create mode 100644 
yeastregulatorydb/regulatory_data/tests/test_data/binding/chipexo/28366_yiming_promoter_sig.csv.gz diff --git a/config/settings/base.py b/config/settings/base.py index 3fcc227..6e65cee 100644 --- a/config/settings/base.py +++ b/config/settings/base.py @@ -342,3 +342,11 @@ "CHR_FORMAT", default="ucsc", ) +CHIPEXO_PROMOTER_SIG_FORMAT = env( + "CHIPEXO_PROMOTER_SIG_FORMAT", + default="chipexo_promoter_sig", +) +CALLINGCARDS_PROMOTER_SIG_FORMAT = env( + "CALLING_CARD_PROMOTER_SIG_FORMAT", + default="callingcards_promoter_sig", +) diff --git a/docs/diagram_exclude_models.txt b/docs/diagram_exclude_models.txt new file mode 100644 index 0000000..a25cebe --- /dev/null +++ b/docs/diagram_exclude_models.txt @@ -0,0 +1,2 @@ +User +BaseModel diff --git a/local.yml b/local.yml index 09c5d75..762dc26 100644 --- a/local.yml +++ b/local.yml @@ -19,6 +19,7 @@ services: env_file: - ./.envs/.local/.django - ./.envs/.local/.postgres + - ./.envs/.local/.regulatory_data ports: - '8000:8000' command: /start diff --git a/production.yml b/production.yml index 32c5860..e86fb08 100644 --- a/production.yml +++ b/production.yml @@ -18,6 +18,7 @@ services: env_file: - ./.envs/.production/.django - ./.envs/.production/.postgres + - ./.envs/.production/.regulatory_data command: /start postgres: diff --git a/yeastregulatorydb/conftest.py b/yeastregulatorydb/conftest.py index 301c73e..9a4b281 100644 --- a/yeastregulatorydb/conftest.py +++ b/yeastregulatorydb/conftest.py @@ -185,7 +185,9 @@ def fileformat(db) -> QuerySet: }, ",", "max_fc", + 0.0, "min_pval", + 1.0, ), "cc_promoter_sig": ( { @@ -204,13 +206,17 @@ def fileformat(db) -> QuerySet: }, ",", "callingcards_enrichment", + 0.0, "poisson_pval", + 1.0, ), "kemmeren": ( {"gene_id": "int", "M": "float", "Madj": "float", "A": "float", "pval": "float"}, ",", "Madj", + 0.0, "pval", + 1.0, ), "mcisaac": ( { @@ -224,18 +230,47 @@ def fileformat(db) -> QuerySet: }, ",", "log2_shrunken_timecourses", + 0.0, "none", + 1.0, ), "bed6": ( {"chr": "str", 
"start": "int", "end": "int", "name": "str", "score": "float", "strand": "str"}, "\t", "none", + 0.0, "none", + 1.0, + ), + "rank_response_summary": ( + { + "feature": "str", + "expression_effect": "int", + "expression_pvalue": "int", + "binding_effect": "str", + "binding_pvalue": "str", + "responsive": "int", + "ran_bin": "float", + "random": "float", + }, + ",", + "none", + 0.0, + "none", + 1.0, ), } for key, value in format_dict.items(): - fields, separator, effect, pval = value - FileFormatFactory.create(fileformat=key, fields=fields, separator=separator, effect_col=effect, pval_col=pval) + fields, separator, effect, effect_thres, pval, pval_thres = value + FileFormatFactory.create( + fileformat=key, + fields=fields, + separator=separator, + effect_col=effect, + default_effect_threshold=effect_thres, + pval_col=pval, + default_pvalue_threshold=pval_thres, + ) return FileFormat.objects.all() diff --git a/yeastregulatorydb/regulatory_data/api/filters/RankResponseFilter.py b/yeastregulatorydb/regulatory_data/api/filters/RankResponseFilter.py new file mode 100644 index 0000000..870db02 --- /dev/null +++ b/yeastregulatorydb/regulatory_data/api/filters/RankResponseFilter.py @@ -0,0 +1,37 @@ +import django_filters + +from ...models import RankResponse + + +class RankResponseFilter(django_filters.FilterSet): + id = django_filters.NumberFilter() + pk = django_filters.NumberFilter() + promotersetsig_id = django_filters.NumberFilter(field_name="promotersetsig__id") + binding_source = django_filters.CharFilter( + field_name="promotersetsig__binding__source__name", lookup_expr="iexact" + ) + expression_id = django_filters.NumberFilter(field_name="expression__id") + expression_source = django_filters.CharFilter(field_name="expression__source__name", lookup_expr="iexact") + regulator_locus_tag = django_filters.CharFilter( + field_name="expression__regulator__locus_tag", lookup_expr="iexact" + ) + regulator_symbol = 
django_filters.CharFilter(field_name="expression__regulator__symbol", lookup_expr="iexact") + expression_effect_threshold = django_filters.NumberFilter() + expression_pvalue_threshold = django_filters.NumberFilter() + normalized = django_filters.BooleanFilter() + + class Meta: + model = RankResponse + fields = [ + "id", + "pk", + "promotersetsig_id", + "binding_source", + "expression_id", + "expression_source", + "regulator_locus_tag", + "regulator_symbol", + "expression_effect_threshold", + "expression_pvalue_threshold", + "normalized", + ] diff --git a/yeastregulatorydb/regulatory_data/api/serializers/PromoterSetSigSerializer.py b/yeastregulatorydb/regulatory_data/api/serializers/PromoterSetSigSerializer.py index ccc5a98..fc698d8 100644 --- a/yeastregulatorydb/regulatory_data/api/serializers/PromoterSetSigSerializer.py +++ b/yeastregulatorydb/regulatory_data/api/serializers/PromoterSetSigSerializer.py @@ -1,10 +1,6 @@ -import pandas as pd -from django.conf import settings from rest_framework import serializers -from yeastregulatorydb.regulatory_data.utils.validate_genomic_df import validate_genomic_df - -from ...models import FileFormat, PromoterSetSig +from ...models import PromoterSetSig from .mixins.CustomValidateMixin import CustomValidateMixin from .mixins.FileValidationMixin import FileValidationMixin diff --git a/yeastregulatorydb/regulatory_data/api/serializers/RankResponseSerializer.py b/yeastregulatorydb/regulatory_data/api/serializers/RankResponseSerializer.py new file mode 100644 index 0000000..64038a4 --- /dev/null +++ b/yeastregulatorydb/regulatory_data/api/serializers/RankResponseSerializer.py @@ -0,0 +1,14 @@ +from rest_framework import serializers + +from ...models import RankResponse +from .mixins.CustomValidateMixin import CustomValidateMixin +from .mixins.FileValidationMixin import FileValidationMixin + + +class RankResponseSerializer(CustomValidateMixin, FileValidationMixin, serializers.ModelSerializer): + uploader = 
serializers.ReadOnlyField(source="uploader.username") + modifier = serializers.CharField(source="uploader.username", required=False) + + class Meta: + model = RankResponse + fields = "__all__" diff --git a/yeastregulatorydb/regulatory_data/api/serializers/__init__.py b/yeastregulatorydb/regulatory_data/api/serializers/__init__.py index 16f32d0..ab016c4 100644 --- a/yeastregulatorydb/regulatory_data/api/serializers/__init__.py +++ b/yeastregulatorydb/regulatory_data/api/serializers/__init__.py @@ -9,6 +9,7 @@ from .GenomicFeatureSerializer import GenomicFeatureSerializer from .PromoterSetSerializer import PromoterSetSerializer from .PromoterSetSigSerializer import PromoterSetSigSerializer +from .RankResponseSerializer import RankResponseSerializer from .RegulatorSerializer import RegulatorSerializer __all__ = [ @@ -23,5 +24,6 @@ "GenomicFeatureSerializer", "PromoterSetSerializer", "PromoterSetSigSerializer", - "RegulatorSerializer", + "RankResponseSerializer", + "RegulatorSerializer", ] diff --git a/yeastregulatorydb/regulatory_data/api/views/BindingViewSet.py b/yeastregulatorydb/regulatory_data/api/views/BindingViewSet.py index 55db3c7..c5256df 100644 --- a/yeastregulatorydb/regulatory_data/api/views/BindingViewSet.py +++ b/yeastregulatorydb/regulatory_data/api/views/BindingViewSet.py @@ -1,14 +1,13 @@ +from celery import chain +from django.core.cache import cache from django_filters.rest_framework import DjangoFilterBackend from rest_framework import viewsets from rest_framework.authentication import SessionAuthentication, TokenAuthentication from rest_framework.permissions import IsAuthenticated -from yeastregulatorydb.regulatory_data.tasks.chipexo_pugh_allevents_promoter_sig import ( - chipexo_pugh_allevents_promoter_sig, -) - from ...models.Binding import Binding -from ..filters.BindingFilter import BindingFilter +from ...tasks import promoter_significance_task, rank_response_tasks +from ..filters import BindingFilter from ..serializers.BindingSerializer import 
BindingSerializer from .mixins.BulkUploadMixin import BulkUploadMixin from .mixins.UpdateModifiedMixin import UpdateModifiedMixin @@ -28,5 +27,24 @@ class BindingViewSet(UpdateModifiedMixin, viewsets.ModelViewSet, BulkUploadMixin def perform_create(self, serializer): instance = serializer.save() + task_type = None if instance.source.name == "chipexo_pugh_allevents": - chipexo_pugh_allevents_promoter_sig.delay(instance.id, self.request.user.id) + task_type = "chipexo_promoter_sig" + elif instance.source.name == "callingcards": + task_type = "callingcards_promoter_sig" + + if task_type: + lock_id = f"add_data_lock" + acquire_lock = lambda: cache.add(lock_id, True, timeout=60 * 60) + release_lock = lambda: cache.delete(lock_id) + + if acquire_lock(): + try: + # Create a chain of tasks + task = chain( + promoter_significance_task.s(instance.id, self.request.user.id, task_type), + rank_response_tasks.s(user_id=self.request.user.id), + ) + task.apply_async() + finally: + release_lock() diff --git a/yeastregulatorydb/regulatory_data/api/views/ExpressionViewSet.py b/yeastregulatorydb/regulatory_data/api/views/ExpressionViewSet.py index 9d9daa8..55c88bd 100644 --- a/yeastregulatorydb/regulatory_data/api/views/ExpressionViewSet.py +++ b/yeastregulatorydb/regulatory_data/api/views/ExpressionViewSet.py @@ -1,11 +1,14 @@ +from celery import chain +from django.core.cache import cache from django_filters.rest_framework import DjangoFilterBackend from rest_framework import viewsets from rest_framework.authentication import SessionAuthentication, TokenAuthentication from rest_framework.permissions import IsAuthenticated -from ...models.Expression import Expression -from ..filters.ExpressionFilter import ExpressionFilter -from ..serializers.ExpressionSerializer import ExpressionSerializer +from ...models import Expression, PromoterSetSig +from ...tasks import rank_response_task +from ..filters import ExpressionFilter +from ..serializers import ExpressionSerializer from 
.mixins.BulkUploadMixin import BulkUploadMixin from .mixins.UpdateModifiedMixin import UpdateModifiedMixin @@ -21,3 +24,22 @@ class ExpressionViewSet(UpdateModifiedMixin, viewsets.ModelViewSet, BulkUploadMi serializer_class = ExpressionSerializer filter_backends = [DjangoFilterBackend] filterset_class = ExpressionFilter + + def perform_create(self, serializer): + instance = serializer.save() + promotersetsig_id_list = PromoterSetSig.objects.filter( + binding__regulator__id=instance.regulator.id + ).values_list("id", flat=True) + + # Create a chain of tasks for each promotersetsig_id + lock_id = f"add_data_lock" + acquire_lock = lambda: cache.add(lock_id, True, timeout=60 * 60) + release_lock = lambda: cache.delete(lock_id) + + if acquire_lock(): + try: + # Create a chain of tasks + for promotersetsig_id in promotersetsig_id_list: + rank_response_task.apply_async(args=[promotersetsig_id, self.request.user.id]) + finally: + release_lock() diff --git a/yeastregulatorydb/regulatory_data/api/views/PromoterSetViewSet.py b/yeastregulatorydb/regulatory_data/api/views/PromoterSetViewSet.py index c7e0c9f..2c5a90a 100644 --- a/yeastregulatorydb/regulatory_data/api/views/PromoterSetViewSet.py +++ b/yeastregulatorydb/regulatory_data/api/views/PromoterSetViewSet.py @@ -1,9 +1,12 @@ +from celery import chain +from django.core.cache import cache from django_filters.rest_framework import DjangoFilterBackend from rest_framework import viewsets from rest_framework.authentication import SessionAuthentication, TokenAuthentication from rest_framework.permissions import IsAuthenticated -from ...models.PromoterSet import PromoterSet +from ...models import Binding, PromoterSet +from ...tasks import promoter_significance_task, rank_response_tasks from ..filters.PromoterSetFilter import PromoterSetFilter from ..serializers.PromoterSetSerializer import PromoterSetSerializer from .mixins.UpdateModifiedMixin import UpdateModifiedMixin @@ -20,3 +23,23 @@ class 
PromoterSetViewSet(UpdateModifiedMixin, viewsets.ModelViewSet): serializer_class = PromoterSetSerializer filter_backends = [DjangoFilterBackend] filterset_class = PromoterSetFilter + + +def perform_create(self, serializer): + instance = serializer.save() + + lock_id = f"add_data_lock" + acquire_lock = lambda: cache.add(lock_id, True, timeout=60 * 60) + release_lock = lambda: cache.delete(lock_id) + + if acquire_lock(): + try: + for binding_obj in Binding.objects.all(): + # Call promoter_significance_task and then rank_response_tasks + task = chain( + promoter_significance_task.s(binding_obj.id, self.request.user.id, instance.id), + rank_response_tasks.s(user_id=self.request.user.id), + ) + task.apply_async() + finally: + release_lock() diff --git a/yeastregulatorydb/regulatory_data/api/views/RankResponseViewSet.py b/yeastregulatorydb/regulatory_data/api/views/RankResponseViewSet.py new file mode 100644 index 0000000..985d889 --- /dev/null +++ b/yeastregulatorydb/regulatory_data/api/views/RankResponseViewSet.py @@ -0,0 +1,57 @@ +import tempfile + +import pandas as pd +from django.http import HttpResponse +from django_filters.rest_framework import DjangoFilterBackend +from rest_framework import status, viewsets +from rest_framework.authentication import SessionAuthentication, TokenAuthentication +from rest_framework.decorators import action +from rest_framework.permissions import IsAuthenticated +from rest_framework.response import Response + +from ...models import RankResponse +from ...utils.extract_file_from_storage import extract_file_from_storage +from ..filters.RankResponseFilter import RankResponseFilter +from ..serializers.RankResponseSerializer import RankResponseSerializer +from .mixins.UpdateModifiedMixin import UpdateModifiedMixin + + +class RankResponseViewSet(UpdateModifiedMixin, viewsets.ModelViewSet): + """ + A viewset for viewing and editing RankResponse instances. 
+ """ + + queryset = RankResponse.objects.all() + authentication_classes = [SessionAuthentication, TokenAuthentication] + permission_classes = [IsAuthenticated] + serializer_class = RankResponseSerializer + filter_backends = [DjangoFilterBackend] + filterset_class = RankResponseFilter + + @action(detail=False, methods=["get"]) + def summary(self, request, *args, **kwargs): + if "rank_response_id" not in request.query_params: + return Response({"error": "rank_response_id must be specified"}, status=status.HTTP_400_BAD_REQUEST) + + filtered_queryset = self.filter_queryset(self.get_queryset()) + if len(filtered_queryset) != 1: + return Response( + { + "error": "rank_response_summary returned multiple matches to your query. There should only be 1. " + "Contact our admin -- this message shouldn't appear -- but in the meantime, " + "re-submit with only the `rank_response_id` as a paramater" + }, + status=status.HTTP_400_BAD_REQUEST, + ) + with tempfile.TemporaryDirectory() as tmpdir: + for rank_response_record in filtered_queryset: + # Iterate over the filtered queryset + filepath = extract_file_from_storage(rank_response_record.file, tmpdir) + df = pd.read_csv(filepath, compression="gzip") + + with tempfile.NamedTemporaryFile(suffix=".gz") as tmpfile: + df.to_csv(tmpfile.name, compression="gzip", index=False) + tmpfile.seek(0) + response = HttpResponse(tmpfile, content_type="application/gzip") + response["Content-Disposition"] = "attachment; filename=rank_response_summary.csv.gz" + return response diff --git a/yeastregulatorydb/regulatory_data/models/FileFormat.py b/yeastregulatorydb/regulatory_data/models/FileFormat.py index 59575be..541bff3 100644 --- a/yeastregulatorydb/regulatory_data/models/FileFormat.py +++ b/yeastregulatorydb/regulatory_data/models/FileFormat.py @@ -43,18 +43,32 @@ class FileFormat(BaseModel): default="\t", help_text="The separator used in the file. 
Defaults to tab.", ) + feature_identifier_col = models.CharField( + max_length=40, + default="none", + help_text="The name of the column that should be used as the default " + "feature identifier column. Eg 'name'. Defaults to 'none'.", + ) effect_col = models.CharField( max_length=40, default="none", help_text="The name of the column that should be used as the default " "effect column. Eg 'score'. Defaults to 'none' if there is no effect column.", ) + default_effect_threshold = models.FloatField( + default=0.0, + help_text="The default threshold for the effect column. Defaults to 0.0.", + ) pval_col = models.CharField( max_length=40, default="none", help_text="The name of the column that should be used as the default " "p-value column. Eg 'pvalue'. Defaults to 'none' if there is no p-value column.", ) + default_pvalue_threshold = models.FloatField( + default=1.0, + help_text="The default threshold for the p-value column. Defaults to 1.0.", + ) def __str__(self): return f"{self.fileformat}" diff --git a/yeastregulatorydb/regulatory_data/models/PromoterSetSig.py b/yeastregulatorydb/regulatory_data/models/PromoterSetSig.py index 473e368..5be2eb5 100644 --- a/yeastregulatorydb/regulatory_data/models/PromoterSetSig.py +++ b/yeastregulatorydb/regulatory_data/models/PromoterSetSig.py @@ -44,7 +44,7 @@ def save(self, *args, **kwargs): # Store the old file path old_file_name = self.file.name if self.file else None super().save(*args, **kwargs) - self.update_file_name("file", "promotersetsig", "tsv.gz") + self.update_file_name("file", "promotersetsig", "csv.gz") new_file_name = self.file.name super().save(update_fields=["file"]) # If the file name changed, delete the old file diff --git a/yeastregulatorydb/regulatory_data/models/RankResponse.py b/yeastregulatorydb/regulatory_data/models/RankResponse.py new file mode 100644 index 0000000..78d486f --- /dev/null +++ b/yeastregulatorydb/regulatory_data/models/RankResponse.py @@ -0,0 +1,87 @@ +import logging + +from 
django.core.files.storage import default_storage +from django.db import models +from django.dispatch import receiver + +from .BaseModel import BaseModel +from .mixins.GzipFileUploadWithIdMixin import GzipFileUploadWithIdMixin + +logger = logging.getLogger(__name__) + + +class RankResponse(BaseModel, GzipFileUploadWithIdMixin): + """ + Store Rank Response data for a given binding and expression set for a given regulator + at specific expression effect and pvalue thresholds. The data may or may not be normalized + across expression data sets. + """ + + promotersetsig = models.ForeignKey( + "PromoterSetSig", on_delete=models.CASCADE, help_text="foreign key to the 'PromoterSetSig' table" + ) + expression = models.ForeignKey( + "Expression", on_delete=models.CASCADE, help_text="foreign key to the 'Expression' table" + ) + expression_effect_threshold = models.FloatField( + help_text="The threshold (absolute value) at which to label a gene as " + "'responsive' in the expression data. Works in conjunction with `expression_pvalue_threshold'. " + "Default is 0", + default=0, + ) + expression_pvalue_threshold = models.FloatField( + help_text="The threshold at which to label a gene as 'responsive' in " + "the expression data. Works in conjunction with `expression_effect_threshold`. " + "Default is 1", + default=1, + ) + fileformat = models.ForeignKey( + "FileFormat", on_delete=models.CASCADE, help_text="foreign key to the 'FileFormat' table" + ) + normalized = models.BooleanField( + help_text="This indicates whether the data has been normalized to have the same number of responsive genes " + "across expression data sets. Default is False. 
WARNING: not yet implemented -- all are `False`", + default=False, + ) + file = models.FileField( + upload_to="temp", + help_text="A file which stores the rank response data for a given " + "binding and expression set for a given regulator at specific " + "expression effect and pvalue thresholds", + ) + + def __str__(self): + return f"pk:{self.pk};promotersetsig:{self.promotersetsig};expression:{self.expression}" + + class Meta: + db_table = "rankresponse" + + # pylint:disable=R0801 + def save(self, *args, **kwargs): + # Store the old file path + old_file_name = self.file.name if self.file else None + super().save(*args, **kwargs) + self.update_file_name("file", "rankresponse", "csv.gz") + new_file_name = self.file.name + super().save(update_fields=["file"]) + # If the file name changed, delete the old file + if old_file_name and old_file_name != new_file_name: + default_storage.delete(old_file_name) + + # pylint:enable=R0801 + + +@receiver(models.signals.post_delete, sender=RankResponse) +def remove_file_from_s3(sender, instance, using, **kwargs): # pylint: disable=unused-argument + """ + this is a post_delete signal. Hence, if the delete command is successful, + the file will be deleted. If the delete command is successful, and for some + reason the delete signal fails, it is possible to end up with files in S3 + which are not referenced by the database. + upon inception, there did not exist any images which were not referenced. 
+ So,if unreferenced files are ever found, that should indicate that these + files are erroneous and can be safely deleted + """ + # note that if the directory (and all subdirectories) are empty, the + # directory will also be removed + instance.file.delete(save=False) diff --git a/yeastregulatorydb/regulatory_data/models/__init__.py b/yeastregulatorydb/regulatory_data/models/__init__.py index 12c08eb..6ce54f1 100644 --- a/yeastregulatorydb/regulatory_data/models/__init__.py +++ b/yeastregulatorydb/regulatory_data/models/__init__.py @@ -9,6 +9,7 @@ from .GenomicFeature import GenomicFeature from .PromoterSet import PromoterSet from .PromoterSetSig import PromoterSetSig +from .RankResponse import RankResponse from .Regulator import Regulator __all__ = [ @@ -21,6 +22,7 @@ "ExpressionManualQC", "FileFormat", "GenomicFeature", + "RankResponse", "PromoterSet", "PromoterSetSig", "Regulator", diff --git a/yeastregulatorydb/regulatory_data/tasks/__init__.py b/yeastregulatorydb/regulatory_data/tasks/__init__.py index e69de29..c9d0034 100644 --- a/yeastregulatorydb/regulatory_data/tasks/__init__.py +++ b/yeastregulatorydb/regulatory_data/tasks/__init__.py @@ -0,0 +1,2 @@ +from .promoter_significance_task import promoter_significance_task +from .rank_response_task import rank_response_task, rank_response_tasks diff --git a/yeastregulatorydb/regulatory_data/tasks/chipexo_pugh_allevents_promoter_sig.py b/yeastregulatorydb/regulatory_data/tasks/chipexo_pugh_allevents_promoter_sig.py deleted file mode 100644 index 52ec88b..0000000 --- a/yeastregulatorydb/regulatory_data/tasks/chipexo_pugh_allevents_promoter_sig.py +++ /dev/null @@ -1,105 +0,0 @@ -import gzip -import io -import logging -import os -import tempfile -from types import SimpleNamespace - -import pandas as pd -from callingcardstools.Analysis.yeast.chipexo_promoter_sig import chipexo_promoter_sig -from django.conf import settings -from django.contrib.auth import get_user_model -from django.core.files import File - -from 
config import celery_app -from yeastregulatorydb.regulatory_data.api.serializers import PromoterSetSigSerializer -from yeastregulatorydb.regulatory_data.models import Binding, ChrMap, FileFormat, PromoterSet -from yeastregulatorydb.regulatory_data.utils.extract_file_from_storage import extract_file_from_storage - -logger = logging.getLogger(__name__) - - -@celery_app.task() -def chipexo_pugh_allevents_promoter_sig(chipexo_id: int, user_id: int) -> list: - """For each promoter set in PromoterSet, create the chipexo promoter significance file. - Return a list of PromoterSetSig objects that may be passed on to the rank response - endpoint. - - :param chipexo_record: The Binding record for the chipexo_pugh_allevents data - :type chipexo_record: Binding - :param user_id: The id of the user who initiated the task - :type user_id: int - - :return: A list of PromoterSetSig object ids - :rtype: list - - :raises ValueError: If the Binding record with id `chipexo_id` does not - exist or if the chipexo_promoter_sig FileFormat does not exist - :raises ValidationError: If the serializer is invalid - """ - try: - User = get_user_model() - user = User.objects.get(id=user_id) - except User.DoesNotExist: - raise ValueError(f"User with id {user_id} does not exist") - try: - chipexo_record = Binding.objects.get(id=chipexo_id) - except Binding.DoesNotExist: - raise ValueError(f"Binding record with id {chipexo_id} does not exist") - - try: - chipexo_promoter_sig_fileformat = FileFormat.objects.get(fileformat="chipexo_promoter_sig") - except FileFormat.DoesNotExist: - raise ValueError("FileFormat chipexo_promoter_sig does not exist") - - with tempfile.TemporaryDirectory() as tmpdir: - chipexo_filepath = extract_file_from_storage(chipexo_record.file, tmpdir) - - chrmap_filepath = os.path.join(tmpdir, "chrmap.csv") - - pd.DataFrame(list(ChrMap.objects.all().values())).to_csv(chrmap_filepath, index=False) - - promoter_set_sig_list = [] - for promoter_record in 
PromoterSet.objects.iterator(): - promoter_filepath = extract_file_from_storage(promoter_record.file, tmpdir) - - result = chipexo_promoter_sig( - chipexo_filepath, - settings.CHR_FORMAT, - promoter_filepath, - settings.CHR_FORMAT, - chrmap_filepath, - settings.CHR_FORMAT, - ) - - buffer = io.BytesIO() - with gzip.GzipFile(fileobj=buffer, mode="wb") as gzipped_file: - result.to_csv(gzipped_file, index=False) - - # Reset buffer position - buffer.seek(0) - - # Create a Django File object with a filename - django_file = File(buffer, name="my_file.csv.gz") - - # Create a mock request with only a user attribute - # Assuming you have the user_id available - mock_request = SimpleNamespace(user=user) - - serializer = PromoterSetSigSerializer( - data={ - "binding": chipexo_record.id, - "promoter": promoter_record.id, - "fileformat": chipexo_promoter_sig_fileformat.id, - "file": django_file, - }, - context={"request": mock_request}, - ) - - if serializer.is_valid(): - promoter_set_sig = serializer.save() - promoter_set_sig_list.append(promoter_set_sig.id) - else: - logger.error(f"ChipExo promoterSetSig Serializer is invalid: {serializer.errors}") - - return promoter_set_sig_list diff --git a/yeastregulatorydb/regulatory_data/tasks/promoter_significance_task.py b/yeastregulatorydb/regulatory_data/tasks/promoter_significance_task.py new file mode 100644 index 0000000..9cb726e --- /dev/null +++ b/yeastregulatorydb/regulatory_data/tasks/promoter_significance_task.py @@ -0,0 +1,166 @@ +import gzip +import io +import logging +import os +import tempfile +import uuid +from collections import namedtuple +from types import SimpleNamespace + +import pandas as pd +from callingcardstools.Analysis.yeast.chipexo_promoter_sig import chipexo_promoter_sig +from callingcardstools.PeakCalling.yeast.call_peaks import call_peaks as callingcards_promoter_sig +from django.conf import settings +from django.contrib.auth import get_user_model +from django.core.files import File + +from config import 
celery_app +from yeastregulatorydb.regulatory_data.api.serializers import PromoterSetSigSerializer +from yeastregulatorydb.regulatory_data.models import Binding, CallingCardsBackground, ChrMap, FileFormat, PromoterSet +from yeastregulatorydb.regulatory_data.utils.extract_file_from_storage import extract_file_from_storage + +logger = logging.getLogger(__name__) + + +@celery_app.task() +def promoter_significance_task(binding_id: int, user_id: int, output_fileformat: str, **kwargs) -> list: + """For each promoter set in PromoterSet, create the chipexo promoter significance file. + Return a list of PromoterSetSig objects that may be passed on to the rank response + endpoint. + + :param binding_id: The Binding record for the chipexo_pugh_allevents data + :type binding_id: Binding + :param user_id: The id of the user who initiated the task + :type user_id: int + :param output_fileformat: The name of the output FileFormat + :type output_fileformat: str + :param kwargs: Additional keyword arguments. If `promoterset_id` is passed, + then the significance will be calculated only that specific promoterset. + Else, it is calculated over all promoter sets in the PromoterSet table. + If the output_fileformat is callingcards_promoter_sig and `background_id` + is passed in kwargs, then the promoter significance will be calculated + that specific background set only. 
Else, significance will be calculated + for all background sets + + :return: A list of PromoterSetSig object ids + :rtype: list + + :raises ValueError: If the Binding record with id `binding_id` does not + exist or if the chipexo_promoter_sig FileFormat does not exist + :raises ValidationError: If the serializer is invalid + """ + try: + User = get_user_model() + user = User.objects.get(id=user_id) + except User.DoesNotExist: + raise ValueError(f"User with id {user_id} does not exist") + + try: + binding_record = Binding.objects.get(id=binding_id) + except Binding.DoesNotExist: + raise ValueError(f"Binding record with id {binding_id} does not exist") + + try: + fileformat_record = FileFormat.objects.get(fileformat=output_fileformat) + except FileFormat.DoesNotExist: + raise ValueError(f"FileFormat '{output_fileformat}' does not exist") + + with tempfile.TemporaryDirectory() as tmpdir: + chrmap_filepath = os.path.join(tmpdir, "chrmap.csv") + + pd.DataFrame(list(ChrMap.objects.all().values())).to_csv(chrmap_filepath, index=False) + + binding_filepath = extract_file_from_storage(binding_record.file, tmpdir) + + # result_list stores ResultObject tuples where `df` is the dataframe + # output by the promoter_significance function and `background_id` is + # if promoterset_id is passed, then extract only that record. 
Else, + # generate an iterator that will return all records in the PromoterSet + # table + promoterset_objects_iterator = ( + PromoterSet.objects.filter(id=kwargs.get("promoterset_id")).iterator() + if "promoterset_id" in kwargs + else PromoterSet.objects.iterator() + ) + # None if there is no background, or the record `id` if there is + ResultObject = namedtuple("ResultObject", ["df", "background_id"]) + result_list = [] + for promoter_record in promoterset_objects_iterator: + promoter_filepath = extract_file_from_storage(promoter_record.file, tmpdir) + + if output_fileformat == settings.CHIPEXO_PROMOTER_SIG_FORMAT: + result = chipexo_promoter_sig( + binding_filepath, + settings.CHR_FORMAT, + promoter_filepath, + settings.CHR_FORMAT, + chrmap_filepath, + settings.CHR_FORMAT, + ) + result_list.append(ResultObject(result, None)) + elif output_fileformat == settings.CALLINGCARDS_PROMOTER_SIG_FORMAT: + # if background_id is passed, then extract only that record. + # else, generate an iterator that will return all records in + # the CallingCardsBackground table + background_objects_iterator = ( + CallingCardsBackground.objects.filter(id=kwargs.get("background_id")).iterator() + if "background_id" in kwargs + else CallingCardsBackground.objects.iterator() + ) + for background_record in background_objects_iterator: + background_filepath = extract_file_from_storage(background_record.file, tmpdir) + + result = callingcards_promoter_sig( + binding_filepath, + settings.CHR_FORMAT, + promoter_filepath, + settings.CHR_FORMAT, + background_filepath, + settings.CHR_FORMAT, + chrmap_filepath, + False, + settings.CHR_FORMAT, + ) + result_list.append(ResultObject(result, binding_record.id)) + else: + raise ValueError(f"FileFormat '{output_fileformat}' not supported") + + # output_list stores promoter_set_sig `id`s for successfully uploaded + # records + output_list = [] + for res_obj in result_list: + buffer = io.BytesIO() + with gzip.GzipFile(fileobj=buffer, mode="wb") as 
gzipped_file: + res_obj.df.to_csv(gzipped_file, index=False) + + # Reset buffer position + buffer.seek(0) + + # Create a Django File object with a uuid filename + django_file = File(buffer, name=f"{uuid.uuid4()}.csv.gz") + + # Create a mock request with only a user attribute + # Assuming you have the user_id available + mock_request = SimpleNamespace(user=user) + + upload_data = { + "binding": binding_record.id, + "promoter": promoter_record.id, + "fileformat": fileformat_record.id, + "file": django_file, + } + if res_obj.background_id: + upload_data["background"] = res_obj.background_id + + serializer = PromoterSetSigSerializer( + data=upload_data, + context={"request": mock_request}, + ) + + if serializer.is_valid(): + promoter_set_sig = serializer.save() + output_list.append(promoter_set_sig.id) + else: + logger.error(f"promoterSetSig Serializer is invalid: {serializer.errors}") + + return output_list
diff --git a/yeastregulatorydb/regulatory_data/tasks/rank_response_task.py b/yeastregulatorydb/regulatory_data/tasks/rank_response_task.py
new file mode 100644
index 0000000..cc60d7a
--- /dev/null
+++ b/yeastregulatorydb/regulatory_data/tasks/rank_response_task.py
@@ -0,0 +1,225 @@
+import gzip
+import io
+import logging
+import tempfile
+import uuid
+
+from callingcardstools.Analysis.yeast import rank_response
+from django.contrib.auth import get_user_model
+from django.core.files import File
+
+from config import celery_app
+from yeastregulatorydb.regulatory_data.models import Expression, FileFormat, PromoterSetSig, RankResponse
+from yeastregulatorydb.regulatory_data.utils.extract_file_from_storage import extract_file_from_storage
+
+logger = logging.getLogger(__name__)
+
+
+def _get_fileformat(name: str) -> FileFormat:
+    """
+    Fetch a FileFormat record by its `fileformat` name
+
+    :param name: value of the `fileformat` field to look up
+    :type name: str
+
+    :return: the matching FileFormat record
+    :rtype: FileFormat
+
+    :raises ValueError: If no FileFormat record has that name
+    """
+    try:
+        return FileFormat.objects.get(fileformat=name)
+    except FileFormat.DoesNotExist as exc:
+        raise ValueError(f"FileFormat '{name}' does not exist") from exc
+
+
+def _gzipped_csv_file(df) -> File:
+    """
+    Serialize a dataframe to an in-memory gzipped csv wrapped in a Django File
+
+    :param df: object with a pandas-style `to_csv` method
+
+    :return: a File positioned at the start of the data, named `<uuid4>.csv.gz`
+    :rtype: File
+    """
+    buffer = io.BytesIO()
+    with gzip.GzipFile(fileobj=buffer, mode="wb") as gzipped_file:
+        df.to_csv(gzipped_file, index=False)
+    # Rewind so that whatever consumes the File reads from the first byte
+    buffer.seek(0)
+    return File(buffer, name=f"{uuid.uuid4()}.csv.gz")
+
+
+@celery_app.task()
+def rank_response_tasks(promotersetsig_ids: list, user_id: int, **kwargs) -> None:
+    """
+    Queue one `rank_response_task` for each PromoterSetSig id
+
+    :param promotersetsig_ids: ids of the PromoterSetSig records to process
+    :type promotersetsig_ids: list
+    :param user_id: id of the user passed on to each `rank_response_task`
+    :type user_id: int
+    :param kwargs: forwarded unchanged to each `rank_response_task`
+    """
+    for promotersetsig_id in promotersetsig_ids:
+        rank_response_task.delay(promotersetsig_id, user_id, **kwargs)
+
+
+@celery_app.task()
+def rank_response_task(
+    promotersetsig_id: int,
+    user_id: int,
+    **kwargs,
+) -> list:
+    """
+    Compare a PromoterSetSig file against Expression files and store the
+    binding/expression annotated table of each comparison as a RankResponse
+
+    :param promotersetsig_id: id of the PromoterSetSig record whose file
+        supplies the binding data
+    :type promotersetsig_id: int
+    :param user_id: id of the user to record on the RankResponse records
+    :type user_id: int
+    :param kwargs: optional settings. `expression_id` restricts the comparison
+        to that Expression record; without it every Expression record is used.
+        `expression_effect_threshold` and `expression_pvalue_threshold`
+        override the defaults stored on the expression FileFormat. `normalize`
+        (default False) and `rank_bin_size` (default 5) are passed through to
+        the rank response configuration
+
+    :return: A list of RankResponse object ids
+    :rtype: list
+
+    :raises ValueError: If the User, the PromoterSetSig or a required
+        FileFormat record does not exist, or if the merged binding/expression
+        table has missing values
+    """
+    User = get_user_model()
+    try:
+        user = User.objects.get(id=user_id)
+    except User.DoesNotExist as exc:
+        raise ValueError(f"User with id {user_id} does not exist") from exc
+
+    try:
+        promotersetsig_record = PromoterSetSig.objects.get(id=promotersetsig_id)
+    except PromoterSetSig.DoesNotExist as exc:
+        raise ValueError(f"PromoterSetSig record with id {promotersetsig_id} does not exist") from exc
+
+    # Both formats must be registered. Only the annotated table is attached to
+    # the RankResponse record; the summary format is looked up as a
+    # precondition check only
+    _get_fileformat("rank_response_summary")
+    annotated_fileformat_record = _get_fileformat("binding_expression_annotated")
+
+    # NOTE(review): the file being read is the PromoterSetSig file, so its own
+    # fileformat is used to name the columns -- confirm that PromoterSetSig
+    # exposes `fileformat` with these column attributes
+    binding_fileformat = promotersetsig_record.fileformat
+
+    results_list = []
+    with tempfile.TemporaryDirectory() as tmpdir:
+        promotersetsig_filepath = extract_file_from_storage(promotersetsig_record.file, tmpdir)
+
+        expression_objects_iterator = (
+            Expression.objects.filter(id=kwargs.get("expression_id")).iterator()
+            if "expression_id" in kwargs
+            else Expression.objects.iterator()
+        )
+
+        for expression_record in expression_objects_iterator:
+            expression_filepath = extract_file_from_storage(expression_record.file, tmpdir)
+            expression_fileformat = expression_record.source.fileformat
+
+            config_dict = {
+                "binding_data_path": promotersetsig_filepath,
+                "binding_identifier_col": binding_fileformat.feature_identifier_col,
+                "binding_effect_col": binding_fileformat.effect_col,
+                "binding_pvalue_col": binding_fileformat.pval_col,
+                "binding_source": promotersetsig_record.binding.source.name,
+                "expression_data_path": expression_filepath,
+                "expression_identifier_col": expression_fileformat.feature_identifier_col,
+                "expression_effect_col": expression_fileformat.effect_col,
+                "expression_pvalue_col": expression_fileformat.pval_col,
+                "expression_source": expression_record.source.name,
+                "expression_effect_thres": kwargs.get(
+                    "expression_effect_threshold", expression_fileformat.default_effect_threshold
+                ),
+                "expression_pvalue_thres": kwargs.get(
+                    "expression_pvalue_threshold", expression_fileformat.default_pvalue_threshold
+                ),
+                "normalize": kwargs.get("normalize", False),
+                "rank_bin_size": kwargs.get("rank_bin_size", 5),
+            }
+
+            # validate the configuration key/value pairs
+            args = rank_response.validate_config(config_dict)
+
+            # read in the binding data
+            try:
+                binding_data = rank_response.read_in_data(
+                    args["binding_data_path"],
+                    args["binding_identifier_col"],
+                    args["binding_effect_col"],
+                    args["binding_pvalue_col"],
+                    args["binding_source"],
+                    "binding",
+                )
+            except (KeyError, FileExistsError, AttributeError) as exc:
+                logger.error("Error reading in binding data: %s", exc)
+                raise
+
+            # read in the expression data
+            try:
+                expression_data = rank_response.read_in_data(
+                    args["expression_data_path"],
+                    args["expression_identifier_col"],
+                    args["expression_effect_col"],
+                    args["expression_pvalue_col"],
+                    args["expression_source"],
+                    "expression",
+                )
+            except (KeyError, FileExistsError, AttributeError) as exc:
+                logger.error("Error reading in expression data: %s", exc)
+                raise
+
+            df = expression_data.merge(
+                binding_data[["binding_effect", "binding_pvalue", "binding_source", "feature"]],
+                how="inner",
+                on="feature",
+            )
+            # test that there are no incomplete cases. raise an error if there are
+            if df.isnull().values.any():
+                error_message = "There are incomplete cases in the data"
+                logger.error(error_message)
+                raise ValueError(error_message)
+
+            try:
+                # only the annotated table is stored; the other two return values are unused
+                binding_expr_annotated_df, _, _ = rank_response.rank_response_ratio_summarize(
+                    df,
+                    effect_expression_thres=args["expression_effect_thres"],
+                    p_expression_thres=args["expression_pvalue_thres"],
+                    normalize=args["normalize"],
+                    bin_size=args["rank_bin_size"],
+                )
+            except KeyError as exc:
+                logger.error("Error summarizing data: %s", exc)
+                raise
+
+            results_list.append((expression_record, binding_expr_annotated_df))
+
+    # Records are written only after every comparison has succeeded
+    output_list = []
+    for expression_record, binding_expr_annotated_df in results_list:
+        # NOTE(review): keyword names are taken from the original call --
+        # confirm them against the RankResponse model
+        rankresponse_record = RankResponse.objects.create(
+            user=user,
+            promotersetsig=promotersetsig_record,
+            expression=expression_record,
+            fileformat=annotated_fileformat_record,
+            file=_gzipped_csv_file(binding_expr_annotated_df),
+        )
+        output_list.append(rankresponse_record.id)
+
+    return output_list
diff --git a/yeastregulatorydb/regulatory_data/tests/test_data/binding/chipexo/28366_yiming_promoter_sig.csv.gz b/yeastregulatorydb/regulatory_data/tests/test_data/binding/chipexo/28366_yiming_promoter_sig.csv.gz new file mode 100644 index 0000000000000000000000000000000000000000..cfafe7b8160926f0555276b1c50890212e35b6aa GIT binary patch literal 221 zcmV<303!b%iwFpf8-8T~12Q->Ha1^*X>Dn4XJ2q~Z*6aMWpZD0X=g5Db9Mk-%LKip~SNA=?ly-%REKe?2X{`CxY%VVdhWLpSVUsC{SOu`cg+ zIv?9xS1%Lv_0b-7(4U9)cCS|#!0xtyDFmT~Y-ofH5RfmWKtm#)b9DGlF~w*Q8cNUr zS{lVt%w8fo`j%N12|I4_Rj3~OGhURsXqgctFR9yO$%*7xwRM*YllGjpLB^$y< XNw3Y|RS}xxLn-bF0W+}XGXVeqyL@DS literal 0 HcmV?d00001 diff --git a/yeastregulatorydb/regulatory_data/tests/test_tasks.py b/yeastregulatorydb/regulatory_data/tests/test_tasks.py index ba9f33b..25c91e4 100644 --- a/yeastregulatorydb/regulatory_data/tests/test_tasks.py +++ b/yeastregulatorydb/regulatory_data/tests/test_tasks.py @@ -2,15 +2,14 @@ import pytest from celery.result import EagerResult +from django.conf import settings from django.core.files.uploadedfile import SimpleUploadedFile from django.db.models.query import QuerySet from rest_framework.test import APIRequestFactory from yeastregulatorydb.regulatory_data.api.serializers import BindingSerializer, PromoterSetSerializer from yeastregulatorydb.regulatory_data.models import DataSource, Regulator -from yeastregulatorydb.regulatory_data.tasks.chipexo_pugh_allevents_promoter_sig import ( - chipexo_pugh_allevents_promoter_sig, -) +from yeastregulatorydb.regulatory_data.tasks.promoter_significance_task import promoter_significance_task from yeastregulatorydb.regulatory_data.tests.factories import BindingFactory, PromoterSetFactory from yeastregulatorydb.regulatory_data.tests.utils.model_to_dict_select import model_to_dict_select from yeastregulatorydb.users.models import User @@ -19,10 
+18,10 @@ @pytest.mark.djanbo_db -def test_chipexo_pugh_allevents_promoter_sig( +def test_promoter_significance_task( settings, chrmap: QuerySet, fileformat: QuerySet, chipexo_datasource: DataSource, regulator: Regulator, user: User ): - """test chipexo_pugh_allevents_promoter_sig task""" + """test promoter_significance_task task""" # Create a request object and set the user factory = APIRequestFactory() request = factory.get("/") @@ -58,6 +57,55 @@ def test_chipexo_pugh_allevents_promoter_sig( assert serializer.is_valid() == True, serializer.errors instance = serializer.save() settings.CELERY_TASK_ALWAYS_EAGER = True - task_result = chipexo_pugh_allevents_promoter_sig.delay(instance.id, request.user.id) + task_result = promoter_significance_task.delay( + instance.id, request.user.id, settings.CHIPEXO_PROMOTER_SIG_FORMAT + ) assert isinstance(task_result, EagerResult) assert isinstance(task_result.result, list) + + +# @pytest.mark.djanbo_db +# def test_rank_response_task( +# settings, chrmap: QuerySet, fileformat: QuerySet, chipexo_datasource: DataSource, regulator: Regulator, user: User +# ): +# """test promoter_significance_task task""" +# # Create a request object and set the user +# factory = APIRequestFactory() +# request = factory.get("/") +# request.user = user + +# # create the promoter set record +# promoterset_path = os.path.join(os.path.dirname(__file__), "test_data", "yiming_promoters_chrI.bed.gz") +# assert os.path.exists(promoterset_path), f"path: {promoterset_path}" + +# # Open the file and read its content +# with open(promoterset_path, "rb") as file_obj: +# file_content = file_obj.read() +# # Create a SimpleUploadedFile instance +# upload_file = SimpleUploadedFile("yiming_promoters_chrI.bed.gz", file_content, content_type="application/gzip") +# data = model_to_dict_select(PromoterSetFactory.build(name="yiming", file=upload_file)) +# serializer = PromoterSetSerializer(data=data, context={"request": request}) +# assert serializer.is_valid() == True, 
serializer.errors +# serializer.save() + +# # create the chipexo Binding record +# file_path = os.path.join(os.path.dirname(__file__), "test_data", "binding/chipexo/28366_chrI.csv.gz") +# assert os.path.exists(file_path), f"path: {file_path}" + +# # Open the file and read its content +# with open(file_path, "rb") as file_obj: +# file_content = file_obj.read() +# # Create a SimpleUploadedFile instance +# upload_file = SimpleUploadedFile("28366_chrI.csv.gz", file_content, content_type="application/gzip") +# data = model_to_dict_select( +# BindingFactory.build(source=chipexo_datasource, regulator=regulator, file=upload_file) +# ) +# serializer = BindingSerializer(data=data, context={"request": request}) +# assert serializer.is_valid() == True, serializer.errors +# instance = serializer.save() +# settings.CELERY_TASK_ALWAYS_EAGER = True +# task_result = promoter_significance_task.delay( +# instance.id, request.user.id, settings.CHIPEXO_PROMOTER_SIG_FORMAT +# ) +# assert isinstance(task_result, EagerResult) +# assert isinstance(task_result.result, list)