Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Merge pull request #74 from alercebroker/feature/pre_produce_commit
Browse files Browse the repository at this point in the history
feat: now pre_produce commits
  • Loading branch information
pgallardor authored Jul 21, 2023
2 parents e1abda8 + 031c836 commit 3892bab
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 9 deletions.
2 changes: 0 additions & 2 deletions apf/consumers/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,7 @@ def _deserialize_message(self, message):


class KafkaSchemalessConsumer(KafkaConsumer):

def __init__(self, config: dict):

schema_path = config.get("SCHEMA_PATH")
if schema_path:
self.schema = fastavro.schema.load_schema(schema_path)
Expand Down
4 changes: 2 additions & 2 deletions apf/core/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,6 @@ def _post_execute(self, result: Union[Iterable[Dict[str, Any]], Dict[str, Any]])
self.logger.debug("Error at post_execute")
self.logger.debug(f"The result that caused the error: {result}")
raise error
if self.commit:
self.consumer.commit()
self.metrics["timestamp_sent"] = datetime.datetime.now(datetime.timezone.utc)
time_difference = (
self.metrics["timestamp_sent"] - self.metrics["timestamp_received"]
Expand Down Expand Up @@ -266,6 +264,8 @@ def _pre_produce(
self.logger.debug("Error at pre_produce")
self.logger.debug(f"The result that caused the error: {result}")
raise error
if self.commit:
self.consumer.commit()
return message_to_produce

def pre_produce(self, result: Union[Iterable[Dict[str, Any]], Dict[str, Any]]):
Expand Down
2 changes: 1 addition & 1 deletion apf/metrics/pyroscope/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .profile import profile
from .profile import profile
8 changes: 5 additions & 3 deletions apf/metrics/pyroscope/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@
import os
import pyroscope


def profile(func):
""" Creates a Pyroscope context for the function to execute """
"""Creates a Pyroscope context for the function to execute"""

@functools.wraps(func)
def pyroscope_context(*args, **kwargs):
if bool(os.getenv("USE_PROFILING")):
with pyroscope.tag_wrapper({ "function": func.__name__ }):
with pyroscope.tag_wrapper({"function": func.__name__}):
func(*args, **kwargs)

else:
func(*args, **kwargs)

return pyroscope_context
return pyroscope_context
2 changes: 1 addition & 1 deletion apf/producers/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ def __del__(self):
self.logger.info("Waiting to produce last messages")
self.producer.flush()

class KafkaSchemalessProducer(KafkaProducer):

class KafkaSchemalessProducer(KafkaProducer):
def _serialize_message(self, message):
out = io.BytesIO()
fastavro.schemaless_writer(out, self.schema, message, strict=True)
Expand Down

0 comments on commit 3892bab

Please sign in to comment.