From 22125e09564c4e4273d906a8a963437360a137c0 Mon Sep 17 00:00:00 2001 From: sagar-salvi-apptware Date: Mon, 26 Aug 2024 18:19:10 +0530 Subject: [PATCH] fix(mode/ingestion): add connection timeouts to avoid RemoteDisconnected errors --- .../src/datahub/ingestion/source/mode.py | 17 ++++++++++++++--- .../tests/integration/mode/test_mode.py | 8 +++++--- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/mode.py b/metadata-ingestion/src/datahub/ingestion/source/mode.py index 47475c5825a493..f50b1b9fd8e75b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mode.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mode.py @@ -5,6 +5,7 @@ from dataclasses import dataclass from datetime import datetime, timezone from functools import lru_cache +from http.client import RemoteDisconnected from typing import Dict, Iterable, List, Optional, Set, Tuple, Union import dateutil.parser as dp @@ -18,6 +19,7 @@ from requests.models import HTTPBasicAuth, HTTPError from sqllineage.runner import LineageRunner from tenacity import retry_if_exception_type, stop_after_attempt, wait_exponential +from urllib3.exceptions import ProtocolError import datahub.emitter.mce_builder as builder from datahub.configuration.common import AllowDenyPattern, ConfigModel @@ -127,6 +129,10 @@ class ModeAPIConfig(ConfigModel): max_attempts: int = Field( default=5, description="Maximum number of attempts to retry before failing" ) + timeout: int = Field( + default=40, + description="Timout setting, how long to wait for the Mode rest api to send data before giving up", + ) class ModeConfig(StatefulIngestionConfigBase, DatasetLineageProviderConfigBase): @@ -303,7 +309,7 @@ def __init__(self, ctx: PipelineContext, config: ModeConfig): self.report = ModeSourceReport() self.ctx = ctx - self.session = requests.session() + self.session = requests.Session() self.session.auth = HTTPBasicAuth( self.config.token, self.config.password.get_secret_value(), @@ -1416,9 +1422,14 @@ def _get_request_json(self, url: str) -> Dict: @r.wraps def get_request(): try: - response = self.session.get(url) - response.raise_for_status() + response = self.session.get( + url, timeout=self.config.api_options.timeout + ) return response.json() + except (RemoteDisconnected, ProtocolError) as e: + # TODO Need to check why server is closes the connection before sending a complete response + logging.warning(f"RemoteDisconnected or ProtocolError: {str(e)}") + return {} except HTTPError as http_error: error_response = http_error.response if error_response.status_code == 429: diff --git a/metadata-ingestion/tests/integration/mode/test_mode.py b/metadata-ingestion/tests/integration/mode/test_mode.py index 0346767b05d253..e21619c01cb474 100644 --- a/metadata-ingestion/tests/integration/mode/test_mode.py +++ b/metadata-ingestion/tests/integration/mode/test_mode.py @@ -37,12 +37,14 @@ def __init__(self, error_list, status_code): self.auth = None self.headers = {} self.url = None + self.timeout = None def json(self): return self.json_data - def get(self, url): + def get(self, url, timeout=40): self.url = url + self.timeout = timeout response_json_path = f"{test_resources_dir}/setup/{JSON_RESPONSE_MAP.get(url)}" with open(response_json_path) as file: data = json.loads(file.read()) @@ -70,7 +72,7 @@ def mocked_requests_failure(*args, **kwargs): @freeze_time(FROZEN_TIME) def test_mode_ingest_success(pytestconfig, tmp_path): with patch( - "datahub.ingestion.source.mode.requests.session", + "datahub.ingestion.source.mode.requests.Session", side_effect=mocked_requests_sucess, ): pipeline = Pipeline.create( @@ -107,7 +109,7 @@ def test_mode_ingest_success(pytestconfig, tmp_path): @freeze_time(FROZEN_TIME) def test_mode_ingest_failure(pytestconfig, tmp_path): with patch( - "datahub.ingestion.source.mode.requests.session", + "datahub.ingestion.source.mode.requests.Session", side_effect=mocked_requests_failure, ): global test_resources_dir