Skip to content

Commit

Permalink
allow raw data to expire
Browse files Browse the repository at this point in the history
- add `RawDatum.expiration_date` model field (with index)
- add "expiration_date" query param to `/trove/ingest`
- update digestive tract to appropriately handle expired data
- add periodic task to expel expired data daily
  • Loading branch information
aaxelb committed Oct 14, 2024
1 parent d696fdf commit 9679110
Show file tree
Hide file tree
Showing 13 changed files with 442 additions and 15 deletions.
1 change: 1 addition & 0 deletions how-to/use-the-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ query params:
- `record_identifier` (required): a source-specific identifier for the metadata record (no format restrictions) -- sending another record with the same `record_identifier` is considered a full update (only the most recent is used)
- `nonurgent`: if present (regardless of value), ingestion may be given a lower priority -- recommended for bulk or background operations
- `is_supplementary`: if present (regardless of value), this record's metadata will be added to all pre-existing index-cards from the same user with the same `focus_iri` (if any), but will not get an index-card of its own nor affect the last-updated timestamp (e.g. in OAI-PMH) of the index-cards it supplements
- `expiration_date`: optional date (in format `YYYY-MM-DD`) on which the record expires -- from that date on, the record is no longer valid and will be removed

## Deleting index-cards

Expand Down
4 changes: 4 additions & 0 deletions project/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,10 @@ def split(string, delim):
'task': 'share.tasks.harvest',
'schedule': 120,
},
'Expel expired data': {
'task': 'trove.digestive_tract.task__expel_expired_data',
'schedule': crontab(hour=0, minute=0), # every day at midnight UTC
},
}

if not DEBUG:
Expand Down
18 changes: 18 additions & 0 deletions share/migrations/0075_rawdatum_expiration_date.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.2.25 on 2024-10-14 15:52

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the nullable `expiration_date` date field to the `rawdatum` model."""

    dependencies = [('share', '0074_sourceuniqueidentifier_is_supplementary')]

    operations = [
        migrations.AddField(
            model_name='rawdatum',
            name='expiration_date',
            field=models.DateField(
                null=True,
                help_text='An (optional) date after which this datum is no longer valid.',
            ),
        ),
    ]
17 changes: 17 additions & 0 deletions share/migrations/0076_rawdatum_share_rawdatum_expiration_idx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from django.db import migrations, models
from django.contrib.postgres.operations import AddIndexConcurrently


class Migration(migrations.Migration):
    """Add an index on `rawdatum.expiration_date`, built concurrently."""

    # a concurrent index build cannot run inside a transaction, so this
    # migration opts out of atomicity (the build then avoids locking the table)
    atomic = False

    dependencies = [('share', '0075_rawdatum_expiration_date')]

    operations = [
        AddIndexConcurrently(
            model_name='rawdatum',
            index=models.Index(
                name='share_rawdatum_expiration_idx',
                fields=['expiration_date'],
            ),
        ),
    ]
26 changes: 24 additions & 2 deletions share/models/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,14 +355,23 @@ def store_data(self, config, fetch_result):

return rd

def store_datum_for_suid(self, *, suid, datum: str, mediatype, datestamp: datetime.datetime):
def store_datum_for_suid(
self,
*,
suid,
datum: str,
mediatype: str | None, # `None` indicates sharev2-legacy ingestion
datestamp: datetime.datetime,
expiration_date: datetime.date | None = None,
):
_raw, _raw_created = self.get_or_create(
suid=suid,
sha256=hashlib.sha256(datum.encode()).hexdigest(),
defaults={
'datum': datum,
'mediatype': mediatype,
'datestamp': datestamp,
'expiration_date': expiration_date,
},
)
if not _raw_created:
Expand All @@ -371,10 +380,11 @@ def store_datum_for_suid(self, *, suid, datum: str, mediatype, datestamp: dateti
logger.critical(_msg)
sentry_sdk.capture_message(_msg)
_raw.mediatype = mediatype
_raw.expiration_date = expiration_date
# keep the latest datestamp
if (not _raw.datestamp) or (datestamp > _raw.datestamp):
_raw.datestamp = datestamp
_raw.save(update_fields=('mediatype', 'datestamp'))
_raw.save(update_fields=('mediatype', 'datestamp', 'expiration_date'))
return _raw

def latest_by_suid_id(self, suid_id) -> models.QuerySet:
Expand Down Expand Up @@ -420,6 +430,10 @@ class RawDatum(models.Model):
'This may be, but is not limited to, a deletion, modification, publication, or creation datestamp. '
'Ideally, this datetime should be appropriate for determining the chronological order its data will be applied.'
))
expiration_date = models.DateField(
null=True,
help_text='An (optional) date after which this datum is no longer valid.',
)

date_modified = models.DateTimeField(auto_now=True, editable=False)
date_created = models.DateTimeField(auto_now_add=True, editable=False)
Expand Down Expand Up @@ -447,11 +461,19 @@ def is_latest(self):
.exists()
)

@property
def is_expired(self) -> bool:
    """Whether this datum's expiration date has been reached.

    A datum without an `expiration_date` never expires. Otherwise it counts
    as expired from its expiration date onward (that date included), judged
    against `datetime.date.today()`.
    """
    if self.expiration_date is None:
        return False
    # expired once today has reached the expiration date; a date that is
    # still in the future means the datum remains valid for now
    return self.expiration_date <= datetime.date.today()

class Meta:
    # model options: one row per (suid, sha256) pair, plus single-column indexes
    verbose_name_plural = 'Raw Data'
    unique_together = ('suid', 'sha256')
    indexes = [
        models.Index(
            fields=['no_output'],
            name='share_rawda_no_outp_f0330f_idx',
        ),
        models.Index(
            fields=['expiration_date'],
            name='share_rawdatum_expiration_idx',
        ),
    ]

class JSONAPIMeta(BaseJSONAPIMeta):
Expand Down
207 changes: 207 additions & 0 deletions tests/trove/digestive_tract/test_expel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import datetime
from unittest import mock

from django.test import TestCase
from primitive_metadata import primitive_rdf as rdf

from share import models as share_db
from tests import factories
from trove import digestive_tract
from trove import models as trove_db


_BLARG = rdf.IriNamespace('https://blarg.example/')


class TestDigestiveTractExpel(TestCase):
    """Tests for expelling records (directly, or because they expired).

    Fixture: two ingested records (indexcard_1, indexcard_2), plus one
    supplementary record attached to indexcard_1.
    """

    @classmethod
    def setUpTestData(cls):
        cls.focus_1 = _BLARG.this1
        cls.focus_2 = _BLARG.this2
        cls.raw_1, cls.indexcard_1 = _setup_ingested(cls.focus_1)
        cls.raw_2, cls.indexcard_2 = _setup_ingested(cls.focus_2)
        cls.raw_supp = _setup_supplementary(
            cls.focus_1,
            cls.raw_1.suid,
            cls.indexcard_1,
        )

    def setUp(self):
        super().setUp()
        # record index notifications instead of sending them
        self.notified_indexcard_ids = set()
        _notify_patch = mock.patch(
            'share.search.index_messenger.IndexMessenger.notify_indexcard_update',
            new=self._replacement_notify_indexcard_update,
        )
        self.enterContext(_notify_patch)
        # replace the derive task with a mock so enqueueing can be asserted on
        _derive_patch = mock.patch('trove.digestive_tract.task__derive')
        self.mock_derive_task = self.enterContext(_derive_patch)

    def _replacement_notify_indexcard_update(self, indexcards, **kwargs):
        # stand-in for `IndexMessenger.notify_indexcard_update`: remember the ids
        for _indexcard in indexcards:
            self.notified_indexcard_ids.add(_indexcard.id)

    def enterContext(self, context_manager):
        # `TestCase.enterContext` only exists from python3.11 on -- provide it here until then
        _entered = context_manager.__enter__()

        def _leave_context():
            context_manager.__exit__(None, None, None)

        self.addCleanup(_leave_context)
        return _entered

    def _refresh_indexcards(self):
        # reload both indexcards so assertions see what is in the database
        for _indexcard in (self.indexcard_1, self.indexcard_2):
            _indexcard.refresh_from_db()

    def _assert_suids_and_raw_data_kept(self):
        # expelling never removes suids or raw data -- all three of each remain
        self.assertEqual(share_db.SourceUniqueIdentifier.objects.count(), 3)
        self.assertEqual(share_db.RawDatum.objects.count(), 3)

    def _assert_related_counts(self, *, archived, supplementary, derived):
        # each argument is a pair: (expected count for indexcard_1, for indexcard_2)
        _card_1, _card_2 = self.indexcard_1, self.indexcard_2
        self.assertEqual(_card_1.archived_rdf_set.count(), archived[0])
        self.assertEqual(_card_2.archived_rdf_set.count(), archived[1])
        self.assertEqual(_card_1.supplementary_rdf_set.count(), supplementary[0])
        self.assertEqual(_card_2.supplementary_rdf_set.count(), supplementary[1])
        self.assertEqual(_card_1.derived_indexcard_set.count(), derived[0])
        self.assertEqual(_card_2.derived_indexcard_set.count(), derived[1])

    def test_setup(self):
        """The fixture starts with both cards live and nothing notified or enqueued."""
        self._refresh_indexcards()
        self.assertIsNone(self.indexcard_1.deleted)
        self.assertIsNone(self.indexcard_2.deleted)
        self._assert_suids_and_raw_data_kept()
        self.assertIsNotNone(self.indexcard_1.latest_rdf)
        self.assertIsNotNone(self.indexcard_2.latest_rdf)
        self._assert_related_counts(
            archived=(1, 1),
            supplementary=(1, 0),
            derived=(1, 1),
        )
        # no index notification, no re-derive enqueued
        self.assertEqual(self.notified_indexcard_ids, set())
        self.mock_derive_task.delay.assert_not_called()

    def test_expel(self):
        """`expel` hands the suid matching (user, record_identifier) to `expel_suid`."""
        _suid = self.raw_1.suid
        with mock.patch('trove.digestive_tract.expel_suid') as _expel_suid_mock:
            _owner = _suid.source_config.source.user
            digestive_tract.expel(
                from_user=_owner,
                record_identifier=_suid.identifier,
            )
            _expel_suid_mock.assert_called_once_with(self.raw_1.suid)

    def test_expel_suid(self):
        """Expelling a main suid marks only its own indexcard deleted."""
        digestive_tract.expel_suid(self.raw_1.suid)
        self._refresh_indexcards()
        self.assertIsNotNone(self.indexcard_1.deleted)
        self.assertIsNone(self.indexcard_2.deleted)
        self._assert_suids_and_raw_data_kept()
        with self.assertRaises(trove_db.LatestIndexcardRdf.DoesNotExist):
            self.indexcard_1.latest_rdf  # latest rdf is gone
        self.assertIsNotNone(self.indexcard_2.latest_rdf)
        self._assert_related_counts(
            archived=(1, 1),  # archive is kept for the expelled card
            supplementary=(1, 0),  # supplement is kept for the expelled card
            derived=(0, 1),  # derived cards are removed for the expelled card
        )
        # the index hears about the expelled card; no re-derive is enqueued
        self.assertEqual(self.notified_indexcard_ids, {self.indexcard_1.id})
        self.mock_derive_task.delay.assert_not_called()

    def test_expel_supplementary_suid(self):
        """Expelling a supplementary suid drops the supplement and re-derives its card."""
        digestive_tract.expel_suid(self.raw_supp.suid)
        self._refresh_indexcards()
        self.assertIsNone(self.indexcard_1.deleted)
        self.assertIsNone(self.indexcard_2.deleted)
        self._assert_suids_and_raw_data_kept()
        self.assertIsNotNone(self.indexcard_1.latest_rdf)
        self.assertIsNotNone(self.indexcard_2.latest_rdf)
        self._assert_related_counts(
            archived=(1, 1),
            supplementary=(0, 0),  # indexcard_1 lost its supplement
            derived=(1, 1),
        )
        # no index notification here; a re-derive is enqueued instead
        self.assertEqual(self.notified_indexcard_ids, set())
        self.mock_derive_task.delay.assert_called_once_with(self.indexcard_1.id)

    def test_expel_expired_task(self):
        """The periodic task calls `expel_expired_data` with today's date."""
        _patch_target = 'trove.digestive_tract.expel_expired_data'
        with mock.patch(_patch_target) as _expel_expired_mock:
            digestive_tract.task__expel_expired_data.apply()
            _expel_expired_mock.assert_called_once_with(datetime.date.today())

    def test_expel_expired(self):
        """A main record that expires today is expelled; the other record is untouched."""
        _expiry = datetime.date.today()
        self.raw_2.expiration_date = _expiry
        self.raw_2.save()
        digestive_tract.expel_expired_data(_expiry)
        self._refresh_indexcards()
        self.assertIsNone(self.indexcard_1.deleted)
        self.assertIsNotNone(self.indexcard_2.deleted)  # marked deleted
        self._assert_suids_and_raw_data_kept()
        self.assertIsNotNone(self.indexcard_1.latest_rdf)
        with self.assertRaises(trove_db.LatestIndexcardRdf.DoesNotExist):
            self.indexcard_2.latest_rdf  # latest rdf is gone
        self._assert_related_counts(
            archived=(1, 1),  # archive is kept for the expelled card
            supplementary=(1, 0),  # indexcard_2 has no supplement in this fixture
            derived=(1, 0),  # derived cards are removed for the expelled card
        )
        # the index hears about the expelled card; no re-derive is enqueued
        self.assertEqual(self.notified_indexcard_ids, {self.indexcard_2.id})
        self.mock_derive_task.delay.assert_not_called()

    def test_expel_expired_supplement(self):
        """A supplement that expires today is dropped and its card re-derived."""
        _expiry = datetime.date.today()
        self.raw_supp.expiration_date = _expiry
        self.raw_supp.save()
        digestive_tract.expel_expired_data(_expiry)
        self._refresh_indexcards()
        self.assertIsNone(self.indexcard_1.deleted)
        self.assertIsNone(self.indexcard_2.deleted)
        self._assert_suids_and_raw_data_kept()
        self.assertIsNotNone(self.indexcard_1.latest_rdf)
        self.assertIsNotNone(self.indexcard_2.latest_rdf)
        self._assert_related_counts(
            archived=(1, 1),
            supplementary=(0, 0),  # indexcard_1 lost its supplement
            derived=(1, 1),
        )
        # no index notification here; a re-derive is enqueued instead
        self.assertEqual(self.notified_indexcard_ids, set())
        self.mock_derive_task.delay.assert_called_once_with(self.indexcard_1.id)


def _setup_ingested(focus_iri: str):
    """Build the database rows for one ingested record about `focus_iri`.

    Creates a suid and raw datum, an indexcard with latest and archived rdf,
    and one derived indexcard; returns the pair `(raw_datum, indexcard)`.
    """
    _identifier_for_iri = trove_db.ResourceIdentifier.objects.get_or_create_for_iri
    _focus_identifier = _identifier_for_iri(focus_iri)
    _raw = factories.RawDatumFactory(
        suid=factories.SourceUniqueIdentifierFactory(
            focus_identifier=_focus_identifier,
        ),
    )
    _indexcard = trove_db.Indexcard.objects.create(source_record_suid=_raw.suid)
    _indexcard.focus_identifier_set.add(_focus_identifier)
    # the latest and archived rdf rows share everything, including the turtle text
    _rdf_fields = {
        'indexcard': _indexcard,
        'from_raw_datum': _raw,
        'focus_iri': focus_iri,
    }
    _latest = trove_db.LatestIndexcardRdf.objects.create(
        rdf_as_turtle='...',
        **_rdf_fields,
    )
    trove_db.ArchivedIndexcardRdf.objects.create(
        rdf_as_turtle=_latest.rdf_as_turtle,
        **_rdf_fields,
    )
    trove_db.DerivedIndexcard.objects.create(
        upriver_indexcard=_indexcard,
        deriver_identifier=_identifier_for_iri(_BLARG.deriver),
        derived_checksum_iri='...',
        derived_text='...',
    )
    return _raw, _indexcard


def _setup_supplementary(focus_iri, main_suid, indexcard):
    """Attach a supplementary record to `indexcard`; return its raw datum.

    The supplementary suid reuses `main_suid`'s focus identifier and source
    config, and is flagged `is_supplementary`.
    """
    _suid = factories.SourceUniqueIdentifierFactory(
        focus_identifier=main_suid.focus_identifier,
        source_config=main_suid.source_config,
        is_supplementary=True,
    )
    _raw = factories.RawDatumFactory(suid=_suid)
    _supplement_fields = {
        'indexcard': indexcard,
        'from_raw_datum': _raw,
        'supplementary_suid': _suid,
        'focus_iri': focus_iri,
        'rdf_as_turtle': '...',
    }
    trove_db.SupplementaryIndexcardRdf.objects.create(**_supplement_fields)
    return _raw
12 changes: 12 additions & 0 deletions tests/trove/digestive_tract/test_extract.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import datetime
from django.test import TestCase
from primitive_metadata import primitive_rdf as rdf

from tests import factories
from trove import digestive_tract
from trove import exceptions as trove_exceptions
from trove import models as trove_db


Expand Down Expand Up @@ -128,3 +130,13 @@ def test_extract_empty_supplementary(self):
(_indexcard,) = digestive_tract.extract(_empty_raw)
self.assertEqual(_indexcard.id, _orig_indexcard.id)
self.assertFalse(_orig_indexcard.supplementary_rdf_set.exists())

def test_extract_expired(self):
    """Extracting a datum whose expiration date is today raises."""
    _raw = self.raw
    _raw.expiration_date = datetime.date.today()
    self.assertRaises(
        trove_exceptions.CannotDigestExpiredDatum,
        digestive_tract.extract,
        _raw,
    )

def test_extract_expired_supplement(self):
    """Extracting a supplementary datum whose expiration date is today raises."""
    _supplement = self.supplementary_raw
    _supplement.expiration_date = datetime.date.today()
    self.assertRaises(
        trove_exceptions.CannotDigestExpiredDatum,
        digestive_tract.extract,
        _supplement,
    )
Loading

0 comments on commit 9679110

Please sign in to comment.