
DLQ aggregation recipe #1368

Open
davetroiano opened this issue Sep 8, 2022 · 2 comments
Labels: recipe, use case (A tutorial with an extended business use case)

@davetroiano (Contributor) commented:

Use case: aggregate DLQ topics w/ ksqlDB and send downstream (e.g., to BigQuery)

Recipe would, e.g.:

  1. create 2 connectors w/ DLQ topics, and simulate the DLQ records via INSERT statements (see the sketch below)
  2. use ksqlDB to aggregate (transform / normalize?) the streams into one
  3. send the aggregated stream downstream
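A rough sketch of the simulation step, purely illustrative (the stream name, topic name, and record contents are made up for the example; also, since ksqlDB INSERT statements can't set record headers, the header-based error details shown later in this thread would still need a real connector or an external producer to reproduce fully):

    -- Hypothetical stand-in for a connector DLQ topic, so the recipe can be
    -- exercised without a real failing connector.
    CREATE STREAM dlq_sim (key_col VARCHAR KEY, value_col VARCHAR)
      WITH (kafka_topic = 'dlq-sim-connector-1', partitions = 1, value_format = 'json');

    -- Simulate one dead-lettered record.
    INSERT INTO dlq_sim (key_col, value_col)
      VALUES ('18', '{"orderid": 18, "itemid": "Item_184"}');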
davetroiano added the recipe and use case labels on Sep 8, 2022
@chuck-confluent commented:

  1. Create a stream against a DLQ topic:
    CREATE STREAM dlq_bigquery_1 (
      key_col VARCHAR KEY,
      value_col VARCHAR,
      err_msg ARRAY<STRUCT<key STRING, value BYTES>> HEADERS
    ) WITH (
      WRAP_SINGLE_VALUE = false,
      kafka_topic = 'dlq-lcc-gqx2qn',
      value_format = 'json'
    );
  2. Create another stream to hold DLQ messages from many other DLQ topics:
    CREATE STREAM all_dlq (key_col VARCHAR KEY, value_col VARCHAR, err_msg ARRAY<STRUCT<key STRING, value STRING>>)
        WITH (kafka_topic='all_dlq', partitions=1, value_format='json');
  3. Insert records from each DLQ stream into the all_dlq stream (the same pattern repeats for every additional DLQ topic; see the sketch after this list):
    INSERT INTO all_dlq
      SELECT key_col,
             value_col,
             TRANSFORM(err_msg, x => STRUCT(key := x->key, value := FROM_BYTES(x->value, 'base64'))) AS err_msg
      FROM dlq_bigquery_1;
    
  4. Sink the aggregated stream to BigQuery:
    CREATE SINK CONNECTOR ...
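The same pattern would repeat for each additional connector's DLQ topic — a sketch, with a made-up second topic name:

    -- Hypothetical second DLQ stream; the topic name is illustrative.
    CREATE STREAM dlq_bigquery_2 (
      key_col VARCHAR KEY,
      value_col VARCHAR,
      err_msg ARRAY<STRUCT<key STRING, value BYTES>> HEADERS
    ) WITH (
      WRAP_SINGLE_VALUE = false,
      kafka_topic = 'dlq-lcc-abc123',
      value_format = 'json'
    );

    -- Route its records into the same aggregate stream, converting the header
    -- BYTES values to strings exactly as in step 3.
    INSERT INTO all_dlq
      SELECT key_col,
             value_col,
             TRANSFORM(err_msg, x => STRUCT(key := x->key, value := FROM_BYTES(x->value, 'base64'))) AS err_msg
      FROM dlq_bigquery_2;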

@hendrasutanto (Member) commented:

A sample DLQ input record produced by the S3 sink connector:

Value of the record:
{ "ordertime": 1497014222380, "orderid": 18, "itemid": "Item_184", "address": { "city": "Mountain View", "state": "CA", "zipcode": 94041 } }

Header of the record:

[
  { "key": "__connect.errors.topic", "stringValue": "pksqlc-okr9jACCOMPLISHED_FEMALE_READERS" },
  { "key": "__connect.errors.partition", "stringValue": "0" },
  { "key": "__connect.errors.offset", "stringValue": "4957217" },
  { "key": "__connect.errors.connector.name", "stringValue": "lcc-gqx2qn" },
  { "key": "__connect.errors.task.id", "stringValue": "0" },
  { "key": "__connect.errors.stage", "stringValue": "VALUE_CONVERTER" },
  { "key": "__connect.errors.class.name", "stringValue": "io.confluent.connect.json.JsonSchemaConverter" },
  { "key": "__connect.errors.exception.class.name", "stringValue": "org.apache.kafka.connect.errors.DataException" },
  { "key": "__connect.errors.exception.message", "stringValue": "Converting byte[] to Kafka Connect data failed due to serialization error of topic pksqlc-okr9jACCOMPLISHED_FEMALE_READERS: " },
  { "key": "__connect.errors.exception.stacktrace", "stringValue": "org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic pksqlc-okr9jACCOMPLISHED_FEMALE_READERS: \n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:119)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:500)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:166)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:200)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:142)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:500)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:475)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:233)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:202)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)\n\tat java.base/java.lang.Thread.run(Thread.java:831)\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:176)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:231)\n\tat io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:165)\n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:108)\n\t... 17 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:111)\n\t... 20 more\n" }
]

Key of the record:
18
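Given a record like this, once the all_dlq stream from the previous comment is populated, a push query along these lines (just a sketch; the column names come from that stream's schema) could surface the most relevant header fields per dead-lettered record:

    -- ksqlDB arrays are 1-indexed, so [1] takes the first matching header;
    -- the expression evaluates to NULL when the header is absent.
    SELECT
      key_col,
      FILTER(err_msg, h => h->key = '__connect.errors.connector.name')[1]->value AS connector_name,
      FILTER(err_msg, h => h->key = '__connect.errors.exception.message')[1]->value AS error_message
    FROM all_dlq
    EMIT CHANGES;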
