Skip to content

Commit

Permalink
native types support
Browse files Browse the repository at this point in the history
  • Loading branch information
kudj committed Nov 28, 2024
1 parent 67462dc commit ee19d02
Show file tree
Hide file tree
Showing 19 changed files with 102 additions and 39 deletions.
92 changes: 75 additions & 17 deletions src/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
import logging
import os
import csv
import json
from collections import OrderedDict

from keboola.component.base import ComponentBase, sync_action
from keboola.component.sync_actions import SelectElement
from keboola.component.exceptions import UserException
from keboola.component.dao import ColumnDefinition, BaseType

from configuration import Configuration

from kafka.client import KafkaConsumer
Expand All @@ -20,8 +24,8 @@

# global constants
RESULT_PK = ['topic', 'timestamp_type', 'timestamp', 'partition', 'offset', 'key']
RESULT_COLS = ['topic', 'timestamp_type', 'timestamp', 'partition', 'offset', 'key',
'value']
RESULT_COLS = ['topic', 'timestamp_type', 'timestamp', 'partition', 'offset', 'key', 'value']
RESULT_COLS_DTYPES = ['string', 'string', 'timestamp', 'int', 'int', 'string', 'string']


class Component(ComponentBase):
Expand Down Expand Up @@ -59,22 +63,35 @@ def run(self, debug=False):
logging.info("Extracting data from topics {0}".format(self.params.topics))

for topic in self.params.topics:
msg_cnt, res_file_folder = self.consume_topic(topic)
self.topics[topic] = {'msg_cnt': msg_cnt, 'res_file_folder': res_file_folder}
msg_cnt, res_file_folder, schema = self.consume_topic(topic)
self.topics[topic] = {'msg_cnt': msg_cnt, 'res_file_folder': res_file_folder, 'schema': schema}

# Store previous offsets and columns
state_dict = {"prev_offsets": self.latest_offsets, "columns": self.columns}
self.write_state_file(state_dict)
logging.info("Offset file stored.")

self.produce_manifest()
logging.info("Extraction finished.")

def produce_manifest(self):
    """Write a sliced-table manifest for every consumed topic.

    Builds the column schema from the static result columns
    (RESULT_COLS / RESULT_COLS_DTYPES). When a per-topic Avro schema was
    captured during consumption, the raw 'value' column is replaced by one
    column per flattened Avro field, each typed via convert_dtypes().
    Manifests are only written for topics that yielded at least one message.
    """
    for topic, consumed in self.topics.items():
        schema = OrderedDict(
            (col, ColumnDefinition(data_types=self.convert_dtypes(dtype)))
            for col, dtype in zip(RESULT_COLS, RESULT_COLS_DTYPES)
        )

        topic_schema = consumed.get('schema')
        if topic_schema:
            # Flattened Avro value fields replace the raw 'value' column.
            del schema['value']
            for col in topic_schema:
                schema[col.get('name')] = ColumnDefinition(data_types=self.convert_dtypes(col.get('type')))

        # Produce final sliced table manifest
        if consumed['msg_cnt'] > 0:
            # Lazy %-args avoid re-nesting same-type quotes in an f-string,
            # which is a SyntaxError before Python 3.12 (PEP 701).
            logging.info('Fetched %s messages from topic - %s', consumed['msg_cnt'], topic)
            out_table = self.create_out_table_definition(consumed['res_file_folder'], is_sliced=True,
                                                         primary_key=RESULT_PK, schema=schema,
                                                         incremental=True)

            self.write_manifest(out_table)
Expand All @@ -85,19 +102,12 @@ def consume_topic(self, topic):

self.columns.setdefault(topic, RESULT_COLS)

deserializer = None
if self.params.deserialize == 'avro':
if self.params.schema_registry_url:
config = self.params.schema_registry_extra_params
config['url'] = self.params.schema_registry_url
schema_registry_client = SchemaRegistryClient(config)
deserializer = AvroDeserializer(schema_registry_client)
elif self.params.schema_str:
deserializer = AvroDeserializer(self.params.schema_str)
else:
raise ValueError("Schema Registry URL or schema string must be provided for Avro deserialization.")
deserializer = self.get_deserializer()

res_file_folder = os.path.join(self.tables_out_path, topic)
msg_cnt = 0
last_message = None
dtypes = []
for msg in self.client.consume_message_batch(topic):
if msg is None:
break
Expand Down Expand Up @@ -126,6 +136,7 @@ def consume_topic(self, topic):
if self.params.flatten_message_value_columns:
self.safe_update(extracted_data, value)
self.columns[topic] = list(extracted_data.keys())
last_message = msg.value() # to get dtypes
else:
extracted_data['value'] = value

Expand All @@ -147,7 +158,54 @@ def consume_topic(self, topic):

self.latest_offsets[msg.topic()]['p' + str(msg.partition())] = msg.offset()

return msg_cnt, res_file_folder
if self.params.deserialize == 'avro' and self.params.flatten_message_value_columns:
dtypes = self.get_topic_dtypes(last_message)

return msg_cnt, res_file_folder, dtypes

def get_deserializer(self):
    """Return the configured message-value deserializer.

    Returns:
        An AvroDeserializer when 'avro' deserialization is configured
        (backed by a Schema Registry client when a registry URL is set,
        otherwise by the inline schema string); None for any other
        deserialization mode.

    Raises:
        ValueError: when 'avro' is selected but neither a Schema Registry
            URL nor a schema string is configured.
    """
    if self.params.deserialize != 'avro':
        return None

    if self.params.schema_registry_url:
        registry_conf = self.params.schema_registry_extra_params
        registry_conf['url'] = self.params.schema_registry_url
        return AvroDeserializer(SchemaRegistryClient(registry_conf))

    if self.params.schema_str:
        return AvroDeserializer(self.params.schema_str)

    raise ValueError("Schema Registry URL or schema string must be provided for Avro deserialization.")

def get_topic_dtypes(self, message_value: bytes):
    """Extract the Avro field definitions describing a topic's message values.

    Args:
        message_value: raw, still-serialized message value. In Confluent
            wire format byte 0 is the magic byte and bytes 1-5 hold the
            big-endian schema id used to look up the schema in the registry.

    Returns:
        The list of Avro field dicts (each with 'name' and 'type') from the
        schema's 'fields' entry, or None when no schema can be determined
        (non-Avro mode, or no registry/schema string configured).
    """
    schema = None
    if self.params.deserialize == 'avro':
        if self.params.schema_registry_url:
            config = self.params.schema_registry_extra_params
            config['url'] = self.params.schema_registry_url
            schema_registry_client = SchemaRegistryClient(config)
            # Confluent wire format stores the schema id big-endian; the
            # byteorder argument is also required on Python < 3.11.
            schema_id = int.from_bytes(message_value[1:5], byteorder='big')
            schema = json.loads(schema_registry_client.get_schema(schema_id).schema_str).get('fields')

        elif self.params.schema_str:
            schema = json.loads(self.params.schema_str).get('fields')

    return schema

def convert_dtypes(self, dtype: str = 'string'):
match dtype:
case 'boolean':
base_type = BaseType.boolean()
case 'int':
base_type = BaseType.integer()
case 'float':
base_type = BaseType.float()
case 'double':
base_type = BaseType.float()
case _:
base_type = BaseType.string()

return base_type

def _init_client(self, debug, params, prev_offsets, servers):
c = KafkaConsumer(servers=servers,
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"incremental": true, "write_always": false, "delimiter": ",", "enclosure": "\"", "manifest_type": "out", "has_header": false, "schema": [{"name": "topic", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp_type", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "partition", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "offset", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "key", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "value", "data_type": {"base": {"type": "STRING"}}, "nullable": true}]}
{"incremental": true, "write_always": false, "delimiter": ",", "enclosure": "\"", "manifest_type": "out", "has_header": false, "schema": [{"name": "topic", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp_type", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "partition", "data_type": {"base": {"type": "INTEGER"}}, "nullable": true, "primary_key": true}, {"name": "offset", "data_type": {"base": {"type": "INTEGER"}}, "nullable": true, "primary_key": true}, {"name": "key", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "value", "data_type": {"base": {"type": "STRING"}}, "nullable": true}]}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"incremental": true, "write_always": false, "delimiter": ",", "enclosure": "\"", "manifest_type": "out", "has_header": false, "schema": [{"name": "topic", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp_type", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "partition", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "offset", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "key", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "value", "data_type": {"base": {"type": "STRING"}}, "nullable": true}]}
{"incremental": true, "write_always": false, "delimiter": ",", "enclosure": "\"", "manifest_type": "out", "has_header": false, "schema": [{"name": "topic", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp_type", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "partition", "data_type": {"base": {"type": "INTEGER"}}, "nullable": true, "primary_key": true}, {"name": "offset", "data_type": {"base": {"type": "INTEGER"}}, "nullable": true, "primary_key": true}, {"name": "key", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "value", "data_type": {"base": {"type": "STRING"}}, "nullable": true}]}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"incremental": true, "write_always": false, "delimiter": ",", "enclosure": "\"", "manifest_type": "out", "has_header": false, "schema": [{"name": "topic", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp_type", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "timestamp", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "partition", "data_type": {"base": {"type": "INTEGER"}}, "nullable": true, "primary_key": true}, {"name": "offset", "data_type": {"base": {"type": "INTEGER"}}, "nullable": true, "primary_key": true}, {"name": "key", "data_type": {"base": {"type": "STRING"}}, "nullable": true, "primary_key": true}, {"name": "col_boolean", "data_type": {"base": {"type": "BOOLEAN"}}, "nullable": true}, {"name": "col_int", "data_type": {"base": {"type": "INTEGER"}}, "nullable": true}, {"name": "col_long", "data_type": {"base": {"type": "STRING"}}, "nullable": true}, {"name": "col_float", "data_type": {"base": {"type": "FLOAT"}}, "nullable": true}, {"name": "col_double", "data_type": {"base": {"type": "FLOAT"}}, "nullable": true}, {"name": "col_bytes", "data_type": {"base": {"type": "STRING"}}, "nullable": true}, {"name": "col_string", "data_type": {"base": {"type": "STRING"}}, "nullable": true}]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
avro-columns,1,1732104020556,0,0,b'key3',True,3,1234567890,123.44999694824219,123456.7891234568,b'test',Test message
avro-columns,1,1732104020556,0,1,b'key3',True,7,1234567890,123.44999694824219,123456.7891234568,b'test',Test message
avro-columns,1,1732104020556,0,2,b'key3',True,11,1234567890,123.44999694824219,123456.7891234568,b'test',Test message
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
avro-columns,1,1732104020556,1,0,b'key1',True,1,1234567890,123.44999694824219,123456.7891234568,b'test',Test message
avro-columns,1,1732104020556,1,1,b'key2',True,2,1234567890,123.44999694824219,123456.7891234568,b'test',Test message
avro-columns,1,1732104020556,1,2,b'key1',True,5,1234567890,123.44999694824219,123456.7891234568,b'test',Test message
avro-columns,1,1732104020556,1,3,b'key2',True,6,1234567890,123.44999694824219,123456.7891234568,b'test',Test message
avro-columns,1,1732104020556,1,4,b'key1',True,9,1234567890,123.44999694824219,123456.7891234568,b'test',Test message
avro-columns,1,1732104020556,1,5,b'key2',True,10,1234567890,123.44999694824219,123456.7891234568,b'test',Test message
avro-columns,1,1732104020556,1,6,b'key1',True,13,1234567890,123.44999694824219,123456.7891234568,b'test',Test message
avro-columns,1,1732104020556,1,7,b'key2',True,14,1234567890,123.44999694824219,123456.7891234568,b'test',Test message
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
avro-columns,1,1732104020556,2,0,b'key0',True,0,1234567890,123.44999694824219,123456.7891234568,b'test',Test message
avro-columns,1,1732104020556,2,1,b'key0',True,4,1234567890,123.44999694824219,123456.7891234568,b'test',Test message
avro-columns,1,1732104020556,2,2,b'key0',True,8,1234567890,123.44999694824219,123456.7891234568,b'test',Test message
avro-columns,1,1732104020556,2,3,b'key0',True,12,1234567890,123.44999694824219,123456.7891234568,b'test',Test message
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,21 @@ def run(context: TestDataDir):
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "key", "type": "string"},
{"name": "text", "type": "string"}
{"name": "col_boolean", "type": "boolean"},
{"name": "col_int", "type": "int"},
{"name": "col_long", "type": "long"},
{"name": "col_float", "type": "float"},
{"name": "col_double", "type": "double"},
{"name": "col_bytes", "type": "bytes"},
{"name": "col_string", "type": "string"}
]
}'''

avro_serializer = AvroSerializer(schema_reg_client, schema_str)

value = avro_serializer(
{"id": i, "key": f"{name}-111", "text": f"Test message {i}"},
{"col_boolean": True, "col_int": i, "col_long": 1234567890, "col_float": 123.45,
"col_double": 123456.7891234568, "col_bytes": b"test", "col_string": "Test message"},
SerializationContext(name, MessageField.VALUE)
)

Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

0 comments on commit ee19d02

Please sign in to comment.