diff --git a/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/schemas/mgramseva_water_connections.json b/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/schemas/mgramseva_water_connections.json new file mode 100644 index 000000000000..a7af717984f2 --- /dev/null +++ b/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/schemas/mgramseva_water_connections.json @@ -0,0 +1,12 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "data": { + "type": "object" + } + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py b/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py index 25c9f736ecd7..e559a9f53d88 100644 --- a/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py +++ b/airbyte-integrations/connectors/source-mgramseva/source_mgramseva/source.py @@ -298,6 +298,35 @@ def read_records( yield from paymentstream.read_records(sync_mode, cursor_field, stream_slice, stream_state) +class MgramsevaWaterConnections(MgramsevaStream): + """object for water connections""" + + def __init__( + self, headers: dict, request_info: dict, user_request: dict, tenantid_list: list, **kwargs + ): # pylint: disable=super-init-not-called + """specify endpoint for water connections and call super""" + self.headers = headers + self.request_info = request_info + self.user_request = user_request + self.tenantid_list = tenantid_list + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: Optional[List[str]] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + stream_state: Optional[Mapping[str, Any]] = None, + ) -> Iterable[StreamData]: + """override""" + + for tenantid in self.tenantid_list: + params = {"tenantId": tenantid, "businessService": "WS"} + wcstream = MgramsevaStream( + "ws-services/wc/_search", self.headers, self.request_info, self.user_request, params, "WaterConnection" + ) + yield from wcstream.read_records(sync_mode, cursor_field, stream_slice, stream_state) + + # Source class SourceMgramseva(AbstractSource): """Source for mGramSeva""" @@ -393,6 +422,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: # Generate streams for each object type streams = [ MgramsevaPayments(self.headers, self.request_info, self.user_request, self.config["tenantids"]), + MgramsevaWaterConnections(self.headers, self.request_info, self.user_request, self.config["tenantids"]), MgramsevaTenantExpenses(self.headers, self.request_info, self.user_request, self.config["tenantids"], start_date, end_date), MgramsevaDemands(self.headers, self.request_info, self.user_request, self.config["tenantids"]), ]