From 031c83663030484a1e974593ec40b593c023f5c3 Mon Sep 17 00:00:00 2001 From: Pedro Gallardo Date: Fri, 21 Jul 2023 10:52:10 -0400 Subject: [PATCH] feat: now pre_produce commits --- apf/consumers/kafka.py | 2 -- apf/core/step.py | 4 ++-- apf/metrics/pyroscope/__init__.py | 2 +- apf/metrics/pyroscope/profile.py | 8 +++++--- apf/producers/kafka.py | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/apf/consumers/kafka.py b/apf/consumers/kafka.py index 2b0beec..9d00aca 100644 --- a/apf/consumers/kafka.py +++ b/apf/consumers/kafka.py @@ -272,9 +272,7 @@ def _deserialize_message(self, message): class KafkaSchemalessConsumer(KafkaConsumer): - def __init__(self, config: dict): - schema_path = config.get("SCHEMA_PATH") if schema_path: self.schema = fastavro.schema.load_schema(schema_path) diff --git a/apf/core/step.py b/apf/core/step.py index 29eaff3..68193f3 100644 --- a/apf/core/step.py +++ b/apf/core/step.py @@ -228,8 +228,6 @@ def _post_execute(self, result: Union[Iterable[Dict[str, Any]], Dict[str, Any]]) self.logger.debug("Error at post_execute") self.logger.debug(f"The result that caused the error: {result}") raise error - if self.commit: - self.consumer.commit() self.metrics["timestamp_sent"] = datetime.datetime.now(datetime.timezone.utc) time_difference = ( self.metrics["timestamp_sent"] - self.metrics["timestamp_received"] @@ -266,6 +264,8 @@ def _pre_produce( self.logger.debug("Error at pre_produce") self.logger.debug(f"The result that caused the error: {result}") raise error + if self.commit: + self.consumer.commit() return message_to_produce def pre_produce(self, result: Union[Iterable[Dict[str, Any]], Dict[str, Any]]): diff --git a/apf/metrics/pyroscope/__init__.py b/apf/metrics/pyroscope/__init__.py index a25bbfc..c0c090a 100644 --- a/apf/metrics/pyroscope/__init__.py +++ b/apf/metrics/pyroscope/__init__.py @@ -1 +1 @@ -from .profile import profile \ No newline at end of file +from .profile import profile diff --git a/apf/metrics/pyroscope/profile.py b/apf/metrics/pyroscope/profile.py index 27ef7f1..fb3e338 100644 --- a/apf/metrics/pyroscope/profile.py +++ b/apf/metrics/pyroscope/profile.py @@ -2,15 +2,17 @@ import os import pyroscope + def profile(func): - """ Creates a Pyroscope context for the function to execute """ + """Creates a Pyroscope context for the function to execute""" + @functools.wraps(func) def pyroscope_context(*args, **kwargs): if bool(os.getenv("USE_PROFILING")): - with pyroscope.tag_wrapper({ "function": func.__name__ }): + with pyroscope.tag_wrapper({"function": func.__name__}): func(*args, **kwargs) else: func(*args, **kwargs) - return pyroscope_context \ No newline at end of file + return pyroscope_context diff --git a/apf/producers/kafka.py b/apf/producers/kafka.py index 5f42cf9..f570784 100644 --- a/apf/producers/kafka.py +++ b/apf/producers/kafka.py @@ -165,8 +165,8 @@ def __del__(self): self.logger.info("Waiting to produce last messages") self.producer.flush() -class KafkaSchemalessProducer(KafkaProducer): +class KafkaSchemalessProducer(KafkaProducer): def _serialize_message(self, message): out = io.BytesIO() fastavro.schemaless_writer(out, self.schema, message, strict=True)