Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API Changes to FivetranSensor for supporting backfills #58

Merged
merged 10 commits into from
Nov 6, 2023
58 changes: 49 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,32 +33,72 @@ The sensor assumes the `Conn Id` is set to `fivetran`, however if you are managi

### [Fivetran Operator Async](https://github.com/astronomer/airflow-provider-fivetran-async/tree/main/fivetran_provider_async/operators.py)

`FivetranOperator` submits a Fivetran sync job and monitors it on trigger for completion.
It requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors).

Import into your DAG via:
```python
from fivetran_provider_async.operators import FivetranOperator
```

`FivetranOperator` submits a Fivetran sync job and monitors it on trigger for completion.

`FivetranOperator` requires that you specify the `connector_id` of the Fivetran connector you wish to trigger. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors).

The `FivetranOperator` will wait for the sync to complete so long as `wait_for_completion=True` (this is the default). It is recommended that
you run in deferrable mode (this is also the default). If `wait_for_completion=False`, the operator will return the timestamp for the last sync.

Import into your DAG via:

### [Fivetran Sensor Async](https://github.com/astronomer/airflow-provider-fivetran-async/tree/main/fivetran_provider_async/sensors.py)

```python
from fivetran_provider_async.sensors import FivetranSensor
```

`FivetranSensor` monitors a Fivetran sync job for completion.
Monitoring with `FivetranSensor` allows you to trigger downstream processes only when the Fivetran sync jobs have completed, ensuring data consistency.


`FivetranSensor` requires that you specify the `connector_id` of the Fivetran connector you want to wait for. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors).

You can use multiple instances of `FivetranSensor` to monitor multiple Fivetran connectors.

If used in this way,
`FivetranSensor` is most commonly useful in two scenarios:

`FivetranSensor` requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors).
1. Fivetran is using a separate scheduler than the Airflow scheduler.
2. You set `wait_for_completion=False` in the `FivetranOperator`, and you need to await the `FivetranOperator` task later. (You may want to do this if you want to arrange your DAG such that some tasks are dependent on _starting_ a sync and other tasks are dependent on _completing_ a sync).

If you are doing the 1st pattern, you may find it useful to set the `completed_after_time` to `data_interval_end`, or `data_interval_end` with some buffer:

Import into your DAG via:
```python
from fivetran_provider_async.sensors import FivetranSensor
fivetran_sensor = FivetranSensor(
task_id="wait_for_fivetran_externally_scheduled_sync",
connector_id="bronzing_largely",
poke_interval=5,
completed_after_time="{{ data_interval_end + macros.timedelta(minutes=1) }}",
)
```

If you are doing the 2nd pattern, you can use XComs to pass the target completed time to the sensor:

```python
fivetran_op = FivetranOperator(
task_id="fivetran_sync_my_db",
connector_id="bronzing_largely",
wait_for_completion=False,
)

fivetran_sensor = FivetranSensor(
task_id="wait_for_fivetran_db_sync",
connector_id="bronzing_largely",
poke_interval=5,
completed_after_time="{{ task_instance.xcom_pull('fivetran_sync_op', key='return_value') }}",
)

fivetran_op >> fivetran_sensor
```

You may also specify the `FivetranSensor` without a `completed_after_time`.
In this case, the sensor will make note of when the last completed time was, and will wait for a new completed time.

Import into your DAG via:

## Examples

See the [**examples**](https://github.com/astronomer/airflow-provider-fivetran-async/tree/main/fivetran_provider_async/example_dags) directory for an example DAG.
Expand Down
13 changes: 1 addition & 12 deletions fivetran_provider_async/example_dags/example_fivetran.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from airflow import DAG

from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor

default_args = {
"owner": "Airflow",
Expand All @@ -19,18 +18,8 @@

with dag:
fivetran_sync_start = FivetranOperator(
task_id="fivetran-task",
task_id="fivetran_task",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
deferrable=False,
)

fivetran_sync_wait = FivetranSensor(
task_id="fivetran-sensor",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
poke_interval=5,
deferrable=False,
)

fivetran_sync_start >> fivetran_sync_wait
21 changes: 9 additions & 12 deletions fivetran_provider_async/example_dags/example_fivetran_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from airflow import DAG

from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor

default_args = {
"owner": "Airflow",
Expand All @@ -19,22 +18,20 @@
)

with dag:
# Both of these tasks will start a Fivetran sync,
# and will wait for the Fivetran sync to complete before marking
# the task as success.

# However, the async operator uses the triggerer instance to do this,
# which frees up a worker slot.

fivetran_async_op = FivetranOperator(
task_id="fivetran_async_op",
connector_id="bronzing_largely",
)

fivetran_sync_op = FivetranOperator(
task_id="fivetran_sync_op",
connector_id="bronzing_largely",
deferrable=False,
)

fivetran_async_sensor = FivetranSensor(
task_id="fivetran_async_sensor",
connector_id="bronzing_largely",
poke_interval=5,
xcom="{{ task_instance.xcom_pull('fivetran_sync_op', key='return_value') }}",
task_id="fivetran_sync_op", connector_id="bronzing_largely", deferrable=False
)

fivetran_async_op >> fivetran_sync_op >> fivetran_async_sensor
fivetran_async_op >> fivetran_sync_op
17 changes: 1 addition & 16 deletions fivetran_provider_async/example_dags/example_fivetran_dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from airflow.providers.ssh.operators.ssh import SSHOperator

from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor

default_args = {
"owner": "Airflow",
Expand All @@ -22,29 +21,15 @@
connector_id="{{ var.value.linkedin_connector_id }}",
)

linkedin_sensor = FivetranSensor(
task_id="linkedin-sensor",
connector_id="{{ var.value.linkedin_connector_id }}",
poke_interval=600,
)

twitter_sync = FivetranOperator(
task_id="twitter-ads-sync",
connector_id="{{ var.value.twitter_connector_id }}",
)

twitter_sensor = FivetranSensor(
task_id="twitter-sensor",
connector_id="{{ var.value.twitter_connector_id }}",
poke_interval=600,
)

dbt_run = SSHOperator(
task_id="dbt_ad_reporting",
command="cd dbt_ad_reporting ; ~/.local/bin/dbt run -m +ad_reporting",
ssh_conn_id="dbtvm",
)

linkedin_sync >> linkedin_sensor
twitter_sync >> twitter_sensor
[linkedin_sensor, twitter_sensor] >> dbt_run
[linkedin_sync, twitter_sync] >> dbt_run
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
task_id="fivetran-operator",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
wait_for_completion=False,
)

delay_task = PythonOperator(task_id="delay_python_task", python_callable=lambda: time.sleep(60))
Expand All @@ -34,7 +35,7 @@
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
poke_interval=5,
xcom="{{ task_instance.xcom_pull('fivetran-operator', key='return_value') }}",
completed_after_time="{{ task_instance.xcom_pull('fivetran-operator', key='return_value') }}",
)

fivetran_operator >> delay_task >> fivetran_sensor
Loading