From bf9ee3cefb4c07c698079ef00ea4f285220de1c9 Mon Sep 17 00:00:00 2001 From: Pablo Panero Date: Fri, 21 Jan 2022 14:28:19 +0100 Subject: [PATCH] datastreams: implement asynchronous writer --- invenio_vocabularies/datastreams/tasks.py | 25 +++++++++++++ invenio_vocabularies/datastreams/writers.py | 19 ++++++++++ tests/datastreams/conftest.py | 11 ++++-- tests/datastreams/test_datastreams_tasks.py | 34 +++++++++++++++++ tests/datastreams/test_writers.py | 41 ++++++++++++++++++++- 5 files changed, 125 insertions(+), 5 deletions(-) create mode 100644 invenio_vocabularies/datastreams/tasks.py create mode 100644 tests/datastreams/test_datastreams_tasks.py diff --git a/invenio_vocabularies/datastreams/tasks.py b/invenio_vocabularies/datastreams/tasks.py new file mode 100644 index 00000000..9407c051 --- /dev/null +++ b/invenio_vocabularies/datastreams/tasks.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2022 CERN. +# +# Invenio-Vocabularies is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. + +"""Data Streams Celery tasks.""" + +from celery import shared_task + +from ..datastreams import StreamEntry +from ..datastreams.factories import WriterFactory + + +@shared_task(ignore_result=True) +def write_entry(writer, entry): + """Write an entry. + + :param writer: writer configuration as accepted by the WriterFactory. + :param entry: dictionary, StreamEntry is not serializable. + """ + writer = WriterFactory.create(config=writer) + writer.write(StreamEntry(entry)) diff --git a/invenio_vocabularies/datastreams/writers.py b/invenio_vocabularies/datastreams/writers.py index d33c7763..abb63dca 100644 --- a/invenio_vocabularies/datastreams/writers.py +++ b/invenio_vocabularies/datastreams/writers.py @@ -20,6 +20,7 @@ from .datastreams import StreamEntry from .errors import WriterError +from .tasks import write_entry class BaseWriter(ABC): @@ -106,3 +107,21 @@ def write(self, stream_entry, *args, **kwargs): yaml.safe_dump([stream_entry.entry], file, allow_unicode=True) return stream_entry + + +class AsyncWriter(BaseWriter): + """Writes the entries asynchronously (celery task).""" + + def __init__(self, writer, *args, **kwargs): + """Constructor. + + :param writer: writer to use. + """ + self._writer = writer + super().__init__(*args, **kwargs) + + def write(self, stream_entry, *args, **kwargs): + """Launches a celery task to write an entry.""" + write_entry.delay(self._writer, stream_entry.entry) + + return stream_entry diff --git a/tests/datastreams/conftest.py b/tests/datastreams/conftest.py index f9c01b98..9c31960c 100644 --- a/tests/datastreams/conftest.py +++ b/tests/datastreams/conftest.py @@ -18,6 +18,8 @@ import pytest +from invenio_vocabularies.config import VOCABULARIES_DATASTREAM_READERS, \ + VOCABULARIES_DATASTREAM_TRANSFORMERS, VOCABULARIES_DATASTREAM_WRITERS from invenio_vocabularies.datastreams.errors import TransformerError, WriterError from invenio_vocabularies.datastreams.readers import BaseReader, JsonReader, ZipReader from invenio_vocabularies.datastreams.transformers import BaseTransformer @@ -75,12 +77,15 @@ def write(self, stream_entry, *args, **kwargs): def app_config(app_config): """Mimic an instance's configuration.""" app_config["VOCABULARIES_DATASTREAM_READERS"] = { - "json": JsonReader, + **VOCABULARIES_DATASTREAM_READERS, "test": TestReader, - "zip": ZipReader, } - app_config["VOCABULARIES_DATASTREAM_TRANSFORMERS"] = {"test": TestTransformer} + app_config["VOCABULARIES_DATASTREAM_TRANSFORMERS"] = { + **VOCABULARIES_DATASTREAM_TRANSFORMERS, + "test": TestTransformer + } app_config["VOCABULARIES_DATASTREAM_WRITERS"] = { + **VOCABULARIES_DATASTREAM_WRITERS, "test": TestWriter, "fail": FailingTestWriter, } diff --git a/tests/datastreams/test_datastreams_tasks.py b/tests/datastreams/test_datastreams_tasks.py new file mode 100644 index 00000000..d907422f --- /dev/null +++ b/tests/datastreams/test_datastreams_tasks.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2021-2022 CERN. +# +# Invenio-Vocabularies is free software; you can redistribute it and/or +# modify it under the terms of the MIT License; see LICENSE file for more +# details. + +"""Data Streams tasks tests.""" + +from pathlib import Path + +import yaml + +from invenio_vocabularies.datastreams import StreamEntry +from invenio_vocabularies.datastreams.tasks import write_entry + + +def test_write_entry(app): + filepath = 'writer_test.yaml' + yaml_writer_config = { + "type": "yaml", + "args": { + "filepath": filepath + } + } + entry = {"key_one": [{"inner_one": 1}]} + write_entry(yaml_writer_config, entry) + + filepath = Path(filepath) + with open(filepath) as file: + assert yaml.safe_load(file) == [entry] + + filepath.unlink() diff --git a/tests/datastreams/test_writers.py b/tests/datastreams/test_writers.py index ec9cafc8..d25d2c2f 100644 --- a/tests/datastreams/test_writers.py +++ b/tests/datastreams/test_writers.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright (C) 2021 CERN. +# Copyright (C) 2021-2022 CERN. # # Invenio-Vocabularies is free software; you can redistribute it and/or # modify it under the terms of the MIT License; see LICENSE file for more @@ -16,7 +16,12 @@ from invenio_vocabularies.datastreams import StreamEntry from invenio_vocabularies.datastreams.errors import WriterError -from invenio_vocabularies.datastreams.writers import ServiceWriter, YamlWriter +from invenio_vocabularies.datastreams.writers import AsyncWriter, \ + ServiceWriter, YamlWriter + +## +# Service Writer +## def test_service_writer_non_existing(lang_type, lang_data, service, identity): @@ -67,6 +72,10 @@ def test_service_writer_update_non_existing(lang_type, lang_data, service, ident assert dict(record, **updated_lang) == record +## +# YAML Writer +## + def test_yaml_writer(): filepath = Path("writer_test.yaml") @@ -80,3 +89,31 @@ def test_yaml_writer(): assert yaml.safe_load(file) == test_output filepath.unlink() + +## +# Async Writer +## + + +def test_async_writer(app): + filepath = 'writer_test.yaml' + yaml_writer_config = { + "type": "yaml", + "args": { + "filepath": filepath + } + } + async_writer = AsyncWriter(yaml_writer_config) + + test_output = [ + {"key_one": [{"inner_one": 1}]}, + {"key_two": [{"inner_two": "two"}]} + ] + for output in test_output: + async_writer.write(stream_entry=StreamEntry(output)) + + filepath = Path(filepath) + with open(filepath) as file: + assert yaml.safe_load(file) == test_output + + filepath.unlink()