Skip to content

Commit

Permalink
Stateful fix (#69)
Browse files Browse the repository at this point in the history
  • Loading branch information
cheloveque authored Dec 7, 2023
1 parent a46f35f commit 2b7c563
Showing 1 changed file with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,9 @@ def __init__(self, config: IBRedashSourceConfig, ctx: PipelineContext):
config,
ctx,
self.get_default_ingestion_job_id_prefix(),
self.get_last_checkpoint,
self.get_current_checkpoint,
self.state_provider.get_last_checkpoint,
self.state_provider.get_current_checkpoint,
)

self.config.connect_uri = self.config.connect_uri.strip("/")
self.client = self.create_redash_client(self.config.connect_uri, self.config.api_key)
if self.config.exp_api_key and self.config.exp_query_id:
Expand Down Expand Up @@ -133,7 +132,7 @@ def fetch_workunits(self) -> Iterable[WorkUnit]:
raise NotImplementedError("Sub-classes must implement this method.")

def get_workunits(self) -> Iterable[WorkUnit]:
if not self.is_stateful_ingestion_configured():
if not self.state_provider.is_stateful_ingestion_configured():
for wu in self.fetch_workunits():
self.report.report_workunit(wu)
yield wu
Expand Down Expand Up @@ -182,7 +181,7 @@ def get_workunits(self) -> Iterable[WorkUnit]:
self.state_manager.save_state()

def close(self):
self.prepare_for_commit()
self.state_provider.prepare_for_commit()
self.client.session.close()

def get_report(self) -> RedashSourceReport:
Expand Down

0 comments on commit 2b7c563

Please sign in to comment.