API Changes to FivetranSensor for supporting backfills #58
Conversation
We have run these code changes in our company's Airflow deployment like this:

FivetranSensor(
    task_id=...,
    connector_id=...,
    completed_after_time="{{ data_interval_end + macros.timedelta(minutes=5) }}"
)

And we confirmed it works as intended in these two cases:
Will anyone be able to review this? @phanikumv @pankajastro @sunank200. I know these are big changes, but I think they are worth looking at. Overall I think these changes really elevate the package to the next level, allowing folks to really customize how they integrate Fivetran with Airflow. I've been using Airflow for multiple years for data engineering and am an OSS contributor to Airflow. I consider these changes to make for a more idiomatic experience, and they also address many of the real-world nuances that you want control over when designing data pipelines that can guarantee data integrity. Other reasons why I think this should get reviewed:

Let me know if there is anything you need from me to assist in reviewing the changes. Thank you for all your work on this!
Sure @dwreeves, we will review it. Thank you so much for your contributions!
LGTM. Left a minor nitpick
LGTM
Thanks! I will change tonight.
All requested changes made.
Thanks @dwreeves, someone will review it tomorrow.
left a few minor suggestions but overall it looks good to me. @dwreeves thanks for your contribution!
if not self.completed_after_time:
    self._completed_after_time_rendered = self.hook.get_last_sync(self.connector_id)
else:
    self._completed_after_time_rendered = timezone.parse(self.completed_after_time)
I did a quick run on the old example but it failed here
Example: https://github.com/astronomer/airflow-provider-fivetran-async/blob/main/fivetran_provider_async/example_dags/example_fivetran_async.py#L33
Log
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/pendulum/parsing/__init__.py", line 131, in _parse
dt = parser.parse(
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/dateutil/parser/_parser.py", line 1368, in parse
return DEFAULTPARSER.parse(timestr, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/dateutil/parser/_parser.py", line 643, in parse
raise ParserError("Unknown string format: %s", timestr)
dateutil.parser._parser.ParserError: Unknown string format: None
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/airflow/airflow_provider_fivetran_async/fivetran_provider_async/sensors.py", line 149, in execute
elif not self.poke(context):
^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/airflow_provider_fivetran_async/fivetran_provider_async/sensors.py", line 200, in poke
self._completed_after_time_rendered = timezone.parse(self.completed_after_time)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/airflow/utils/timezone.py", line 207, in parse
return pendulum.parse(string, tz=timezone or TIMEZONE, strict=strict) # type: ignore
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/pendulum/parser.py", line 29, in parse
return _parse(text, **options)
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/pendulum/parser.py", line 45, in _parse
parsed = base_parse(text, **options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/pendulum/parsing/__init__.py", line 74, in parse
return _normalize(_parse(text, **_options), **_options)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/pendulum/parsing/__init__.py", line 135, in _parse
raise ParserError("Invalid date string: {}".format(text))
pendulum.parsing.exceptions.ParserError: Invalid date string: None
[2023-10-17, 07:27:15 UTC] {taskinstance.py:1398} INFO - Marking task as FAILED. dag_id=example_fivetran_async, task_id=fivetran_async_sensor, execution_date=20231017T072549, start_date=20231017T072715, end_date=20231017T072715
[2023-10-17, 07:27:15 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 414 for task fivetran_async_sensor (Invalid date string: None; 324)
[2023-10-17, 07:27:15 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2023-10-17, 07:27:15 UTC] {taskinstance.py:2776} INFO - 0 downstream tasks scheduled from follow-on schedule check
Is this expected?
Sorry for the delays, I've had a very busy last few weeks.
I started a new job recently and don't have Airflow + Fivetran set up just yet (will be setting it up soon).
That said, I think I can see what happened here.
The error message shows the following:
Invalid date string: None
So it looks like what happened is the FivetranOperator returned None. Looking at the code, we see (simplified):
def execute(self, context: Context) -> None | str:
...
if not self.wait_for_completion:
return last_sync
if not self.deferrable:
self._wait_synchronously(pendulum.parse(last_sync)) # type: ignore[arg-type]
return None
else:
...
return None
So it only returns last_sync (a str) when wait_for_completion is False. This makes sense, since in this case you'd be implementing a sort of async-await pattern with operator + sensor, and the timestamp is used as the "future", so to speak.

The issue therefore is that the example DAG needs to set wait_for_completion=False.
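For reference, here is a minimal sketch of that pattern (illustrative only: the DAG id, connection id, and connector id are placeholders, and the sensor kwarg follows the API proposed in this PR):

# Minimal sketch of the operator + sensor async-await pattern.
# DAG id, fivetran_conn_id, and connector_id are placeholders.
from datetime import datetime

from airflow import DAG
from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor

with DAG("example_fivetran_xcom_sketch", start_date=datetime(2023, 1, 1), schedule=None):
    # With wait_for_completion=False the operator triggers the sync, returns
    # immediately, and pushes the previous sync timestamp to XCom.
    trigger_sync = FivetranOperator(
        task_id="fivetran_op",
        fivetran_conn_id="fivetran_default",
        connector_id="my_connector",
        wait_for_completion=False,
    )
    # The sensor then waits for a sync that completes after that timestamp.
    wait_for_sync = FivetranSensor(
        task_id="fivetran_sensor",
        fivetran_conn_id="fivetran_default",
        connector_id="my_connector",
        completed_after_time="{{ task_instance.xcom_pull('fivetran_op', key='return_value') }}",
    )
    trigger_sync >> wait_for_sync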
Reading these examples, I actually found them a little confusing. I have now written them a little differently to clarify the new behaviors:

- example_fivetran.py shows a sync example. It is unnecessary to use a sensor by default now, so I removed the sensor.
- example_fivetran_async.py also does not need to use the sensor, since the FivetranOperator awaits the Fivetran sync (in both sync and async mode).
- example_fivetran_xcom.py needed wait_for_completion=False to be set for that example to make any sense, which I didn't have before.
That said, there are still some possible issues with the BigQuery and BQML examples, since they both use the now-unnecessary FivetranSensor. (It is unnecessary because the FivetranOperator does the job of waiting.) Maybe we remove the FivetranSensors from there too?

We could also potentially provide an example where Fivetran is being externally scheduled and {{ data_interval_end }} is being used as the completed time, something like the sketch below.
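(Illustrative only: the DAG id, connection id, connector id, and schedule are placeholders, and the kwarg follows the API proposed in this PR.)

# Sketch: awaiting a sync that Fivetran schedules itself, outside of Airflow.
from datetime import datetime

from airflow import DAG
from fivetran_provider_async.sensors import FivetranSensor

with DAG("fivetran_externally_scheduled_sketch", start_date=datetime(2023, 1, 1), schedule="@daily"):
    # Pass once a sync has completed at or after the end of the data interval,
    # so backfilled runs wait for (or immediately find) the correct sync.
    wait_for_sync = FivetranSensor(
        task_id="wait_for_fivetran_sync",
        fivetran_conn_id="fivetran_default",
        connector_id="my_connector",
        completed_after_time="{{ data_interval_end }}",
        poke_interval=60,
    )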
Sorry all, I have been on vacation and essentially offline the past week. Will get to this soon!
Resolves #49
Changes
FivetranOperator
High level API:
- Removed the execution_timeout kwarg. This isn't even being used, and it is pretty straightforward to remove.
- Made deferrable=False wait for the Fivetran sync to complete. The deferrable param is a parameter that defines the runtime of the operator (i.e. in normal worker nodes vs a triggerer instance), not the fundamental behavior of the operator. For that reason, the deferrable=? kwarg should not impact the behavior as dramatically as it currently does. Also, not waiting when deferrable=False is significantly worse than waiting, as it leads to the FivetranSensor being necessary to await the FivetranOperator, whereas the FivetranSensor would be more interesting if it were a choice rather than a strict necessity. I updated the README.md to clarify what the FivetranSensor can/should be used for going forward.
- Added a wait_for_completion kwarg. (I copied this kwarg name from EcsRunTaskOperator, for whatever that is worth.) Combined with deferrable, this splits what deferrable does into two separate options (waiting for Fivetran vs running on a triggerer instance) rather than having them combined into one option. See the sketch after this list for how the two combine.
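An illustrative sketch of the resulting combinations (the connector id is a placeholder, and the defaults shown here are assumptions rather than the final API):

from fivetran_provider_async.operators import FivetranOperator
from airflow import DAG
from datetime import datetime

with DAG("fivetran_operator_combinations_sketch", start_date=datetime(2023, 1, 1), schedule=None):
    # Trigger the sync and wait for it on a worker slot (synchronous polling).
    sync_and_wait = FivetranOperator(
        task_id="sync_and_wait",
        connector_id="my_connector",
        wait_for_completion=True,
        deferrable=False,
    )

    # Trigger the sync and wait for it, deferring to the triggerer while waiting.
    sync_and_defer = FivetranOperator(
        task_id="sync_and_defer",
        connector_id="my_connector",
        wait_for_completion=True,
        deferrable=True,
    )

    # Fire-and-forget: trigger the sync and return the last-sync timestamp,
    # which a downstream FivetranSensor can use as its completed_after_time.
    sync_only = FivetranOperator(
        task_id="sync_only",
        connector_id="my_connector",
        wait_for_completion=False,
    )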
FivetranSensor

I start by describing the changes to FivetranOperator because understanding those changes paves the way to understanding why FivetranSensor also changes.

It appears right now that FivetranSensor is designed just to await a FivetranOperator(deferrable=False), out of necessity due to the lack of functionality when non-deferrable.

However, a much more relevant use case for the FivetranSensor is when Fivetran is running on its own schedule and we want to await a sync scheduled outside of Airflow. The main motivation in designing these API changes is to support that use case. By making FivetranOperator always wait by default, even when deferrable=False, the FivetranSensor's primary purpose can focus on more interesting use cases.

High level API:
- Replaced xcom with completed_after_time.
  - xcom is a misleading name: the value does not necessarily need to come from XCom. This kwarg was designed originally to be used in the case of FivetranOperator >> FivetranSensor, with the intent that the FivetranSensor was always awaiting a manually scheduled job via the completed-at time passed through XCom.
  - I considered target_completed_time or something similar to emulate the DateTimeSensor API. However, that is not a very descriptive name, and it feels especially confusing when using the old xcom_pull() pattern (target_completed_time="{{ task_instance.xcom_pull('fivetran-operator', key='return_value') }}" looks strange and could be interpreted as "targeting" the last sync value, but in that case it would be unclear why this pattern even works!). I found completed_after_time to be a lot clearer.
- Added an always_wait_when_syncing kwarg, so the operator never passes while Fivetran is syncing when this is true.
- Added a propagate_failures_forward kwarg. When this is true, target_completed_time > failed_at > succeeded_at raises an error. By default this is False, matching the existing API. The rationale is that fail > success indicates an issue that may eventually impact future syncs.*

(* Imperfect because, for a long-running DAG or for concurrent task runs, Fivetran can just resync again.)

FivetranHook

High level API

- Added is_synced_after_target_time() and is_synced_after_target_time_async() to replace get_sync_status() and get_sync_status_async() (which are deprecated). get_sync_status_async returns a string but get_sync_status returns a boolean; the name is_synced_after_target_time() implies a boolean return value, which is what you get.
- Moved the connector_details parsing logic so that it is in _determine_if_synced_from_connector_details(). This means that both the sync and async operator use the same logic, and it gets to be DRY.

Misc.

I did my best to keep the API intact, which you can see from the fact that there are very few changes in tests/. The main test change I made was so that "succeeded_at > failed_at > completed_after_time" returns a success rather than a failure.
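To tie the new sensor kwargs together, here is an illustrative usage sketch (the connector id and the five-minute buffer are placeholders; the kwarg names follow the API described above, and defaults may differ):

# Sketch: FivetranSensor configured for an externally scheduled connector.
from datetime import datetime

from airflow import DAG
from fivetran_provider_async.sensors import FivetranSensor

with DAG("fivetran_sensor_kwargs_sketch", start_date=datetime(2023, 1, 1), schedule="@daily"):
    wait_for_external_sync = FivetranSensor(
        task_id="wait_for_external_sync",
        connector_id="my_connector",
        # Await a sync that completes after the data interval, plus a small buffer.
        completed_after_time="{{ data_interval_end + macros.timedelta(minutes=5) }}",
        # Never pass while Fivetran is mid-sync.
        always_wait_when_syncing=True,
        # Raise instead of waiting if the most recent attempt failed after the last success.
        propagate_failures_forward=True,
    )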