
Commit

datastreams: add logging
jrcastro2 committed Jul 19, 2024
1 parent 7f5d38e commit 00535e0
Showing 4 changed files with 91 additions and 22 deletions.
21 changes: 19 additions & 2 deletions invenio_vocabularies/cli.py
@@ -13,6 +13,7 @@
 import click
 from flask.cli import with_appcontext
 from invenio_access.permissions import system_identity
+from invenio_logging.structlog import LoggerFactory
 from invenio_pidstore.errors import PIDDeletedError, PIDDoesNotExistError
 
 from .datastreams import DataStreamFactory
@@ -31,22 +32,38 @@ def _process_vocab(config, num_samples=None):
         transformers_config=config.get("transformers"),
         writers_config=config["writers"],
     )
 
+    cli_logger = LoggerFactory.get_logger("cli")
+    cli_logger.info("Starting processing")
     success, errored, filtered = 0, 0, 0
     left = num_samples or -1
-    for result in ds.process():
+    batch_size = config.get("batch_size", 1000)
+    write_many = config.get("write_many", False)
+
+    for result in ds.process(batch_size=batch_size, write_many=write_many):
         left = left - 1
         if result.filtered:
             filtered += 1
+            cli_logger.info("Filtered", entry=result.entry, operation=result.op_type)
         if result.errors:
             for err in result.errors:
                 click.secho(err, fg="red")
+            cli_logger.error(
+                "Error",
+                entry=result.entry,
+                operation=result.op_type,
+                errors=result.errors,
+            )
             errored += 1
         else:
             success += 1
+            cli_logger.info("Success", entry=result.entry, operation=result.op_type)
         if left == 0:
             click.secho(f"Number of samples reached {num_samples}", fg="green")
             break
+    cli_logger.info(
+        "Finished processing", success=success, errored=errored, filtered=filtered
+    )
 
     return success, errored, filtered


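The CLI change above pairs the existing click output with structlog-style keyword logging and a success/errored/filtered tally. A minimal, self-contained sketch of that tally logic follows; FakeResult and the sample data are invented stand-ins for the StreamEntry results yielded by ds.process(), and stdlib logging stands in for LoggerFactory:

# Sketch of the tally loop added to _process_vocab; FakeResult is a
# hypothetical stand-in for datastream results, and stdlib logging
# replaces invenio_logging's LoggerFactory for illustration.
import logging
from collections import namedtuple

FakeResult = namedtuple("FakeResult", ["entry", "op_type", "errors", "filtered"])

def tally(results, logger):
    success, errored, filtered = 0, 0, 0
    for result in results:
        if result.filtered:
            filtered += 1
            logger.info("Filtered %s (%s)", result.entry, result.op_type)
        if result.errors:
            errored += 1
            logger.error("Error on %s: %s", result.entry, result.errors)
        else:
            success += 1
            logger.info("Success %s (%s)", result.entry, result.op_type)
    logger.info("Finished: %d ok, %d errored, %d filtered", success, errored, filtered)
    return success, errored, filtered

results = [
    FakeResult({"id": "eng"}, "create", [], False),
    FakeResult({"id": "bad"}, "create", ["transform failed"], False),
]
print(tally(results, logging.getLogger("cli")))  # -> (1, 1, 0)

Note that, as in the diff, a filtered entry without errors is also counted as a success; only the errors field decides between the success and errored buckets.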
9 changes: 8 additions & 1 deletion invenio_vocabularies/datastreams/datastreams.py
@@ -8,6 +8,8 @@

"""Base data stream."""

from invenio_logging.structlog import LoggerFactory

from .errors import ReaderError, TransformerError, WriterError


@@ -65,16 +67,21 @@ def process_batch(self, batch, write_many=False):
         else:
             yield from (self.write(entry) for entry in transformed_entries)
 
-    def process(self, batch_size=100, write_many=False, *args, **kwargs):
+    def process(self, batch_size=100, write_many=False, logger=None, *args, **kwargs):
         """Iterates over the entries.
         Uses the reader to get the raw entries and transforms them.
         It will iterate over the `StreamEntry` objects returned by
         the reader, apply the transformations and yield the result of
         writing it.
         """
+        if not logger:
+            logger = LoggerFactory.get_logger("datastreams")
+
         batch = []
+        logger.info(
+            f"Start reading datastream with batch_size={batch_size} and write_many={write_many}"
+        )
         for stream_entry in self.read():
             batch.append(stream_entry)
             if len(batch) >= batch_size:
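The hunk is cut off before the end of process(), but the shape of the loop is clear: fill a batch, hand it to process_batch, repeat. A sketch of that read-batch-write pattern with plain functions; the flush of the final partial batch is an assumption, since the diff is truncated before it:

# Sketch of the batching loop in DataStream.process(); read() and
# process_batch() are stand-in callables, not the real methods.
def process(read, process_batch, batch_size=100, write_many=False):
    batch = []
    for stream_entry in read():
        batch.append(stream_entry)
        if len(batch) >= batch_size:
            yield from process_batch(batch, write_many=write_many)
            batch = []
    if batch:  # assumed final flush of the last partial batch
        yield from process_batch(batch, write_many=write_many)

# Five entries with batch_size=2 -> batches of 2, 2, 1.
seen = []
def fake_batch(batch, write_many=False):
    seen.append(len(batch))
    yield from batch

list(process(lambda: iter(range(5)), fake_batch, batch_size=2))
print(seen)  # [2, 2, 1]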
24 changes: 19 additions & 5 deletions invenio_vocabularies/datastreams/tasks.py
@@ -9,29 +9,43 @@
"""Data Streams Celery tasks."""

from celery import shared_task
from invenio_logging.structlog import LoggerFactory

from ..datastreams import StreamEntry
from ..datastreams.factories import WriterFactory


@shared_task(ignore_result=True)
@shared_task(ignore_result=True, logger=None)
def write_entry(writer_config, entry):
"""Write an entry.
:param writer: writer configuration as accepted by the WriterFactory.
:param entry: dictionary, StreamEntry is not serializable.
"""
if not logger:
logger = LoggerFactory.get_logger("write_entry")
writer = WriterFactory.create(config=writer_config)
writer.write(StreamEntry(entry))

stream_entry_processed = writer.write(StreamEntry(entry))
if stream_entry_processed.errors:
logger.error("Error writing entry", entry=entry, errors=stream_entry_processed.errors)
else:
logger.info("Entry written", entry=entry)

@shared_task(ignore_result=True)
def write_many_entry(writer_config, entries):
def write_many_entry(writer_config, entries, logger=None):
"""Write many entries.
:param writer: writer configuration as accepted by the WriterFactory.
:param entry: lisf ot dictionaries, StreamEntry is not serializable.
"""
if not logger:
logger = LoggerFactory.get_logger("write_many_entry")
writer = WriterFactory.create(config=writer_config)
stream_entries = [StreamEntry(entry) for entry in entries]
writer.write_many(stream_entries)
stream_entries_processed = writer.write_many(stream_entries)
errored = [entry for entry in stream_entries_processed if entry.errors]
succeeded = len(stream_entries_processed) - len(errored)
logger.info("Entries written", succeeded=succeeded)
if errored:
for entry in errored:
logger.error("Error writing entry", entry=entry.entry, errors=entry.errors)
59 changes: 45 additions & 14 deletions invenio_vocabularies/services/tasks.py
@@ -9,6 +9,7 @@

 from celery import shared_task
 from flask import current_app
+from invenio_logging.structlog import LoggerFactory
 
 from ..datastreams.factories import DataStreamFactory
 from ..factories import get_vocabulary_config
@@ -17,22 +18,52 @@
 @shared_task(ignore_result=True)
 def process_datastream(stream):
     """Process a datastream from config."""
-    vc_config = get_vocabulary_config(stream)
-    config = vc_config.get_config()
+    stream_logger = LoggerFactory.get_logger("datastreams-" + stream)
+    try:
+        stream_logger.info("Starting processing")
+        vc_config = get_vocabulary_config(stream)
+        config = vc_config.get_config()
 
-    if not config:
-        raise ValueError("Invalid stream configuration")
+        if not config:
+            stream_logger.error("Invalid stream configuration")
+            raise ValueError("Invalid stream configuration")
 
-    ds = DataStreamFactory.create(
-        readers_config=config["readers"],
-        transformers_config=config.get("transformers"),
-        writers_config=config["writers"],
-    )
-
-    for result in ds.process():
-        if result.errors:
-            for err in result.errors:
-                current_app.logger.error(err)
+        ds = DataStreamFactory.create(
+            readers_config=config["readers"],
+            transformers_config=config.get("transformers"),
+            writers_config=config["writers"],
+        )
+        stream_logger.info("Datastream created")
+        stream_logger.info("Processing Datastream")
+        success, errored, filtered = 0, 0, 0
+        for result in ds.process(
+            batch_size=config.get("batch_size", 100),
+            write_many=config.get("write_many", False),
+            logger=stream_logger,
+        ):
+            if result.filtered:
+                filtered += 1
+                stream_logger.info(
+                    "Filtered", entry=result.entry, operation=result.op_type
+                )
+            if result.errors:
+                errored += 1
+                stream_logger.error(
+                    "Error",
+                    entry=result.entry,
+                    operation=result.op_type,
+                    errors=result.errors,
+                )
+            else:
+                success += 1
+                stream_logger.info(
+                    "Success", entry=result.entry, operation=result.op_type
+                )
+        stream_logger.info(
+            "Finished processing", success=success, errored=errored, filtered=filtered
+        )
+    except Exception as e:
+        stream_logger.exception("Error processing stream", error=e)
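For completeness, a sketch of how this task would be triggered; the stream name "names" is a hypothetical example, since actual stream names come from the registered vocabulary configurations:

# Hypothetical trigger, e.g. from a CLI command or a Celery beat schedule.
from invenio_vocabularies.services.tasks import process_datastream

process_datastream.delay("names")  # logs under the "datastreams-names" logger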


 @shared_task()
