Skip to content

Commit

Permalink
Refreshing website content from main repo.
Browse files Browse the repository at this point in the history
  • Loading branch information
GitHub Action Website Snapshot committed Sep 9, 2024
1 parent b9ec334 commit e2f715d
Show file tree
Hide file tree
Showing 21 changed files with 1,495 additions and 458 deletions.
142 changes: 142 additions & 0 deletions docs/client/java/partials/java_transport.md
Original file line number Diff line number Diff line change
Expand Up @@ -558,3 +558,145 @@ OpenLineageClient client = OpenLineageClient.builder()

</TabItem>
</Tabs>

## [Composite](https://github.com/OpenLineage/OpenLineage/tree/main/client/java/src/main/java/io/openlineage/client/transports/CompositeTransport.java)

The `CompositeTransport` is designed to aggregate multiple transports, allowing event emission to several destinations sequentially. This is useful when events need to be sent to multiple targets, such as a logging system and an API endpoint, one after another in a defined order.

#### Configuration

- `type` - string, must be "composite". Required.
- `transports` - may be a list or a map of transport configurations. Required.
- `continueOnFailure` - boolean flag, determines if the process should continue even when one of the transports fails. Default is `false`.

#### Behavior

- The configured transports will be initialized and used in sequence to emit OpenLineage events.
- If `continueOnFailure` is set to `false`, a failure in one transport will stop the event emission process, and an exception will be raised.
- If `continueOnFailure` is `true`, the failure will be logged, but the remaining transports will still attempt to send the event.

#### Notes for Multiple Transports
This transport can include a variety of other transport types (e.g., `HttpTransport`, `KafkaTransport`, etc.), allowing flexibility in event distribution.
Ideal for scenarios where OpenLineage events need to reach multiple destinations for redundancy or different types of processing.

The `transports` configuration can be provided in two formats:

1. A list of transport configurations, where each transport may optionally include a name field.
2. A map of transport configurations, where the key acts as the name for each transport.
The map format is particularly useful for configurations set via environment variables or Java properties, providing a more convenient and flexible setup.

#### Examples

<Tabs groupId="integrations">
<TabItem value="yaml-list" label="Yaml Config (List)">

```yaml
transport:
type: composite
continueOnFailure: true
transports:
- type: http
url: http://example.com/api
name: my_http
- type: kafka
topicName: openlineage.events
properties:
bootstrap.servers: localhost:9092,another.host:9092
acks: all
retries: 3
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: org.apache.kafka.common.serialization.StringSerializer
messageKey: some-value
continueOnFailure: true
```
</TabItem>
<TabItem value="yaml-map" label="Yaml Config (Map)">
```yaml
transport:
type: composite
continueOnFailure: true
transports:
my_http:
type: http
url: http://example.com/api
name: my_http
my_kafka:
type: kafka
topicName: openlineage.events
properties:
bootstrap.servers: localhost:9092,another.host:9092
acks: all
retries: 3
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: org.apache.kafka.common.serialization.StringSerializer
messageKey: some-value
continueOnFailure: true
```
</TabItem>
<TabItem value="spark" label="Spark Config">
```ini
spark.openlineage.transport.type=composite
spark.openlineage.transport.continueOnFailure=true
spark.openlineage.transport.transports.my_http.type=http
spark.openlineage.transport.transports.my_http.url=http://example.com/api
spark.openlineage.transport.transports.my_kafka.type=kafka
spark.openlineage.transport.transports.my_kafka.topicName=openlineage.events
spark.openlineage.transport.transports.my_kafka.properties.bootstrap.servers=localhost:9092,another.host:9092
spark.openlineage.transport.transports.my_kafka.properties.acks=all
spark.openlineage.transport.transports.my_kafka.properties.retries=3
spark.openlineage.transport.transports.my_kafka.properties.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spark.openlineage.transport.transports.my_kafka.properties.value.serializer=org.apache.kafka.common.serialization.StringSerializer
```

</TabItem>
<TabItem value="flink" label="Flink Config">

```ini
openlineage.transport.type=composite
openlineage.transport.continueOnFailure=true
openlineage.transport.transports.my_http.type=http
openlineage.transport.transports.my_http.url=http://example.com/api
openlineage.transport.transports.my_kafka.type=kafka
openlineage.transport.transports.my_kafka.topicName=openlineage.events
openlineage.transport.transports.my_kafka.properties.bootstrap.servers=localhost:9092,another.host:9092
openlineage.transport.transports.my_kafka.properties.acks=all
openlineage.transport.transports.my_kafka.properties.retries=3
openlineage.transport.transports.my_kafka.properties.key.serializer=org.apache.kafka.common.serialization.StringSerializer
openlineage.transport.transports.my_kafka.properties.value.serializer=org.apache.kafka.common.serialization.StringSerializer
```

</TabItem>
<TabItem value="java" label="Java Code">

```java
import java.util.Arrays;
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.CompositeConfig;
import io.openlineage.client.transports.HttpConfig;
import io.openlineage.client.transports.HttpTransport;
import io.openlineage.client.transports.KafkaConfig;
import io.openlineage.client.transports.KafkaTransport;

HttpConfig httpConfig = new HttpConfig();
httpConfig.setUrl("http://example.com/api");
KafkaConfig kafkaConfig = new KafkaConfig();
KafkaConfig.setTopicName("openlineage.events");
KafkaConfig.setLocalServerId("some-value");

CompositeConfig compositeConfig = new CompositeConfig(Arrays.asList(
new HttpTransport(httpConfig),
new KafkaTransport(kafkaConfig)
), true);

OpenLineageClient client = OpenLineageClient.builder()
.transport(
new CompositeTransport(compositeConfig))
.build();
```

</TabItem>
</Tabs>
Loading

0 comments on commit e2f715d

Please sign in to comment.