Skip to content

Commit

Permalink
Add Client Side Error Handing section (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
gfinocchiaro authored Dec 11, 2024
1 parent ae978a6 commit 01c42b1
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 8 deletions.
31 changes: 27 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ _Last-mile data streaming. Stream real-time Kafka data to mobile and web apps, a
- [Basic HTTP Authenticaion Parameters](#basic-http-authentication-parameters)
- [Encryption Parameters](#encryption-parameters-1)
- [Quick Start Schema Registry Example](#quick-start-schema-registry-example)
- [Client Side Error Handling](#client-side-error-handling)
- [Customize the Kafka Connector Metadata Adapter Class](#customize-the-kafka-connector-metadata-adapter-class)
- [Develop the Extension](#develop-the-extension)
- [Kafka Lightstreamer Sink Connector](#kafka-connect-lightstreamer-sink-connector)
Expand Down Expand Up @@ -961,10 +962,10 @@ Examples:

#### `record.extraction.error.strategy`

_Optional_. The error handling strategy to be used if an error occurs while [extracting data](#record-mapping-fieldfieldname) from incoming deserialized records. Can be one of the following:
_Optional_. The error handling strategy to be used if an error occurs while [extracting data](#data-extraction-language) from incoming deserialized records. Can be one of the following:

- `IGNORE_AND_CONTINUE`: ignore the error and continue to process the next record
- `FORCE_UNSUBSCRIPTION`: stop processing records and force unsubscription of the items requested by all the clients subscribed to this connection
- `FORCE_UNSUBSCRIPTION`: stop processing records and force unsubscription of the items requested by all the clients subscribed to this connection (see the [Client Side Error Handling](#client-side-error-handling) section)

Default value: `IGNORE_AND_CONTINUE`.

Expand Down Expand Up @@ -1191,8 +1192,8 @@ which specifies how to route records published from the topic `user` to the item
Let's suppose we have three different Lightstreamer clients:

1. _Client A_ subscribes to the following parameterized items:
- _SA1_ `user-[firstName=James,lastName=Kirk]` for receiving real-time updates relative to the user `James Kirk`
- _SA2_ `user-[age=45]` for receiving real-time updates relative to any 45 year-old user
- _SA1_ `user-[firstName=James,lastName=Kirk]` for receiving real-time updates relative to the user `James Kirk`.
- _SA2_ `user-[age=45]` for receiving real-time updates relative to any 45 year-old user.
2. _Client B_ subscribes to the parameterized item _SB1_ `user-[firstName=Montgomery,lastName=Scotty]` for receiving real-time updates relative to the user `Montgomery Scotty`.
3. _Client C_ subscribes to the parameterized item _SC1_ `user-[age=37]` for receiving real-time updates relative to any 37 year-old user.

Expand Down Expand Up @@ -1345,6 +1346,28 @@ Example:
Check out the [adapters.xml](/examples/quickstart-schema-registry/adapters.xml#L58) file of the [_Quick Start Schema Registry_](/examples/quickstart-schema-registry/) app, where you can find an example of Schema Registry settings.
# Client Side Error Handling
When a client sends a subscription to the Kafka Connector, several error conditions can occur:
- Connection issues: the Kafka broker may be unreachable due to network problems or an incorrect configuration of the [`bootstrap.servers`](#bootstrapservers) parameter.
- Non-existent topics: none of the Kafka topics mapped in the [record routing](#record-routing-maptopicto) configurations exist in the broker.
- Data extraction: issues may arise while [extracting data](#data-extraction-language) from incoming records and the [`record.extraction.error.strategy`](#recordextractionerrorstrategy) parameter is set to `FORCE_UNSUBSCRIPTION`.
In these scenarios, the Kafka Connector triggers the unsubscription from all the items that were subscribed to the [target connection](#data_providername---kafka-connection-name). A client can be notified about the unsubscription event by implementing the `onUnsubscription` event handler, as shown in the following Java code snippet:
```java
subscription.addSubscriptionListener(new SubscriptionListener() {
...
public void onUnsubscription() {
// Manage the unscription event.
}
...
});
```
# Customize the Kafka Connector Metadata Adapter Class
If you have any specific need to customize the _Kafka Connector Metadata Adapter_ class (e.g, for implementing custom authentication and authorization logic), you can provide your implementation by extending the factory class [`com.lightstreamer.kafka.adapters.pub.KafkaConnectorMetadataAdapter`](https://lightstreamer.github.io/Lightstreamer-kafka-connector/javadoc/com/lightstreamer/kafka/adapters/pub/KafkaConnectorMetadataAdapter.html). The class provides the following hook methods, which you can override to add your custom logic:
Expand Down
31 changes: 27 additions & 4 deletions examples/vendors/confluent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ _Last-mile data streaming. Stream real-time Kafka data to mobile and web apps, a
- [Basic HTTP Authenticaion Parameters](#basic-http-authentication-parameters)
- [Encryption Parameters](#encryption-parameters-1)
- [Quick Start Schema Registry Example](#quick-start-schema-registry-example)
- [Client Side Error Handling](#client-side-error-handling)
- [Customize the Kafka Connector Metadata Adapter Class](#customize-the-kafka-connector-metadata-adapter-class)
- [Develop the Extension](#develop-the-extension)
- [Kafka Lightstreamer Sink Connector](#kafka-connect-lightstreamer-sink-connector)
Expand Down Expand Up @@ -1013,10 +1014,10 @@ Examples:

#### `record.extraction.error.strategy`

_Optional_. The error handling strategy to be used if an error occurs while [extracting data](#record-mapping-fieldfieldname) from incoming deserialized records. Can be one of the following:
_Optional_. The error handling strategy to be used if an error occurs while [extracting data](#data-extraction-language) from incoming deserialized records. Can be one of the following:

- `IGNORE_AND_CONTINUE`: ignore the error and continue to process the next record
- `FORCE_UNSUBSCRIPTION`: stop processing records and force unsubscription of the items requested by all the clients subscribed to this connection
- `FORCE_UNSUBSCRIPTION`: stop processing records and force unsubscription of the items requested by all the clients subscribed to this connection (see the [Client Side Error Handling](#client-side-error-handling) section)

Default value: `IGNORE_AND_CONTINUE`.

Expand Down Expand Up @@ -1243,8 +1244,8 @@ which specifies how to route records published from the topic `user` to the item
Let's suppose we have three different Lightstreamer clients:

1. _Client A_ subscribes to the following parameterized items:
- _SA1_ `user-[firstName=James,lastName=Kirk]` for receiving real-time updates relative to the user `James Kirk`
- _SA2_ `user-[age=45]` for receiving real-time updates relative to any 45 year-old user
- _SA1_ `user-[firstName=James,lastName=Kirk]` for receiving real-time updates relative to the user `James Kirk`.
- _SA2_ `user-[age=45]` for receiving real-time updates relative to any 45 year-old user.
2. _Client B_ subscribes to the parameterized item _SB1_ `user-[firstName=Montgomery,lastName=Scotty]` for receiving real-time updates relative to the user `Montgomery Scotty`.
3. _Client C_ subscribes to the parameterized item _SC1_ `user-[age=37]` for receiving real-time updates relative to any 37 year-old user.

Expand Down Expand Up @@ -1397,6 +1398,28 @@ Example:
Check out the [adapters.xml](/examples/quickstart-schema-registry/adapters.xml#L58) file of the [_Quick Start Schema Registry_](/examples/quickstart-schema-registry/) app, where you can find an example of Schema Registry settings.
# Client Side Error Handling
When a client sends a subscription to the Kafka Connector, several error conditions can occur:
- Connection issues: the Kafka broker may be unreachable due to network problems or an incorrect configuration of the [`bootstrap.servers`](#bootstrapservers) parameter.
- Non-existent topics: none of the Kafka topics mapped in the [record routing](#record-routing-maptopicto) configurations exist in the broker.
- Data extraction: issues may arise while [extracting data](#data-extraction-language) from incoming records and the [`record.extraction.error.strategy`](#recordextractionerrorstrategy) parameter is set to `FORCE_UNSUBSCRIPTION`.
In these scenarios, the Kafka Connector triggers the unsubscription from all the items that were subscribed to the [target connection](#data_providername---kafka-connection-name). A client can be notified about the unsubscription event by implementing the `onUnsubscription` event handler, as shown in the following Java code snippet:
```java
subscription.addSubscriptionListener(new SubscriptionListener() {
...
public void onUnsubscription() {
// Manage the unscription event.
}
...
});
```
# Customize the Kafka Connector Metadata Adapter Class
If you have any specific need to customize the _Kafka Connector Metadata Adapter_ class (e.g, for implementing custom authentication and authorization logic), you can provide your implementation by extending the factory class [`com.lightstreamer.kafka.adapters.pub.KafkaConnectorMetadataAdapter`](https://lightstreamer.github.io/Lightstreamer-kafka-connector/javadoc/com/lightstreamer/kafka/adapters/pub/KafkaConnectorMetadataAdapter.html). The class provides the following hook methods, which you can override to add your custom logic:
Expand Down

0 comments on commit 01c42b1

Please sign in to comment.