Skip to content

Commit

Permalink
datastreams: implement asynchronous writer
Browse files Browse the repository at this point in the history
  • Loading branch information
Pablo Panero authored and slint committed Jul 16, 2024
1 parent 4d6f563 commit bf9ee3c
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 5 deletions.
25 changes: 25 additions & 0 deletions invenio_vocabularies/datastreams/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Vocabularies is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Data Streams Celery tasks."""

from celery import shared_task

from ..datastreams import StreamEntry
from ..datastreams.factories import WriterFactory


@shared_task(ignore_result=True)
def write_entry(writer, entry):
    """Write a single entry with a writer built from its configuration.

    :param writer: writer configuration as accepted by the WriterFactory.
    :param entry: plain dictionary with the entry data; a dictionary is
        passed because StreamEntry is not serializable.
    """
    # Build the concrete writer from its configuration, then wrap the raw
    # dictionary back into a StreamEntry before handing it over.
    do_write = WriterFactory.create(config=writer).write
    do_write(StreamEntry(entry))
19 changes: 19 additions & 0 deletions invenio_vocabularies/datastreams/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from .datastreams import StreamEntry
from .errors import WriterError
from .tasks import write_entry


class BaseWriter(ABC):
Expand Down Expand Up @@ -106,3 +107,21 @@ def write(self, stream_entry, *args, **kwargs):
yaml.safe_dump([stream_entry.entry], file, allow_unicode=True)

return stream_entry


class AsyncWriter(BaseWriter):
    """Writer that delegates every write to a Celery task."""

    def __init__(self, writer, *args, **kwargs):
        """Keep the delegate writer and initialise the base writer.

        :param writer: writer to use; it is forwarded unchanged to the
            ``write_entry`` task, which expects a writer configuration.
        """
        self._writer = writer
        super().__init__(*args, **kwargs)

    def write(self, stream_entry, *args, **kwargs):
        """Queue a Celery task that writes the entry and return the entry."""
        # Only the raw ``entry`` payload is sent to the task, not the
        # StreamEntry object itself.
        enqueue = write_entry.delay
        enqueue(self._writer, stream_entry.entry)
        return stream_entry
11 changes: 8 additions & 3 deletions tests/datastreams/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import pytest

from invenio_vocabularies.config import VOCABULARIES_DATASTREAM_READERS, \
VOCABULARIES_DATASTREAM_TRANSFORMERS, VOCABULARIES_DATASTREAM_WRITERS
from invenio_vocabularies.datastreams.errors import TransformerError, WriterError
from invenio_vocabularies.datastreams.readers import BaseReader, JsonReader, ZipReader
from invenio_vocabularies.datastreams.transformers import BaseTransformer
Expand Down Expand Up @@ -75,12 +77,15 @@ def write(self, stream_entry, *args, **kwargs):
def app_config(app_config):
"""Mimic an instance's configuration."""
app_config["VOCABULARIES_DATASTREAM_READERS"] = {
"json": JsonReader,
**VOCABULARIES_DATASTREAM_READERS,
"test": TestReader,
"zip": ZipReader,
}
app_config["VOCABULARIES_DATASTREAM_TRANSFORMERS"] = {"test": TestTransformer}
app_config["VOCABULARIES_DATASTREAM_TRANSFORMERS"] = {
**VOCABULARIES_DATASTREAM_TRANSFORMERS,
"test": TestTransformer
}
app_config["VOCABULARIES_DATASTREAM_WRITERS"] = {
**VOCABULARIES_DATASTREAM_WRITERS,
"test": TestWriter,
"fail": FailingTestWriter,
}
Expand Down
34 changes: 34 additions & 0 deletions tests/datastreams/test_datastreams_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2021-2022 CERN.
#
# Invenio-Vocabularies is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Data Streams tasks tests."""

from pathlib import Path

import yaml

from invenio_vocabularies.datastreams import StreamEntry
from invenio_vocabularies.datastreams.tasks import write_entry


def test_write_entry(app):
    """Check that the ``write_entry`` task writes the entry to a YAML file.

    The output file is removed in a ``finally`` clause so that a failing
    write or assertion does not leave ``writer_test.yaml`` behind for other
    tests that use the same file name.
    """
    filepath = Path("writer_test.yaml")
    yaml_writer_config = {
        "type": "yaml",
        "args": {"filepath": str(filepath)},
    }
    entry = {"key_one": [{"inner_one": 1}]}

    try:
        # Called directly (not via ``.delay``), so it runs synchronously.
        write_entry(yaml_writer_config, entry)

        with open(filepath) as file:
            assert yaml.safe_load(file) == [entry]
    finally:
        # Always clean up, even when the task or the assertion fails.
        if filepath.exists():
            filepath.unlink()
41 changes: 39 additions & 2 deletions tests/datastreams/test_writers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2021 CERN.
# Copyright (C) 2021-2022 CERN.
#
# Invenio-Vocabularies is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
Expand All @@ -16,7 +16,12 @@

from invenio_vocabularies.datastreams import StreamEntry
from invenio_vocabularies.datastreams.errors import WriterError
from invenio_vocabularies.datastreams.writers import ServiceWriter, YamlWriter
from invenio_vocabularies.datastreams.writers import AsyncWriter, \
ServiceWriter, YamlWriter

##
# Service Writer
##


def test_service_writer_non_existing(lang_type, lang_data, service, identity):
Expand Down Expand Up @@ -67,6 +72,10 @@ def test_service_writer_update_non_existing(lang_type, lang_data, service, ident

assert dict(record, **updated_lang) == record

##
# YAML Writer
##


def test_yaml_writer():
filepath = Path("writer_test.yaml")
Expand All @@ -80,3 +89,31 @@ def test_yaml_writer():
assert yaml.safe_load(file) == test_output

filepath.unlink()

##
# Async Writer
##


def test_async_writer(app):
    """Check that ``AsyncWriter`` writes every entry through the YAML writer.

    The output file is removed in a ``finally`` clause so that a failing
    write or assertion does not leave ``writer_test.yaml`` behind for other
    tests that use the same file name.
    """
    filepath = Path("writer_test.yaml")
    yaml_writer_config = {
        "type": "yaml",
        "args": {"filepath": str(filepath)},
    }
    async_writer = AsyncWriter(yaml_writer_config)

    test_output = [
        {"key_one": [{"inner_one": 1}]},
        {"key_two": [{"inner_two": "two"}]},
    ]

    try:
        # NOTE(review): reading the file right after ``write`` presumably
        # relies on Celery running tasks eagerly in the test app - confirm.
        for output in test_output:
            async_writer.write(stream_entry=StreamEntry(output))

        with open(filepath) as file:
            assert yaml.safe_load(file) == test_output
    finally:
        # Always clean up, even when a task or the assertion fails.
        if filepath.exists():
            filepath.unlink()

0 comments on commit bf9ee3c

Please sign in to comment.