Skip to content

Commit

Permalink
api changes
Browse files Browse the repository at this point in the history
  • Loading branch information
dwreeves committed Sep 6, 2023
1 parent 33ccf14 commit 22759a9
Show file tree
Hide file tree
Showing 8 changed files with 437 additions and 141 deletions.
58 changes: 49 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,32 +33,72 @@ The sensor assumes the `Conn Id` is set to `fivetran`, however if you are managi

### [Fivetran Operator Async](https://github.com/astronomer/airflow-provider-fivetran-async/tree/main/fivetran_provider_async/operators.py)

`FivetranOperator` submits a Fivetran sync job and monitors it on trigger for completion.
It requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors).

Import into your DAG via:
```python
from fivetran_provider_async.operators import FivetranOperator
```

`FivetranOperator` submits a Fivetran sync job and monitors it on trigger for completion.

`FivetranOperator` requires that you specify the `connector_id` of the Fivetran connector you wish to trigger. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors).

The `FivetranOperator` will wait for the sync to complete so long as `wait_for_completion=True` (this is the default). It is recommended that
you run in deferrable mode (this is also the default). If `wait_for_completion=False`, the operator will return the timestamp for the last sync.

Import into your DAG via:

### [Fivetran Sensor Async](https://github.com/astronomer/airflow-provider-fivetran-async/tree/main/fivetran_provider_async/sensors.py)

```python
from fivetran_provider_async.sensors import FivetranSensor
```

`FivetranSensor` monitors a Fivetran sync job for completion.
Monitoring with `FivetranSensor` allows you to trigger downstream processes only when the Fivetran sync jobs have completed, ensuring data consistency.


`FivetranSensor` requires that you specify the `connector_id` of the Fivetran connector you want to wait for. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors).

You can use multiple instances of `FivetranSensor` to monitor multiple Fivetran connectors.

If used in this way,
`FivetranSensor` is most commonly useful in two scenarios:

`FivetranSensor` requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors).
1. Fivetran is using a separate scheduler than the Airflow scheduler.
2. You set `wait_for_completion=False` in the `FivetranOperator`, and you need to await the `FivetranOperator` task later. (You may want to do this if you want to arrange your DAG such that some tasks are dependent on _starting_ a sync and other tasks are dependent on _completing_ a sync).

If you are doing the 1st pattern, you may find it useful to set the `completed_after_time` to `data_interval_end`, or `data_interval_end` with some buffer:

Import into your DAG via:
```python
from fivetran_provider_async.sensors import FivetranSensor
fivetran_sensor = FivetranSensor(
task_id="wait_for_fivetran_externally_scheduled_sync",
connector_id="bronzing_largely",
poke_interval=5,
completed_after_time="{{ data_interval_end + macros.timedelta(minutes=1) }}",
)
```

If you are doing the 2nd pattern, you can use XComs to pass the target completed time to the sensor:

```python
fivetran_op = FivetranOperator(
task_id="fivetran_sync_my_db",
connector_id="bronzing_largely",
wait_for_completion=False,
)

fivetran_sensor = FivetranSensor(
task_id="wait_for_fivetran_db_sync",
connector_id="bronzing_largely",
poke_interval=5,
completed_after_time="{{ task_instance.xcom_pull('fivetran_sync_op', key='return_value') }}",
)

fivetran_op >> fivetran_sensor
```

You may also specify the `FivetranSensor` without a `completed_after_time`.
In this case, the sensor will make note of when the last completed time was, and will wait for a new completed time.

Import into your DAG via:

## Examples

See the [**examples**](https://github.com/astronomer/airflow-provider-fivetran-async/tree/main/fivetran_provider_async/example_dags) directory for an example DAG.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
task_id="fivetran_async_sensor",
connector_id="bronzing_largely",
poke_interval=5,
xcom="{{ task_instance.xcom_pull('fivetran_sync_op', key='return_value') }}",
completed_after_time="{{ task_instance.xcom_pull('fivetran_sync_op', key='return_value') }}",
)

fivetran_async_op >> fivetran_sync_op >> fivetran_async_sensor
17 changes: 1 addition & 16 deletions fivetran_provider_async/example_dags/example_fivetran_dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from airflow.providers.ssh.operators.ssh import SSHOperator

from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor

default_args = {
"owner": "Airflow",
Expand All @@ -22,29 +21,15 @@
connector_id="{{ var.value.linkedin_connector_id }}",
)

linkedin_sensor = FivetranSensor(
task_id="linkedin-sensor",
connector_id="{{ var.value.linkedin_connector_id }}",
poke_interval=600,
)

twitter_sync = FivetranOperator(
task_id="twitter-ads-sync",
connector_id="{{ var.value.twitter_connector_id }}",
)

twitter_sensor = FivetranSensor(
task_id="twitter-sensor",
connector_id="{{ var.value.twitter_connector_id }}",
poke_interval=600,
)

dbt_run = SSHOperator(
task_id="dbt_ad_reporting",
command="cd dbt_ad_reporting ; ~/.local/bin/dbt run -m +ad_reporting",
ssh_conn_id="dbtvm",
)

linkedin_sync >> linkedin_sensor
twitter_sync >> twitter_sensor
[linkedin_sensor, twitter_sensor] >> dbt_run
[linkedin_sync, twitter_sync] >> dbt_run
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
poke_interval=5,
xcom="{{ task_instance.xcom_pull('fivetran-operator', key='return_value') }}",
completed_after_time="{{ task_instance.xcom_pull('fivetran-operator', key='return_value') }}",
)

fivetran_operator >> delay_task >> fivetran_sensor
Loading

0 comments on commit 22759a9

Please sign in to comment.