diff --git a/metadata-ingestion/src/datahub/ingestion/source/ib/ib_common.py b/metadata-ingestion/src/datahub/ingestion/source/ib/ib_common.py index dff5f5af65c83d..cebefd49cdf5ba 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ib/ib_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ib/ib_common.py @@ -75,10 +75,9 @@ def __init__(self, config: IBRedashSourceConfig, ctx: PipelineContext): config, ctx, self.get_default_ingestion_job_id_prefix(), - self.get_last_checkpoint, - self.get_current_checkpoint, + self.state_provider.get_last_checkpoint, + self.state_provider.get_current_checkpoint, ) - self.config.connect_uri = self.config.connect_uri.strip("/") self.client = self.create_redash_client(self.config.connect_uri, self.config.api_key) if self.config.exp_api_key and self.config.exp_query_id: @@ -133,7 +132,7 @@ def fetch_workunits(self) -> Iterable[WorkUnit]: raise NotImplementedError("Sub-classes must implement this method.") def get_workunits(self) -> Iterable[WorkUnit]: - if not self.is_stateful_ingestion_configured(): + if not self.state_provider.is_stateful_ingestion_configured(): for wu in self.fetch_workunits(): self.report.report_workunit(wu) yield wu @@ -182,7 +181,7 @@ def get_workunits(self) -> Iterable[WorkUnit]: self.state_manager.save_state() def close(self): - self.prepare_for_commit() + self.state_provider.prepare_for_commit() self.client.session.close() def get_report(self) -> RedashSourceReport: