diff --git a/src/component.py b/src/component.py index d8eaea8..cec51c7 100644 --- a/src/component.py +++ b/src/component.py @@ -6,10 +6,14 @@ import logging import os import csv +import json +from collections import OrderedDict from keboola.component.base import ComponentBase, sync_action from keboola.component.sync_actions import SelectElement from keboola.component.exceptions import UserException +from keboola.component.dao import ColumnDefinition, BaseType + from configuration import Configuration from kafka.client import KafkaConsumer @@ -20,8 +24,8 @@ # global constants RESULT_PK = ['topic', 'timestamp_type', 'timestamp', 'partition', 'offset', 'key'] -RESULT_COLS = ['topic', 'timestamp_type', 'timestamp', 'partition', 'offset', 'key', - 'value'] +RESULT_COLS = ['topic', 'timestamp_type', 'timestamp', 'partition', 'offset', 'key', 'value'] +RESULT_COLS_DTYPES = ['string', 'string', 'timestamp', 'int', 'int', 'string', 'string'] class Component(ComponentBase): @@ -59,22 +63,35 @@ def run(self, debug=False): logging.info("Extracting data from topics {0}".format(self.params.topics)) for topic in self.params.topics: - msg_cnt, res_file_folder = self.consume_topic(topic) - self.topics[topic] = {'msg_cnt': msg_cnt, 'res_file_folder': res_file_folder} + msg_cnt, res_file_folder, schema = self.consume_topic(topic) + self.topics[topic] = {'msg_cnt': msg_cnt, 'res_file_folder': res_file_folder, 'schema': schema} # Store previous offsets and columns state_dict = {"prev_offsets": self.latest_offsets, "columns": self.columns} self.write_state_file(state_dict) logging.info("Offset file stored.") + self.produce_manifest() logging.info("Extraction finished.") + def produce_manifest(self): for topic, consumed in self.topics.items(): + + schema = OrderedDict() + for col, dtype in zip(RESULT_COLS, RESULT_COLS_DTYPES): + schema[col] = ColumnDefinition(data_types=self.convert_dtypes(dtype)) + + if consumed.get('schema'): + del schema['value'] + + for col in consumed.get('schema'): + schema[col.get('name')] = ColumnDefinition(data_types=self.convert_dtypes(col.get('type'))) + # Produce final sliced table manifest if consumed['msg_cnt'] > 0: logging.info(F'Fetched {consumed['msg_cnt']} messages from topic - {topic}') out_table = self.create_out_table_definition(consumed['res_file_folder'], is_sliced=True, - primary_key=RESULT_PK, schema=self.columns[topic], + primary_key=RESULT_PK, schema=schema, incremental=True) self.write_manifest(out_table) @@ -85,19 +102,12 @@ def consume_topic(self, topic): self.columns.setdefault(topic, RESULT_COLS) - deserializer = None - if self.params.deserialize == 'avro': - if self.params.schema_registry_url: - config = self.params.schema_registry_extra_params - config['url'] = self.params.schema_registry_url - schema_registry_client = SchemaRegistryClient(config) - deserializer = AvroDeserializer(schema_registry_client) - elif self.params.schema_str: - deserializer = AvroDeserializer(self.params.schema_str) - else: - raise ValueError("Schema Registry URL or schema string must be provided for Avro deserialization.") + deserializer = self.get_deserializer() + res_file_folder = os.path.join(self.tables_out_path, topic) msg_cnt = 0 + last_message = None + dtypes = [] for msg in self.client.consume_message_batch(topic): if msg is None: break @@ -126,6 +136,7 @@ def consume_topic(self, topic): if self.params.flatten_message_value_columns: self.safe_update(extracted_data, value) self.columns[topic] = list(extracted_data.keys()) + last_message = msg.value() # to get dtypes else: extracted_data['value'] = value @@ -147,7 +158,54 @@ def consume_topic(self, topic): self.latest_offsets[msg.topic()]['p' + str(msg.partition())] = msg.offset() - return msg_cnt, res_file_folder + if self.params.deserialize == 'avro' and self.params.flatten_message_value_columns: + dtypes = self.get_topic_dtypes(last_message) + + return msg_cnt, res_file_folder, dtypes + + def get_deserializer(self): + deserializer = None + if self.params.deserialize == 'avro': + if self.params.schema_registry_url: + config = self.params.schema_registry_extra_params + config['url'] = self.params.schema_registry_url + schema_registry_client = SchemaRegistryClient(config) + deserializer = AvroDeserializer(schema_registry_client) + elif self.params.schema_str: + deserializer = AvroDeserializer(self.params.schema_str) + else: + raise ValueError("Schema Registry URL or schema string must be provided for Avro deserialization.") + return deserializer + + def get_topic_dtypes(self, message_value: str): + schema = None + if self.params.deserialize == 'avro': + if self.params.schema_registry_url: + config = self.params.schema_registry_extra_params + config['url'] = self.params.schema_registry_url + schema_registry_client = SchemaRegistryClient(config) + schema_id = int.from_bytes(message_value[1:5]) + schema = json.loads(schema_registry_client.get_schema(schema_id).schema_str).get('fields') + + elif self.params.schema_str: + schema = json.loads(self.params.schema_str).get('fields') + + return schema + + def convert_dtypes(self, dtype: str = 'string'): + match dtype: + case 'boolean': + base_type = BaseType.boolean() + case 'int': + base_type = BaseType.integer() + case 'float': + base_type = BaseType.float() + case 'double': + base_type = BaseType.float() + case _: + base_type = BaseType.string() + + return base_type def _init_client(self, debug, params, prev_offsets, servers): c = KafkaConsumer(servers=servers, diff --git a/tests/functional/01_avro-value/expected/data/out/tables/avro-value.manifest b/tests/functional/01_avro-value/expected/data/out/tables/avro-value.manifest index fec8088..c478ea9 100644 --- a/tests/functional/01_avro-value/expected/data/out/tables/avro-value.manifest +++ b/tests/functional/01_avro-value/expected/data/out/tables/avro-value.manifest @@ -1 +1 @@ -{"incremental": true, "write_always": false, "delimiter": ",", "enclosure": "\"", "manifest_type": "out", "has_header": false, "schema": [{"name": "topic", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp_type", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "partition", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "offset", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "key", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "value", "data_type": {"base": {"type": "STRING"}}, "nullable": true}]} \ No newline at end of file +{"incremental": true, "write_always": false, "delimiter": ",", "enclosure": "\"", "manifest_type": "out", "has_header": false, "schema": [{"name": "topic", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp_type", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "partition", "data_type": {"base": {"type": "INTEGER"}}, "nullable": true, "primary_key": true}, {"name": "offset", "data_type": {"base": {"type": "INTEGER"}}, "nullable": true, "primary_key": true}, {"name": "key", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "value", "data_type": {"base": {"type": "STRING"}}, "nullable": true}]} \ No newline at end of file diff --git a/tests/functional/02_text-message/expected/data/out/tables/text-message.manifest b/tests/functional/02_text-message/expected/data/out/tables/text-message.manifest index fec8088..c478ea9 100644 --- a/tests/functional/02_text-message/expected/data/out/tables/text-message.manifest +++ b/tests/functional/02_text-message/expected/data/out/tables/text-message.manifest @@ -1 +1 @@ -{"incremental": true, "write_always": false, "delimiter": ",", "enclosure": "\"", "manifest_type": "out", "has_header": false, "schema": [{"name": "topic", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp_type", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "partition", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "offset", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "key", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "value", "data_type": {"base": {"type": "STRING"}}, "nullable": true}]} \ No newline at end of file +{"incremental": true, "write_always": false, "delimiter": ",", "enclosure": "\"", "manifest_type": "out", "has_header": false, "schema": [{"name": "topic", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp_type", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "partition", "data_type": {"base": {"type": "INTEGER"}}, "nullable": true, "primary_key": true}, {"name": "offset", "data_type": {"base": {"type": "INTEGER"}}, "nullable": true, "primary_key": true}, {"name": "key", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "value", "data_type": {"base": {"type": "STRING"}}, "nullable": true}]} \ No newline at end of file diff --git a/tests/functional/_03_avro-columns/expected/data/out/files/.gitkeep b/tests/functional/03_avro-columns/expected/data/out/files/.gitkeep similarity index 100% rename from tests/functional/_03_avro-columns/expected/data/out/files/.gitkeep rename to tests/functional/03_avro-columns/expected/data/out/files/.gitkeep diff --git a/tests/functional/03_avro-columns/expected/data/out/tables/avro-columns.manifest b/tests/functional/03_avro-columns/expected/data/out/tables/avro-columns.manifest new file mode 100644 index 0000000..a062bb0 --- /dev/null +++ b/tests/functional/03_avro-columns/expected/data/out/tables/avro-columns.manifest @@ -0,0 +1 @@ +{"incremental": true, "write_always": false, "delimiter": ",", "enclosure": "\"", "manifest_type": "out", "has_header": false, "schema": [{"name": "topic", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp_type", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "partition", "data_type": {"base": {"type": "INTEGER"}}, "nullable": true, "primary_key": true}, {"name": "offset", "data_type": {"base": {"type": "INTEGER"}}, "nullable": true, "primary_key": true}, {"name": "key", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "col_boolean", "data_type": {"base": {"type": "BOOLEAN"}}, "nullable": true}, {"name": "col_int", "data_type": {"base": {"type": "INTEGER"}}, "nullable": true}, {"name": "col_long", "data_type": {"base": {"type": "STRING"}}, "nullable": true}, {"name": "col_float", "data_type": {"base": {"type": "FLOAT"}}, "nullable": true}, {"name": "col_double", "data_type": {"base": {"type": "FLOAT"}}, "nullable": true}, {"name": "col_bytes", "data_type": {"base": {"type": "STRING"}}, "nullable": true}, {"name": "col_string", "data_type": {"base": {"type": "STRING"}}, "nullable": true}]} \ No newline at end of file diff --git a/tests/functional/03_avro-columns/expected/data/out/tables/avro-columns/p0-0.csv b/tests/functional/03_avro-columns/expected/data/out/tables/avro-columns/p0-0.csv new file mode 100644 index 0000000..34e54df --- /dev/null +++ b/tests/functional/03_avro-columns/expected/data/out/tables/avro-columns/p0-0.csv @@ -0,0 +1,3 @@ +avro-columns,1,1732104020556,0,0,b'key3',True,3,1234567890,123.44999694824219,123456.7891234568,b'test',Test message +avro-columns,1,1732104020556,0,1,b'key3',True,7,1234567890,123.44999694824219,123456.7891234568,b'test',Test message +avro-columns,1,1732104020556,0,2,b'key3',True,11,1234567890,123.44999694824219,123456.7891234568,b'test',Test message diff --git a/tests/functional/03_avro-columns/expected/data/out/tables/avro-columns/p1-0.csv b/tests/functional/03_avro-columns/expected/data/out/tables/avro-columns/p1-0.csv new file mode 100644 index 0000000..a174904 --- /dev/null +++ b/tests/functional/03_avro-columns/expected/data/out/tables/avro-columns/p1-0.csv @@ -0,0 +1,8 @@ +avro-columns,1,1732104020556,1,0,b'key1',True,1,1234567890,123.44999694824219,123456.7891234568,b'test',Test message +avro-columns,1,1732104020556,1,1,b'key2',True,2,1234567890,123.44999694824219,123456.7891234568,b'test',Test message +avro-columns,1,1732104020556,1,2,b'key1',True,5,1234567890,123.44999694824219,123456.7891234568,b'test',Test message +avro-columns,1,1732104020556,1,3,b'key2',True,6,1234567890,123.44999694824219,123456.7891234568,b'test',Test message +avro-columns,1,1732104020556,1,4,b'key1',True,9,1234567890,123.44999694824219,123456.7891234568,b'test',Test message +avro-columns,1,1732104020556,1,5,b'key2',True,10,1234567890,123.44999694824219,123456.7891234568,b'test',Test message +avro-columns,1,1732104020556,1,6,b'key1',True,13,1234567890,123.44999694824219,123456.7891234568,b'test',Test message +avro-columns,1,1732104020556,1,7,b'key2',True,14,1234567890,123.44999694824219,123456.7891234568,b'test',Test message diff --git a/tests/functional/03_avro-columns/expected/data/out/tables/avro-columns/p2-0.csv b/tests/functional/03_avro-columns/expected/data/out/tables/avro-columns/p2-0.csv new file mode 100644 index 0000000..c039c6b --- /dev/null +++ b/tests/functional/03_avro-columns/expected/data/out/tables/avro-columns/p2-0.csv @@ -0,0 +1,4 @@ +avro-columns,1,1732104020556,2,0,b'key0',True,0,1234567890,123.44999694824219,123456.7891234568,b'test',Test message +avro-columns,1,1732104020556,2,1,b'key0',True,4,1234567890,123.44999694824219,123456.7891234568,b'test',Test message +avro-columns,1,1732104020556,2,2,b'key0',True,8,1234567890,123.44999694824219,123456.7891234568,b'test',Test message +avro-columns,1,1732104020556,2,3,b'key0',True,12,1234567890,123.44999694824219,123456.7891234568,b'test',Test message diff --git a/tests/functional/_03_avro-columns/source/data/config.json b/tests/functional/03_avro-columns/source/data/config.json similarity index 100% rename from tests/functional/_03_avro-columns/source/data/config.json rename to tests/functional/03_avro-columns/source/data/config.json diff --git a/tests/functional/_03_avro-columns/source/data/in/.gitkeep b/tests/functional/03_avro-columns/source/data/in/.gitkeep similarity index 100% rename from tests/functional/_03_avro-columns/source/data/in/.gitkeep rename to tests/functional/03_avro-columns/source/data/in/.gitkeep diff --git a/tests/functional/_03_avro-columns/source/data/in/files/.gitkeep b/tests/functional/03_avro-columns/source/data/in/files/.gitkeep similarity index 100% rename from tests/functional/_03_avro-columns/source/data/in/files/.gitkeep rename to tests/functional/03_avro-columns/source/data/in/files/.gitkeep diff --git a/tests/functional/_03_avro-columns/source/data/in/tables/.gitkeep b/tests/functional/03_avro-columns/source/data/in/tables/.gitkeep similarity index 100% rename from tests/functional/_03_avro-columns/source/data/in/tables/.gitkeep rename to tests/functional/03_avro-columns/source/data/in/tables/.gitkeep diff --git a/tests/functional/_03_avro-columns/source/data/out/files/.gitkeep b/tests/functional/03_avro-columns/source/data/out/files/.gitkeep similarity index 100% rename from tests/functional/_03_avro-columns/source/data/out/files/.gitkeep rename to tests/functional/03_avro-columns/source/data/out/files/.gitkeep diff --git a/tests/functional/_03_avro-columns/source/data/out/tables/.gitkeep b/tests/functional/03_avro-columns/source/data/out/tables/.gitkeep similarity index 100% rename from tests/functional/_03_avro-columns/source/data/out/tables/.gitkeep rename to tests/functional/03_avro-columns/source/data/out/tables/.gitkeep diff --git a/tests/functional/_03_avro-columns/source/set_up.py b/tests/functional/03_avro-columns/source/set_up.py similarity index 70% rename from tests/functional/_03_avro-columns/source/set_up.py rename to tests/functional/03_avro-columns/source/set_up.py index efa67a0..718abeb 100644 --- a/tests/functional/_03_avro-columns/source/set_up.py +++ b/tests/functional/03_avro-columns/source/set_up.py @@ -32,16 +32,21 @@ def run(context: TestDataDir): "type": "record", "name": "User", "fields": [ - {"name": "id", "type": "int"}, - {"name": "key", "type": "string"}, - {"name": "text", "type": "string"} + {"name": "col_boolean", "type": "boolean"}, + {"name": "col_int", "type": "int"}, + {"name": "col_long", "type": "long"}, + {"name": "col_float", "type": "float"}, + {"name": "col_double", "type": "double"}, + {"name": "col_bytes", "type": "bytes"}, + {"name": "col_string", "type": "string"} ] }''' avro_serializer = AvroSerializer(schema_reg_client, schema_str) value = avro_serializer( - {"id": i, "key": f"{name}-111", "text": f"Test message {i}"}, + {"col_boolean": True, "col_int": i, "col_long": 1234567890, "col_float": 123.45, + "col_double": 123456.7891234568, "col_bytes": b"test", "col_string": "Test message"}, SerializationContext(name, MessageField.VALUE) ) diff --git a/tests/functional/_03_avro-columns/expected/data/out/tables/avro-columns.manifest b/tests/functional/_03_avro-columns/expected/data/out/tables/avro-columns.manifest deleted file mode 100644 index 190a220..0000000 --- a/tests/functional/_03_avro-columns/expected/data/out/tables/avro-columns.manifest +++ /dev/null @@ -1 +0,0 @@ -{"incremental": true, "write_always": false, "delimiter": ",", "enclosure": "\"", "manifest_type": "out", "has_header": false, "schema": [{"name": "topic", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp_type", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "partition", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "offset", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "key", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "id", "data_type": {"base": {"type": "STRING"}}, "nullable": true}, {"name": "value_key", "data_type": {"base": {"type": "STRING"}}, "nullable": true}, {"name": "text", "data_type": {"base": {"type": "STRING"}}, "nullable": true}]} \ No newline at end of file diff --git a/tests/functional/_03_avro-columns/expected/data/out/tables/avro-columns/p0-0.csv b/tests/functional/_03_avro-columns/expected/data/out/tables/avro-columns/p0-0.csv deleted file mode 100644 index ef982cc..0000000 --- a/tests/functional/_03_avro-columns/expected/data/out/tables/avro-columns/p0-0.csv +++ /dev/null @@ -1,3 +0,0 @@ -avro-columns,1,1732104020556,0,0,b'key3',3,avro-columns-111,Test message 3 -avro-columns,1,1732104020556,0,1,b'key3',7,avro-columns-111,Test message 7 -avro-columns,1,1732104020556,0,2,b'key3',11,avro-columns-111,Test message 11 diff --git a/tests/functional/_03_avro-columns/expected/data/out/tables/avro-columns/p1-0.csv b/tests/functional/_03_avro-columns/expected/data/out/tables/avro-columns/p1-0.csv deleted file mode 100644 index e0f8010..0000000 --- a/tests/functional/_03_avro-columns/expected/data/out/tables/avro-columns/p1-0.csv +++ /dev/null @@ -1,8 +0,0 @@ -avro-columns,1,1732104020556,1,0,b'key1',1,avro-columns-111,Test message 1 -avro-columns,1,1732104020556,1,1,b'key2',2,avro-columns-111,Test message 2 -avro-columns,1,1732104020556,1,2,b'key1',5,avro-columns-111,Test message 5 -avro-columns,1,1732104020556,1,3,b'key2',6,avro-columns-111,Test message 6 -avro-columns,1,1732104020556,1,4,b'key1',9,avro-columns-111,Test message 9 -avro-columns,1,1732104020556,1,5,b'key2',10,avro-columns-111,Test message 10 -avro-columns,1,1732104020556,1,6,b'key1',13,avro-columns-111,Test message 13 -avro-columns,1,1732104020556,1,7,b'key2',14,avro-columns-111,Test message 14 diff --git a/tests/functional/_03_avro-columns/expected/data/out/tables/avro-columns/p2-0.csv b/tests/functional/_03_avro-columns/expected/data/out/tables/avro-columns/p2-0.csv deleted file mode 100644 index e5c349a..0000000 --- a/tests/functional/_03_avro-columns/expected/data/out/tables/avro-columns/p2-0.csv +++ /dev/null @@ -1,4 +0,0 @@ -avro-columns,1,1732104020556,2,0,b'key0',0,avro-columns-111,Test message 0 -avro-columns,1,1732104020556,2,1,b'key0',4,avro-columns-111,Test message 4 -avro-columns,1,1732104020556,2,2,b'key0',8,avro-columns-111,Test message 8 -avro-columns,1,1732104020556,2,3,b'key0',12,avro-columns-111,Test message 12