Add concurrent processing (#29)
gfinocchiaro authored Dec 11, 2024
1 parent ade6b6e commit ae978a6
Showing 114 changed files with 8,703 additions and 3,900 deletions.
40 changes: 38 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -822,7 +822,7 @@ Example of configuration with the use of a ticket cache:

#### Quick Start Confluent Cloud Example

Check out the [adapters.xml](/examples/vendors/confluent/quickstart-confluent-cloud/adapters.xml#L22) file of the [_Quick Start Confluent Cloud_](/examples/vendors/confluent/quickstart-confluent-cloud/) app, where you can find an example of an authentication configuration that uses SASL/PLAIN.
Check out the [adapters.xml](/kafka-connector-project/kafka-connector/src/adapter/dist/adapters.xml#L454) file of the [_Quick Start Confluent Cloud_](/examples/vendors/confluent/quickstart-confluent/) app, where you can find an example of an authentication configuration that uses SASL/PLAIN.

#### Quick Start Redpanda Serverless Example

@@ -870,6 +870,42 @@ Example:
<param name="record.consume.from">EARLIEST</param>
```

#### `record.consume.with.num.threads`

_Optional_. The number of threads to be used for concurrent processing of the incoming deserialized records. If set to `-1`, the number of threads will be automatically determined based on the number of available CPU cores.

Default value: `1`.

Example:

```xml
<param name="record.consume.with.num.threads">4</param>
```
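As a concrete illustration of the `-1` semantics described above, the effective thread count can be derived from the machine's core count via the standard `Runtime` API. This is a hypothetical sketch; `ThreadCountResolver` and `resolve` are illustrative names, not part of the connector's API:

```java
public class ThreadCountResolver {

    // Illustrative only: maps the configured value of
    // record.consume.with.num.threads to an effective thread count,
    // mirroring the documented -1 semantics.
    static int resolve(int configured) {
        if (configured == -1) {
            // Auto-detect: one thread per available CPU core
            return Runtime.getRuntime().availableProcessors();
        }
        if (configured < 1) {
            throw new IllegalArgumentException(
                    "record.consume.with.num.threads must be >= 1, or -1");
        }
        return configured;
    }
}
```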

#### `record.consume.with.order.strategy`

_Optional but only effective if [`record.consume.with.num.threads`](#recordconsumewithnumthreads) is set to a value greater than `1`_. The order strategy to be used for concurrent processing of the incoming deserialized records. Can be one of the following:

- `ORDER_BY_PARTITION`: maintain the order of records within each partition.

If you have multiple partitions, records from different partitions can be processed concurrently by different threads, but the order of records from a single partition will always be preserved. This is the default and generally a good balance between performance and order.

- `ORDER_BY_KEY`: maintain the order among the records sharing the same key.

Different keys can be processed concurrently by different threads. So, while all records with key "A" are processed in order, and all records with key "B" are processed in order, the processing of "A" and "B" records can happen concurrently and interleaved in time. There's no guaranteed order between records of different keys.

- `UNORDERED`: provide no ordering guarantees.

  Records from any partition and with any key can be processed by any thread at any time. This offers the highest throughput when a high number of subscriptions is involved, but the order in which records are delivered to Lightstreamer clients might not match the order in which they were written to Kafka. This is suitable for use cases where message order is not important.

Default value: `ORDER_BY_PARTITION`.

Example:

```xml
<param name="record.consume.with.order.strategy">ORDER_BY_KEY</param>
```
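To make the strategies above concrete, here is a minimal sketch of the idea behind `ORDER_BY_KEY`: records are routed by key hash to a fixed set of single-threaded executors, so records sharing a key are processed sequentially while different keys may proceed concurrently. Class and method names are hypothetical, not the connector's actual implementation:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of key-ordered concurrent processing.
public class KeyOrderedDispatcher {

    private final ExecutorService[] workers;

    public KeyOrderedDispatcher(int numThreads) {
        workers = new ExecutorService[numThreads];
        for (int i = 0; i < numThreads; i++) {
            // One single-threaded executor per slot preserves FIFO order
            // for all records routed to that slot.
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    public void dispatch(String key, Runnable processing) {
        // Same key -> same slot -> same thread -> per-key order preserved
        int slot = Math.floorMod(key == null ? 0 : key.hashCode(), workers.length);
        workers[slot].execute(processing);
    }

    public void shutdownAndAwait() throws InterruptedException {
        for (ExecutorService w : workers) w.shutdown();
        for (ExecutorService w : workers) w.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

`ORDER_BY_PARTITION` follows the same routing pattern with the record's partition index in place of the key hash, while `UNORDERED` would simply submit every record to a shared pool with no routing at all.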

#### `record.key.evaluator.type` and `record.value.evaluator.type`

_Optional_. The format to be used to deserialize the key and the value of a Kafka record, respectively. Can be one of the following:
@@ -1066,7 +1102,7 @@ To configure the mapping, you define the set of all subscribable fields through
The configuration specifies that the field `fieldNameX` will contain the value extracted from the deserialized Kafka record through the `extractionExpressionX`, written using the [_Data Extraction Language_](#data-extraction-language). This approach makes it possible to transform a Kafka record of any complexity to the flat structure required by Lightstreamer.
The `QuickStart` [factory configuration](/kafka-connector-project/kafka-connector/src/adapter/dist/adapters.xml#L352) shows a basic example, where a simple _direct_ mapping has been defined between every attribute of the JSON record value and a Lightstreamer field with the corresponding name. Of course, thanks to the _Data Extraction Language_, more complex mapping can be employed.
The `QuickStart` [factory configuration](/kafka-connector-project/kafka-connector/src/adapter/dist/adapters.xml#L374) shows a basic example, where a simple _direct_ mapping has been defined between every attribute of the JSON record value and a Lightstreamer field with the corresponding name. Of course, thanks to the _Data Extraction Language_, more complex mapping can be employed.
```xml
...
2 changes: 1 addition & 1 deletion docs/javadoc/allclasses-index.html
@@ -2,7 +2,7 @@
<html lang="en">
<head>
<!-- Generated by javadoc (17) -->
<title>All Classes and Interfaces (kafka-connector 1.0.7 API)</title>
<title>All Classes and Interfaces (kafka-connector 1.1.0 API)</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<meta name="description" content="class index">
2 changes: 1 addition & 1 deletion docs/javadoc/allpackages-index.html
@@ -2,7 +2,7 @@
<html lang="en">
<head>
<!-- Generated by javadoc (17) -->
<title>All Packages (kafka-connector 1.0.7 API)</title>
<title>All Packages (kafka-connector 1.1.0 API)</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<meta name="description" content="package index">
@@ -2,7 +2,7 @@
<html lang="en">
<head>
<!-- Generated by javadoc (17) -->
<title>KafkaConnectorMetadataAdapter (kafka-connector 1.0.7 API)</title>
<title>KafkaConnectorMetadataAdapter (kafka-connector 1.1.0 API)</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<meta name="description" content="declaration: package: com.lightstreamer.kafka.adapters.pub, class: KafkaConnectorMetadataAdapter">
@@ -2,7 +2,7 @@
<html lang="en">
<head>
<!-- Generated by javadoc (17) -->
<title>com.lightstreamer.kafka.adapters.pub (kafka-connector 1.0.7 API)</title>
<title>com.lightstreamer.kafka.adapters.pub (kafka-connector 1.1.0 API)</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<meta name="description" content="declaration: package: com.lightstreamer.kafka.adapters.pub">
@@ -2,7 +2,7 @@
<html lang="en">
<head>
<!-- Generated by javadoc (17) -->
<title>com.lightstreamer.kafka.adapters.pub Class Hierarchy (kafka-connector 1.0.7 API)</title>
<title>com.lightstreamer.kafka.adapters.pub Class Hierarchy (kafka-connector 1.1.0 API)</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<meta name="description" content="tree: package: com.lightstreamer.kafka.adapters.pub">
2 changes: 1 addition & 1 deletion docs/javadoc/help-doc.html
@@ -2,7 +2,7 @@
<html lang="en">
<head>
<!-- Generated by javadoc (17) -->
<title>API Help (kafka-connector 1.0.7 API)</title>
<title>API Help (kafka-connector 1.1.0 API)</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<meta name="description" content="help">
2 changes: 1 addition & 1 deletion docs/javadoc/index-all.html
@@ -2,7 +2,7 @@
<html lang="en">
<head>
<!-- Generated by javadoc (17) -->
<title>Index (kafka-connector 1.0.7 API)</title>
<title>Index (kafka-connector 1.1.0 API)</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<meta name="description" content="index">
2 changes: 1 addition & 1 deletion docs/javadoc/index.html
@@ -2,7 +2,7 @@
<html lang="en">
<head>
<!-- Generated by javadoc (17) -->
<title>Overview (kafka-connector 1.0.7 API)</title>
<title>Overview (kafka-connector 1.1.0 API)</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<meta name="description" content="package index">
2 changes: 1 addition & 1 deletion docs/javadoc/overview-summary.html
@@ -2,7 +2,7 @@
<html lang="en">
<head>
<!-- Generated by javadoc (17) -->
<title>kafka-connector 1.0.7 API</title>
<title>kafka-connector 1.1.0 API</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<meta name="description" content="index redirect">
2 changes: 1 addition & 1 deletion docs/javadoc/overview-tree.html
@@ -2,7 +2,7 @@
<html lang="en">
<head>
<!-- Generated by javadoc (17) -->
<title>Class Hierarchy (kafka-connector 1.0.7 API)</title>
<title>Class Hierarchy (kafka-connector 1.1.0 API)</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<meta name="description" content="class tree">
2 changes: 1 addition & 1 deletion examples/compose-templates/log4j.properties
@@ -17,7 +17,7 @@ log4j.logger.com.lightstreamer.kafka.adapters.pub.KafkaConnectorMetadataAdapter
log4j.logger.org.apache.kafka=WARN, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] [%-10c{1}] %-5p %m%n
log4j.appender.stdout.layout.ConversionPattern=[%d] [%-20t] [%-10c{1}] %-5p %m%n
log4j.appender.stdout.Target=System.out

# QuickStart logger
42 changes: 39 additions & 3 deletions examples/vendors/confluent/README.md
@@ -237,7 +237,7 @@ To quickly complete the installation and verify the successful integration with

To enable a generic Lightstreamer client to receive real-time updates, it needs to subscribe to one or more items. Therefore, the Kafka Connector provides suitable mechanisms to map Kafka topics to Lightstreamer items effectively.

The `QuickStartConfluentCloud` [factory configuration](/kafka-connector-project/kafka-connector/src/adapter/dist/adapters.xml#L419) comes with a straightforward mapping defined through the following settings:
The `QuickStartConfluentCloud` [factory configuration](/kafka-connector-project/kafka-connector/src/adapter/dist/adapters.xml#L441) comes with a straightforward mapping defined through the following settings:

- An item template:
```xml
@@ -878,7 +878,7 @@ Example of configuration with the use of a ticket cache:
<param name="authentication.gssapi.ticket.cache.enable">true</param>
```

Check out the `QuickStartConfluentCloud` [factory configuration](/kafka-connector-project/kafka-connector/src/adapter/dist/adapters.xml#L432) file, where you can find an example of an authentication configuration that uses SASL/PLAIN.
Check out the `QuickStartConfluentCloud` [factory configuration](/kafka-connector-project/kafka-connector/src/adapter/dist/adapters.xml#L454) file, where you can find an example of an authentication configuration that uses SASL/PLAIN.

### Record Evaluation

@@ -922,6 +922,42 @@ Example:
<param name="record.consume.from">EARLIEST</param>
```

#### `record.consume.with.num.threads`

_Optional_. The number of threads to be used for concurrent processing of the incoming deserialized records. If set to `-1`, the number of threads will be automatically determined based on the number of available CPU cores.

Default value: `1`.

Example:

```xml
<param name="record.consume.with.num.threads">4</param>
```

#### `record.consume.with.order.strategy`

_Optional but only effective if [`record.consume.with.num.threads`](#recordconsumewithnumthreads) is set to a value greater than `1`_. The order strategy to be used for concurrent processing of the incoming deserialized records. Can be one of the following:

- `ORDER_BY_PARTITION`: maintain the order of records within each partition.

If you have multiple partitions, records from different partitions can be processed concurrently by different threads, but the order of records from a single partition will always be preserved. This is the default and generally a good balance between performance and order.

- `ORDER_BY_KEY`: maintain the order among the records sharing the same key.

Different keys can be processed concurrently by different threads. So, while all records with key "A" are processed in order, and all records with key "B" are processed in order, the processing of "A" and "B" records can happen concurrently and interleaved in time. There's no guaranteed order between records of different keys.

- `UNORDERED`: provide no ordering guarantees.

  Records from any partition and with any key can be processed by any thread at any time. This offers the highest throughput when a high number of subscriptions is involved, but the order in which records are delivered to Lightstreamer clients might not match the order in which they were written to Kafka. This is suitable for use cases where message order is not important.

Default value: `ORDER_BY_PARTITION`.

Example:

```xml
<param name="record.consume.with.order.strategy">ORDER_BY_KEY</param>
```

#### `record.key.evaluator.type` and `record.value.evaluator.type`

_Optional_. The format to be used to deserialize the key and the value of a Kafka record, respectively. Can be one of the following:
@@ -1118,7 +1154,7 @@ To configure the mapping, you define the set of all subscribable fields through
The configuration specifies that the field `fieldNameX` will contain the value extracted from the deserialized Kafka record through the `extractionExpressionX`, written using the [_Data Extraction Language_](#data-extraction-language). This approach makes it possible to transform a Kafka record of any complexity to the flat structure required by Lightstreamer.
The `QuickStartConfluentCloud` [factory configuration](/kafka-connector-project/kafka-connector/src/adapter/dist/adapters.xml#L448) shows a basic example, where a simple _direct_ mapping has been defined between every attribute of the JSON record value and a Lightstreamer field with the corresponding name. Of course, thanks to the _Data Extraction Language_, more complex mapping can be employed.
The `QuickStartConfluentCloud` [factory configuration](/kafka-connector-project/kafka-connector/src/adapter/dist/adapters.xml#L469) shows a basic example, where a simple _direct_ mapping has been defined between every attribute of the JSON record value and a Lightstreamer field with the corresponding name. Of course, thanks to the _Data Extraction Language_, more complex mapping can be employed.
```xml
...
@@ -16,7 +16,7 @@ spotless {
greclipse()
}
java {
target 'src/main/**/*.java','src/test/**/*.java'
target 'src/main/**/*.java','src/test/**/*.java','src/jmh/**/*.java'
googleJavaFormat()
.aosp()
.reorderImports(true)
4 changes: 2 additions & 2 deletions kafka-connector-project/gradle.properties
@@ -1,5 +1,5 @@
version=1.0.7
version=1.1.0
brandedProjectName=Lightstreamer Kafka Connector
release_date=2024-06-11
release_date=UNRELEASED
connect_owner=lightstreamer
connect_componentName=kafka-connect-lightstreamer
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.10-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.11.1-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
@@ -259,6 +259,28 @@
Default value: LATEST. -->
<param name="record.consume.from">EARLIEST</param>

<!-- Optional. The number of threads to be used for concurrent processing of the
incoming deserialized records. If set to `-1`, the number of threads will be automatically
determined based on the number of available CPU cores.
Default value: 1. -->
<!--
<param name="record.consume.with.num.threads">4</param>
-->

<!-- Optional but only effective if "record.consume.with.num.threads" is set to a value greater than 1.
The order strategy to be used for concurrent processing of the incoming
deserialized records. Can be one of the following:
- ORDER_BY_PARTITION: maintain the order of records within each partition
- ORDER_BY_KEY: maintain the order among the records sharing the same key
- UNORDERED: provide no ordering guarantees
Default value: ORDER_BY_PARTITION. -->
<!--
<param name="record.consume.with.order.strategy">ORDER_BY_KEY</param>
-->

<!-- Optional. The format to be used to deserialize respectively the key and value of a Kafka record.
Can be one of the following:
- AVRO
@@ -464,6 +486,6 @@
<param name="field.offset">#{OFFSET}</param>
<param name="field.partition">#{PARTITION}</param>

</data_provider>
</data_provider>

</adapters_conf>
@@ -24,7 +24,7 @@
"kafka_connect_api": true
},
"logo": "assets/lightstreamer.png",
"documentation_url": "https://github.com/Lightstreamer/Lightstreamer-kafka-connector",
"documentation_url": "https://lightstreamer.com/confluent-docs",
"source_url" : "https://github.com/Lightstreamer/Lightstreamer-kafka-connector",
"docker_image" : {
"name" : "lightstreamer"