-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Includes: - Sequence diagram of airbyte protocol calls - Mapping between airbyte and transfer protocols - Known limitations - Example of configuration usage Fixes #149 --- Pull Request resolved: #153 commit_hash:aa813093e737d55ebc9ba2176cbfee515e3b480f
- Loading branch information
1 parent
29d0a07
commit f337412
Showing
1 changed file
with
121 additions
and
35 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,62 +1,148 @@ | ||
## Airbyte provider | ||
|
||
This is a bridge between native transfer and airbyte connector. | ||
This adapter is ideal for scenarios where you need to synchronize data from an Airbyte-compatible source to a Transfer-compatible sink with minimal configuration. | ||
|
||
We support source airbyte [connectors](https://docs.airbyte.com/category/sources) | ||
|
||
### How to add new airbyte connector | ||
This adapter enables integration between [Airbyte](https://docs.airbyte.com/using-airbyte/core-concepts/) and [Transfer](https://github.com/doublecloud/transfer), facilitating the translation of Airbyte's core concepts into Transfer-compatible constructs for streamlined data movement and transformations. | ||
|
||
## Core Concepts Mapping | ||
|
||
The adapter maps Airbyte concepts to Transfer as described below: | ||
|
||
| Airbyte Concept | Transfer Equivalent | Description | | ||
|-----------------------|-------------------|-------------------------------------------------------------------------------------------------------------| | ||
| **Source** | **Source** | Airbyte sources are represented as Transfer sources, maintaining the configuration from Airbyte. | | ||
| **Destination** | **Sink** | Airbyte destinations are mapped to Transfer sinks, preserving the target configuration. | | ||
| **Stream** | **TableID** | Airbyte streams are directly translated into Transfer streams, including their metadata and schema. | | ||
| **Catalog** | **TableSchema** | Airbyte catalogs are used to define the schema of Transfer TableSchema. | | ||
| **State** | **Coordinator** | Airbyte's state mechanism is implemented in Transfer Coordinator for incremental updates and checkpointing. | | ||
|
||
## Features | ||
|
||
- **Comprehensive Integration**: Supports all Airbyte-compatible sources. | ||
- **Flexible Sync Modes**: Full-refresh and incremental sync modes are available, leveraging Airbyte’s capabilities. | ||
- **Streamlined State Management**: Automatic handling of state for incremental updates by using transfer Coordinator | ||
- **Schema Handling**: Seamless propagation of schema from Airbyte to Transfer. | ||
|
||
# Airbyte Adapter for Transfer | ||
|
||
This adapter enables integration between [Airbyte](https://docs.airbyte.com/using-airbyte/core-concepts/) and [Transfer](https://github.com/doublecloud/transfer), facilitating the translation of Airbyte's core concepts into Transfer-compatible constructs for streamlined data movement and transformations. | ||
|
||
## Configuration | ||
|
||
The Airbyte adapter supports only source configuration. To configure an Airbyte source, use the `spec` method from the [Airbyte Protocol](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol#spec) to retrieve the schema for the required parameters. Then, create a configuration YAML file as follows: | ||
|
||
### Example Source Configuration | ||
|
||
```yaml | ||
type: airbyte | ||
params: | ||
Config: | | ||
{ | ||
"credentials":{ | ||
"auth_type":"username/password", | ||
"password":"AWESOME_PASSWORD", | ||
"username":"AWESOME_USER" | ||
}, | ||
"database":"AWESOME_DB", | ||
"host":"https://DB_ID.DB_REGION.snowflakecomputing.com", | ||
"role":"DB_ROLE", | ||
"schema":"DB_SCHEMA", | ||
"warehouse":"DB_WAREHOUSE" | ||
} | ||
BatchSizeLimit: 10485760 | ||
RecordsLimit: 10000 | ||
MaxRowSize: 268435456 | ||
Image: "airbyte/source-snowflake:0.1.32" | ||
``` | ||
#### 1. Add proto-model | ||
Save this configuration to a file and provide it to the Transfer system when setting up the Airbyte source. | ||
Each airbyte provider has own config. To get such config you can run: | ||
## Sequence Diagram | ||
```shell | ||
docker run airbyte/source-snowflake:0.1.31 spec | ||
``` | ||
The following sequence diagram illustrates the interaction between Transfer and Airbyte based on the [Airbyte Protocol](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol): | ||
This would output airbyte [spec](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol#spec) for a connector. | ||
Output may contains also logs, but some line should have type `SPEC`. Something like this: | ||
```mermaid | ||
sequenceDiagram | ||
participant ActivateFlow | ||
participant TransferAdapter | ||
participant AirbytDocker | ||
participant Sink | ||
|
||
```shell | ||
{"type":"LOG","log":{"level":"INFO","message":"INFO i.a.i.b.IntegrationRunner(runInternal):107 Integration config: IntegrationConfig{command=SPEC, configPath='null', catalogPath='null', statePath='null'}"}} | ||
{"type":"SPEC","spec":{"documentationUrl":"https://docs.airbyte.com/integrations/sources/snowflake","connectionSpecification":{"$schema":"http://json-schema.org/draft-07/schema#","title":"Snowflake Source Spec","type":"object","required":["host","role","warehouse","database"],"properties":{"credentials":{"title":"Authorization Method","type":"object","oneOf":[{"type":"object","title":"OAuth2.0","order":0,"required":["client_id","client_secret","auth_type"],"properties":{"auth_type":{"type":"string","const":"OAuth","order":0},"client_id":{"type":"string","title":"Client ID","description":"The Client ID of your Snowflake developer application.","airbyte_secret":true,"order":1},"client_secret":{"type":"string","title":"Client Secret","description":"The Client Secret of your Snowflake developer application.","airbyte_secret":true,"order":2},"access_token":{"type":"string","title":"Access Token","description":"Access Token for making authenticated requests.","airbyte_secret":true,"order":3},"refresh_token":{"type":"string","title":"Refresh Token","description":"Refresh Token for making authenticated requests.","airbyte_secret":true,"order":4}}},{"title":"Username and Password","type":"object","required":["username","password","auth_type"],"order":1,"properties":{"auth_type":{"type":"string","const":"username/password","order":0},"username":{"description":"The username you created to allow Airbyte to access the database.","examples":["AIRBYTE_USER"],"type":"string","title":"Username","order":1},"password":{"description":"The password associated with the username.","type":"string","airbyte_secret":true,"title":"Password","order":2}}}],"order":0},"host":{"description":"The host domain of the snowflake instance (must include the account, region, cloud environment, and end with snowflakecomputing.com).","examples":["accountname.us-east-2.aws.snowflakecomputing.com"],"type":"string","title":"Account Name","order":1},"role":{"description":"The role you created for Airbyte to access Snowflake.","examples":["AIRBYTE_ROLE"],"type":"string","title":"Role","order":2},"warehouse":{"description":"The warehouse you created for Airbyte to access data.","examples":["AIRBYTE_WAREHOUSE"],"type":"string","title":"Warehouse","order":3},"database":{"description":"The database you created for Airbyte to access data.","examples":["AIRBYTE_DATABASE"],"type":"string","title":"Database","order":4},"schema":{"description":"The source Snowflake schema tables. Leave empty to access tables from multiple schemas.","examples":["AIRBYTE_SCHEMA"],"type":"string","title":"Schema","order":5},"jdbc_url_params":{"description":"Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).","title":"JDBC URL Params","type":"string","order":6}}},"supportsNormalization":false,"supportsDBT":false,"supported_destination_sync_modes":[],"advanced_auth":{"auth_flow_type":"oauth2.0","predicate_key":["credentials","auth_type"],"predicate_value":"OAuth","oauth_config_specification":{"oauth_user_input_from_connector_config_specification":{"type":"object","properties":{"host":{"type":"string","path_in_connector_config":["host"]},"role":{"type":"string","path_in_connector_config":["role"]}}},"complete_oauth_output_specification":{"type":"object","properties":{"access_token":{"type":"string","path_in_connector_config":["credentials","access_token"]},"refresh_token":{"type":"string","path_in_connector_config":["credentials","refresh_token"]}}},"complete_oauth_server_input_specification":{"type":"object","properties":{"client_id":{"type":"string"},"client_secret":{"type":"string"}}},"complete_oauth_server_output_specification":{"type":"object","properties":{"client_id":{"type":"string","path_in_connector_config":["credentials","client_id"]},"client_secret":{"type":"string","path_in_connector_config":["credentials","client_secret"]}}}}}}} | ||
{"type":"LOG","log":{"level":"INFO","message":"INFO i.a.i.b.IntegrationRunner(runInternal):182 Completed integration: io.airbyte.integrations.source.snowflake.SnowflakeSource"}} | ||
``` | ||
ActivateFlow->>TransferAdapter: TableList | ||
|
||
Spec line contains `connectionSpecification` key with JSON schema model, this model can be a base for our proto model. | ||
TransferAdapter->>AirbyteDocker: Configure Source | ||
AirbyteDocker-->>TransferAdapter: Validate Config | ||
|
||
This specification can be baseline for new proto, for example we can use some semi-automatic [tools](https://transform.tools/json-schema-to-protobuf) to convert json-schema into proto. | ||
TransferAdapter->>AirbyteDocker: Discover Schema | ||
AirbyteDocker-->>TransferAdapter: Return Schema | ||
TransferAdapter->>ActivateFlow: TableList(TableMap) | ||
|
||
As example you can refer to exist [protos](https://a.yandex-team.ru/arcadia/transfer_manager/go/proto/api/endpoint/airbyte/?rev=r10953642) | ||
loop Every Stream | ||
ActivateFlow->>TransferAdapter: LoadTable | ||
TransferAdapter->>AirbyteDocker: Open Stream | ||
|
||
#### 2. Adding docker image and provider type | ||
loop Until Stream Has Data | ||
AirbyteDocker-->>TransferAdapter: Data Chunk | ||
TransferAdapter->>TransferAdapter: Parse Data | ||
TransferAdapter-->>Sink: Push | ||
alt Chunk has State | ||
TransferAdapter->>AirbyteDocker: Acknowledge State | ||
AirbyteDocker-->>TransferAdapter: Confirm State Update | ||
end | ||
end | ||
|
||
Each user facing provider has own value of [API-Providers enum](https://a.yandex-team.ru/arcadia/transfer_manager/go/proto/api/endpoint.proto?rev=r10968706#L74). If there is no provider in enum - you need to add new one. | ||
TransferAdapter->>ActivateFlow: LoadTable Done | ||
|
||
Beside that this enum should be added to [Known Airbyte Providers](https://a.yandex-team.ru/arcadia/transfer_manager/go/pkg/providers/airbyte/provider_model.go?rev=r11073103#L29) list. | ||
end | ||
TransferAdapter->>ActivateFlow: Activate Completed | ||
ActivateFlow-->>ActivateFlow: Store Table Progress | ||
``` | ||
To create linkage between API-enum into specific Airbyte provider-code we need to add mappings: | ||
### Steps Explained | ||
1. Between Provider Enum and proto-model [here](https://a.yandex-team.ru/arcadia/transfer_manager/go/pkg/providers/airbyte/provider_model.go?rev=r11073103#L303) | ||
1. Between Provider Enum and Airbyte-docker image [here](https://a.yandex-team.ru/arcadia/transfer_manager/go/pkg/providers/airbyte/provider_model.go?rev=r11073103#L282) | ||
1. **Table List Retrieval**: | ||
- `ActivateFlow` requests the list of tables by sending a `TableList` command to the `TransferAdapter`. | ||
|
||
By default, we map proto message into json message as is with standard proto-json mapper, but for some cases (for example one-of fields) we should add extra mapping code like we do [here](https://a.yandex-team.ru/arcadia/transfer_manager/go/pkg/providers/airbyte/provider_model.go?rev=r11073103#L351). | ||
2. **Source Configuration**: | ||
- `TransferAdapter` sends a `Configure Source` command to `AirbyteDocker` to configure the source. | ||
- `AirbyteDocker` validates the configuration and responds with the result. | ||
|
||
#### 3. Enable new provider | ||
3. **Schema Discovery**: | ||
- `TransferAdapter` invokes the `Discover Schema` command on `AirbyteDocker` to fetch the schema for the configured source. | ||
- `AirbyteDocker` returns the discovered schema to `TransferAdapter`. | ||
- `TransferAdapter` maps the schema to tables and sends it back to `ActivateFlow` as `TableList(TableMap)`. | ||
|
||
For managing providers across installation we use [grants](https://a.yandex-team.ru/arcadia/transfer_manager/go/pkg/grants/?rev=r11099251) mechanism, so as last step we should add the new airbyte grant like we do [here](https://a.yandex-team.ru/arcadia/transfer_manager/go/pkg/grants/grants.go?rev=r11099251#L67). | ||
4. **Table Loading**: | ||
- For each stream in the schema, the following steps are executed in a loop: | ||
|
||
In an initial phase its recommended to enable the new provider in preview mode until a certain stability can be proven. | ||
``` | ||
NewAirbyteGrant(api.EndpointType_SNOWFLAKE, "Snowflake", | ||
AWS().Preview(), | ||
) | ||
``` | ||
a. **Load Table Request**: | ||
- `ActivateFlow` sends a `LoadTable` command to `TransferAdapter` to initiate loading for a specific table. | ||
|
||
b. **Stream Opening**: | ||
- `TransferAdapter` requests `AirbyteDocker` to `Open Stream` for the corresponding table. | ||
|
||
c. **Data Stream Processing**: | ||
- In a loop that runs until the stream provides data: | ||
- `TransferAdapter` parses the data chunk and pushes the parsed data to the `Sink`. | ||
- If the data chunk contains state information: | ||
- `TransferAdapter` acknowledges the state by sending `Acknowledge State` to `AirbyteDocker`. | ||
|
||
d. **Table Load Completion**: | ||
- Once all data for the stream has been processed, `TransferAdapter` notifies `ActivateFlow` with `LoadTable Done`. | ||
|
||
#### 4. Optional e2e test | ||
5. **Activation Completion**: | ||
- After all streams have been processed: | ||
- `TransferAdapter` sends an `Activate Completed` message to `ActivateFlow`. | ||
- `ActivateFlow` stores progress for all tables using `Store Table Progress`. | ||
|
||
We have a fat pre-commit test for the airbyte connectors which is run by a [special script `go/tests/e2e/run_teamcity_docker_tests.sh`](../../../tests/e2e/run_teamcity_docker_tests.sh), so if you have a stable instance that can be used as CI-source you can add a new test like it's done [in `s3csv`](../../../tests/e2e/airbyte2ch/s3csv/check_db_test.go) or another test in that directory. | ||
## Known Limitations | ||
|
||
#### Example PR adding Airbyte Snowflake connector | ||
- **Performance**: Data export performance is limited by Airbyte itself and typically does not exceed 10 MB/s per table. | ||
- **Parallelism**: | ||
- Parallelism is supported at the table level, allowing multiple tables to be processed concurrently. | ||
- However, within a single table, there is no internal parallelism, which may limit performance for large tables. | ||
- **Preconfigured Connectors**: Requires Airbyte connectors to be set up and preconfigured before use. | ||
|
||
For a full example adding a new airbyte connectory you can have a look at this [PR](https://a.yandex-team.ru/review/3653374/files/1) |