From c602cb82a0e35fac1509bd40fa3edd7f62fd5cf2 Mon Sep 17 00:00:00 2001
From: Quentin
Date: Mon, 9 Sep 2024 17:51:19 +0200
Subject: [PATCH] Implement CSVSource (#490)

Implement CSVSource

A base CSV source that reads data from a single CSV file.
---
 quixstreams/models/topics/topic.py            |  2 +-
 quixstreams/sinks/csv.py                      |  4 +-
 quixstreams/sources/__init__.py               |  1 +
 quixstreams/sources/csv.py                    | 82 +++++++++++++++++
 .../test_models/test_topics/test_topics.py    |  5 ++
 .../test_quixstreams/test_sources/test_csv.py | 87 +++++++++++++++++++
 6 files changed, 178 insertions(+), 3 deletions(-)
 create mode 100644 quixstreams/sources/csv.py
 create mode 100644 tests/test_quixstreams/test_sources/test_csv.py

diff --git a/quixstreams/models/topics/topic.py b/quixstreams/models/topics/topic.py
index c19475c04..cf7a12d86 100644
--- a/quixstreams/models/topics/topic.py
+++ b/quixstreams/models/topics/topic.py
@@ -313,4 +313,4 @@ def deserialize(self, message: ConfluentKafkaMessageProto):
         )
 
     def __repr__(self):
-        return f'<{self.__class__.__name__} name="{self._name}"> '
+        return f'<{self.__class__.__name__} name="{self.name}">'
diff --git a/quixstreams/sinks/csv.py b/quixstreams/sinks/csv.py
index ad60c1092..c1bac1981 100644
--- a/quixstreams/sinks/csv.py
+++ b/quixstreams/sinks/csv.py
@@ -27,9 +27,9 @@ def __init__(
             See the ["csv" module docs](https://docs.python.org/3/library/csv.html#csv-fmt-params) for more info.
             Default - `"excel"`.
         :param key_serializer: a callable to convert keys to strings.
-            Default - `str()`.
+            Default - `str`.
         :param value_serializer: a callable to convert values to strings.
-            Default - `json.dumps()`.
+            Default - `json.dumps`.
         """
         super().__init__()
         self.path = path
diff --git a/quixstreams/sources/__init__.py b/quixstreams/sources/__init__.py
index 16ff03a97..9389e4dc2 100644
--- a/quixstreams/sources/__init__.py
+++ b/quixstreams/sources/__init__.py
@@ -1,3 +1,4 @@
 from .base import *
 from .manager import SourceException
 from .multiprocessing import multiprocessing
+from .csv import CSVSource
diff --git a/quixstreams/sources/csv.py b/quixstreams/sources/csv.py
new file mode 100644
index 000000000..ba337f6b7
--- /dev/null
+++ b/quixstreams/sources/csv.py
@@ -0,0 +1,82 @@
+import csv
+import json
+
+from typing import Optional, Callable, Any
+
+from quixstreams.models.topics import Topic
+
+from .base import Source
+
+
+class CSVSource(Source):
+    def __init__(
+        self,
+        path: str,
+        dialect: str = "excel",
+        name: Optional[str] = None,
+        shutdown_timeout: float = 10,
+        key_deserializer: Callable[[Any], str] = str,
+        value_deserializer: Callable[[Any], str] = json.loads,
+    ) -> None:
+        """
+        A base CSV source that reads data from a single CSV file.
+        Best used with :class:`quixstreams.sinks.csv.CSVSink`.
+
+        Required columns: key, value
+        Optional columns: timestamp
+
+        :param path: path to the CSV file
+        :param dialect: a CSV dialect to use. It affects quoting and delimiters.
+            See the ["csv" module docs](https://docs.python.org/3/library/csv.html#csv-fmt-params) for more info.
+            Default - `"excel"`.
+        :param key_deserializer: a callable to convert strings to keys.
+            Default - `str`.
+        :param value_deserializer: a callable to convert strings to values.
+            Default - `json.loads`.
+        """
+        super().__init__(name or path, shutdown_timeout)
+        self.path = path
+        self.dialect = dialect
+
+        self._key_deserializer = key_deserializer
+        self._value_deserializer = value_deserializer
+
+    def run(self):
+        key_deserializer = self._key_deserializer
+        value_deserializer = self._value_deserializer
+
+        with open(self.path, "r") as f:
+            reader = csv.DictReader(f, dialect=self.dialect)
+
+            while self.running:
+                try:
+                    item = next(reader)
+                except StopIteration:
+                    return
+
+                # a "timestamp" column with an empty value reads as ""; treat it as None
+                timestamp = item.get("timestamp") or None
+                if timestamp is not None:
+                    timestamp = int(timestamp)
+
+                msg = self.serialize(
+                    key=key_deserializer(item["key"]),
+                    value=value_deserializer(item["value"]),
+                    timestamp_ms=timestamp,
+                )
+
+                self.produce(
+                    key=msg.key,
+                    value=msg.value,
+                    timestamp=msg.timestamp,
+                    headers=msg.headers,
+                )
+
+    def default_topic(self) -> Topic:
+        return Topic(
+            name=self.name,
+            key_serializer="string",
+            key_deserializer="string",
+            value_deserializer="json",
+            value_serializer="json",
+        )
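
Usage example (illustrative, not part of the patch): a minimal sketch of wiring
CSVSource into an application. The broker address and "input.csv" path are
placeholders, and it assumes this quixstreams version accepts a source via
`app.dataframe(source=...)` as other Source subclasses do:

    from quixstreams import Application
    from quixstreams.sources import CSVSource

    # The CSV file needs "key" and "value" columns; "timestamp" is optional.
    app = Application(broker_address="localhost:9092")
    source = CSVSource("input.csv")

    sdf = app.dataframe(source=source)
    sdf = sdf.update(lambda value: print(value))  # inspect records as they arrive

    if __name__ == "__main__":
        # sources run in a separate process, so keep the entrypoint guarded
        app.run(sdf)
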
diff --git a/tests/test_quixstreams/test_models/test_topics/test_topics.py b/tests/test_quixstreams/test_models/test_topics/test_topics.py
index d3e165e80..a1bd7204d 100644
--- a/tests/test_quixstreams/test_models/test_topics/test_topics.py
+++ b/tests/test_quixstreams/test_models/test_topics/test_topics.py
@@ -44,6 +44,11 @@ def __call__(self, value: bytes, ctx: SerializationContext):
 
 
 class TestTopic:
+
+    def test_repr(self, topic_manager_topic_factory):
+        topic = topic_manager_topic_factory(name="foo")
+        assert str(topic) == '<Topic name="foo">'
+
     @pytest.mark.parametrize(
         "key_deserializer, value_deserializer, key, value, expected_key, expected_value",
         [
diff --git a/tests/test_quixstreams/test_sources/test_csv.py b/tests/test_quixstreams/test_sources/test_csv.py
new file mode 100644
index 000000000..b19f675f8
--- /dev/null
+++ b/tests/test_quixstreams/test_sources/test_csv.py
@@ -0,0 +1,87 @@
+import csv
+import json
+import pytest
+
+from unittest.mock import MagicMock
+
+from quixstreams.sources import CSVSource
+from quixstreams.rowproducer import RowProducer
+
+
+class TestCSVSource:
+
+    @pytest.fixture
+    def producer(self):
+        producer = MagicMock(spec=RowProducer)
+        producer.flush.return_value = 0
+        return producer
+
+    def test_read(self, tmp_path, producer):
+        path = tmp_path / "source.csv"
+        with open(path, "w") as f:
+            writer = csv.DictWriter(
+                f, dialect="excel", fieldnames=("key", "value", "timestamp")
+            )
+            writer.writeheader()
+            writer.writerows(
+                [
+                    {"key": "key1", "value": json.dumps({"value": "value1"})},
+                    {"key": "key2", "value": json.dumps({"value": "value2"})},
+                    {"key": "key3", "value": json.dumps({"value": "value3"})},
+                    {"key": "key4", "value": json.dumps({"value": "value4"})},
+                    {
+                        "key": "key5",
+                        "value": json.dumps({"value": "value5"}),
+                        "timestamp": 10000,
+                    },
+                ]
+            )
+
+        source = CSVSource(path)
+        source.configure(source.default_topic(), producer)
+        source.start()
+
+        assert producer.produce.called
+        assert producer.produce.call_count == 5
+        assert producer.produce.call_args.kwargs == {
+            "buffer_error_max_tries": 3,
+            "headers": None,
+            "key": b"key5",
+            "partition": None,
+            "poll_timeout": 5.0,
+            "timestamp": 10000,
+            "topic": path,
+            "value": b'{"value":"value5"}',
+        }
+
+    def test_read_no_timestamp(self, tmp_path, producer):
+        path = tmp_path / "source.csv"
+        with open(path, "w") as f:
+            writer = csv.DictWriter(f, dialect="excel", fieldnames=("key", "value"))
+            writer.writeheader()
+            writer.writerows(
+                [
+                    {"key": "key1", "value": json.dumps({"value": "value1"})},
+                    {"key": "key2", "value": json.dumps({"value": "value2"})},
+                    {"key": "key3", "value": json.dumps({"value": "value3"})},
+                    {"key": "key4", "value": json.dumps({"value": "value4"})},
+                    {"key": "key5", "value": json.dumps({"value": "value5"})},
+                ]
+            )
+
+        source = CSVSource(path)
+        source.configure(source.default_topic(), producer)
+        source.start()
+
+        assert producer.produce.called
+        assert producer.produce.call_count == 5
+        assert producer.produce.call_args.kwargs == {
+            "buffer_error_max_tries": 3,
+            "headers": None,
+            "key": b"key5",
+            "partition": None,
+            "poll_timeout": 5.0,
+            "timestamp": None,
+            "topic": path,
+            "value": b'{"value":"value5"}',
+        }
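
For reference, a file in the shape CSVSource expects (required "key" and
"value" columns, optional "timestamp"), mirroring what the tests above write
with csv.DictWriter; the rows are illustrative:

    key,value,timestamp
    key1,"{""value"": ""value1""}",
    key5,"{""value"": ""value5""}",10000

An empty timestamp cell is read by csv.DictReader as "", which the `or None`
guard in run() treats as a missing timestamp.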