From cb23a93f8974cd49d31fff8edcb315b62acbfc72 Mon Sep 17 00:00:00 2001
From: Igor Gaponenko
Date: Thu, 19 Sep 2024 19:24:58 -0700
Subject: [PATCH] Implemented the HTTP-based push mode ingest in the integration test

---
 .../python/lsst/qserv/admin/cli/script.py     |   4 +
 .../lsst/qserv/admin/replicationInterface.py  | 157 +++++++++++++-----
 2 files changed, 122 insertions(+), 39 deletions(-)

diff --git a/src/admin/python/lsst/qserv/admin/cli/script.py b/src/admin/python/lsst/qserv/admin/cli/script.py
index f68865c61..0ddd17ad3 100644
--- a/src/admin/python/lsst/qserv/admin/cli/script.py
+++ b/src/admin/python/lsst/qserv/admin/cli/script.py
@@ -1027,8 +1027,12 @@ def load_simple(repl_ctrl_uri: str, auth_key: str, load_http: bool) -> None:
     chunk_location = repl.ingest_chunk_config(transaction_id, "0")
     repl.ingest_data_file(
         transaction_id,
+        "0",
+        False,
         chunk_location.host,
         chunk_location.port,
+        chunk_location.http_host,
+        chunk_location.http_port,
         data_file=data_file,
         table=table,
         load_http=load_http,
diff --git a/src/admin/python/lsst/qserv/admin/replicationInterface.py b/src/admin/python/lsst/qserv/admin/replicationInterface.py
index 40843ab51..8004d74c4 100644
--- a/src/admin/python/lsst/qserv/admin/replicationInterface.py
+++ b/src/admin/python/lsst/qserv/admin/replicationInterface.py
@@ -27,6 +27,7 @@
 import os
 from requests import delete, get, post, put
 from requests.exceptions import ConnectionError
+from requests_toolbelt.multipart.encoder import MultipartEncoder
 import subprocess
 from .itest_table import LoadTable
 from typing import Any, Callable, Dict, Generator, List, Optional, NamedTuple, Tuple
@@ -47,11 +48,15 @@ class ChunkLocation(NamedTuple):
     chunk_id: str
     host: str
     port: str
+    http_host: str
+    http_port: str
 
 
 class RegularLocation(NamedTuple):
     host: str
     port: str
+    http_host: str
+    http_port: str
 
 
 def _check(result: Dict[Any, Any], url: str) -> None:
@@ -94,6 +99,30 @@ def _post(url: str, data: str) -> Dict[Any, Any]:
     _check(res, url)
     return res
 
+@backoff.on_exception(
+    exception=ConnectionError,
+    wait_gen=backoff.expo,
+    on_backoff=on_backoff(log=_log),
+    max_time=max_backoff_sec,
+)
+def _post_file_upload(url: str, encoder: MultipartEncoder) -> Dict[Any, Any]:
+    """Call requests.post and check the result for success=1.
+
+    Parameters
+    ----------
+    url : `str`
+        The url to send to `post`.
+    encoder : `MultipartEncoder`
+        The multipart form data to send to `post`.
+
+    Returns
+    -------
+    result : `dict`
+        The dict containing the result of calling `post`.
+    """
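+    # Note (reviewer sketch): posting the MultipartEncoder as the request body
+    # lets requests stream the upload instead of buffering the whole file in
+    # memory; the encoder's content_type must be sent as the Content-Type
+    # header because it carries the generated multipart boundary.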
+    res: Dict[Any, Any] = post(url, data=encoder, headers={'Content-Type': encoder.content_type}).json()
+    _check(res, url)
+    return res
 
 @backoff.on_exception(
     exception=ConnectionError,
@@ -288,7 +317,8 @@ def ingest_chunk_config(self, transaction_id: int, chunk_id: str) -> ChunkLocati
             data=json.dumps(dict(transaction_id=transaction_id, chunk=chunk_id,
                                  auth_key=self.auth_key, version=self.repl_api_version,)),
         )
-        return ChunkLocation(chunk_id, res["location"]["host"], res["location"]["port"])
+        return ChunkLocation(chunk_id, res["location"]["host"], str(res["location"]["port"]),
+                             res["location"]["http_host"], str(res["location"]["http_port"]))
 
     def ingest_chunk_configs(self, transaction_id: int, chunk_ids: List[int]) -> List[ChunkLocation]:
         """Get the locations where a list of chunk ids should be ingested.
@@ -310,7 +340,8 @@ def ingest_chunk_configs(self, transaction_id: int, chunk_ids: List[int]) -> Lis
             data=json.dumps(dict(transaction_id=transaction_id, chunks=chunk_ids,
                                  auth_key=self.auth_key, version=self.repl_api_version,)),
         )
-        return [ChunkLocation(l["chunk"], l["host"], str(l["port"])) for l in res["location"]]
+        return [ChunkLocation(l["chunk"], l["host"], str(l["port"]),
+                              l["http_host"], str(l["http_port"])) for l in res["location"]]
 
     def ingest_regular_table(self, transaction_id: int) -> List[RegularLocation]:
         """Get the locations where a non-chunk table should be ingested.
@@ -331,13 +362,18 @@ def ingest_regular_table(self, transaction_id: int) -> List[RegularLocation]:
             url=f"http://{self.repl_ctrl.hostname}:{self.repl_ctrl.port}/ingest/regular?version={self.repl_api_version}",
             data=json.dumps(dict(auth_key=self.auth_key, transaction_id=transaction_id,)),
         )
-        return [RegularLocation(location["host"], str(location["port"])) for location in res["locations"]]
+        return [RegularLocation(location["host"], str(location["port"]),
+                                location["http_host"], str(location["http_port"])) for location in res["locations"]]
 
     def ingest_data_file(
         self,
         transaction_id: int,
+        chunk_id: str,
+        overlap: bool,
         worker_host: str,
         worker_port: str,
+        worker_http_host: str,
+        worker_http_port: str,
         data_file: str,
         table: LoadTable,
         load_http: bool,
@@ -348,10 +384,18 @@ def ingest_data_file(
         ----------
         transaction_id : `int`
             The transaction id.
+        chunk_id : `str`
+            The chunk id.
+        overlap : `bool`
+            The flag indicating if the file represents the chunk overlap.
         worker_host : `str`
             The name of the host ingesting the data.
         worker_port : `str`
             The worker_host port to use.
+        worker_http_host : `str`
+            The name of the host ingesting the data (HTTP protocol).
+        worker_http_port : `str`
+            The worker_http_host port to use (HTTP protocol).
         data_file : `str`
             The path to the data file to ingest.
         table : `LoadTable`
@@ -362,38 +406,59 @@ def ingest_data_file(
         if not self.auth_key:
             raise RuntimeError("auth_key must be set to ingest a data file.")
         if load_http:
-            raise NotImplementedError("HTTP-based table data loading protocol not implemented.")
-        args = [
-            "qserv-replica-file",
-            "INGEST",
-            "FILE",
-            worker_host,
-            worker_port,
-            str(transaction_id),
-            table.table_name,
-            # app help says P for 'partitioned' and R for 'regular'/non-partitioned.
-            "P" if table.is_partitioned else "R",
-            data_file,
-            "--verbose",
-            f"--fields-terminated-by={table.fields_terminated_by}",
-            f"--fields-enclosed-by={table.fields_enclosed_by}",
-            f"--fields-escaped-by={table.fields_escaped_by}",
-            f"--auth-key={self.auth_key}",
-            f"--lines-terminated-by={table.lines_terminated_by}",
-        ]
-        _log.debug("ingest file args: %s", args)
-        res = subprocess.run(
-            args,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
-            encoding="utf-8",
-            errors="replace",
-        )
-        if res.returncode != 0:
-            raise RuntimeError(
-                f"Subprocess failed ({res.returncode}) stdout:{res.stdout} stderr:{res.stderr}"
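+            # Note (reviewer sketch): each scalar parameter is sent as a plain
+            # text form part; the (None, value) tuple marks a field with no
+            # filename. The CSV payload is attached as the "file" part with its
+            # basename and a MIME type; the handle opened below is read as the
+            # request streams and is left for garbage collection to close.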
- "P" if table.is_partitioned else "R", - data_file, - "--verbose", - f"--fields-terminated-by={table.fields_terminated_by}", - f"--fields-enclosed-by={table.fields_enclosed_by}", - f"--fields-escaped-by={table.fields_escaped_by}", - f"--auth-key={self.auth_key}", - f"--lines-terminated-by={table.lines_terminated_by}", - ] - _log.debug("ingest file args: %s", args) - res = subprocess.run( - args, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding="utf-8", - errors="replace", - ) - if res.returncode != 0: - raise RuntimeError( - f"Subprocess failed ({res.returncode}) stdout:{res.stdout} stderr:{res.stderr}" + encoder = MultipartEncoder( + fields={ + "auth_key": (None, self.auth_key), + "transaction_id": (None, str(transaction_id)), + "table": (None, table.table_name), + "chunk": (None, str(chunk_id)), + "overlap": (None, str("1" if overlap else "0")), + "fields_terminated_by": (None, str(table.fields_terminated_by)), + "fields_enclosed_by": (None, str(table.fields_enclosed_by)), + "fields_escaped_by": (None, str(table.fields_escaped_by)), + "lines_terminated_by": (None, str(table.lines_terminated_by)), + "file": (os.path.basename(data_file), open(data_file, "rb"), "text/plain"), + } + ) + _log.debug("encoder: %s", encoder) + res_http = _post_file_upload( + url=f"http://{worker_http_host}:{worker_http_port}/ingest/csv", + encoder=encoder) + if not res_http["success"]: + raise RuntimeError(f"Ingest failed ({res_http})") + _log.debug("ingest file res: %s", res_http) + else: + args = [ + "qserv-replica-file", + "INGEST", + "FILE", + worker_host, + worker_port, + str(transaction_id), + table.table_name, + # app help says P for 'partitioned' and R for 'regular'/non-partitioned. + "P" if table.is_partitioned else "R", + data_file, + "--verbose", + f"--fields-terminated-by={table.fields_terminated_by}", + f"--fields-enclosed-by={table.fields_enclosed_by}", + f"--fields-escaped-by={table.fields_escaped_by}", + f"--auth-key={self.auth_key}", + f"--lines-terminated-by={table.lines_terminated_by}", + ] + _log.debug("ingest file args: %s", args) + res = subprocess.run( + args, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + errors="replace", ) - _log.debug("ingest file res: %s", res) + if res.returncode != 0: + raise RuntimeError( + f"Subprocess failed ({res.returncode}) stdout:{res.stdout} stderr:{res.stderr}" + ) + _log.debug("ingest file res: %s", res) def build_table_stats( self, @@ -475,29 +540,37 @@ def ingest_chunks_data( # Ingest the chunk files: # Helpful note: Generator type decl is Generator[yield, send, return], # see https://www.python.org/dev/peps/pep-0484/#annotating-generator-functions-and-coroutines - def generate_locations() -> Generator[Tuple[str, str, str], None, None]: + def generate_locations() -> Generator[Tuple[str, str, str, str, str, str, bool], None, None]: for location in locations: for chunk_file in (chunk_file_t, chunk_overlap_file_t): full_path = os.path.join(chunks_folder, chunk_file.format(chunk_id=location.chunk_id)) if os.path.exists(full_path): _log.debug( - f"Ingesting %s to %s:%s", + f"Ingesting %s to %s:%s/%s:%s chunk %s.", full_path, location.host, location.port, + location.http_host, + location.http_port, + location.chunk_id, ) - yield full_path, location.host, location.port + overlap = "overlap" in chunk_file + yield full_path, location.host, location.port, location.http_host, location.http_port, location.chunk_id, overlap else: _log.warn( "Not ingesting %s; it does not exist (probably there is no data for that chunk).", 
+                        overlap = "overlap" in chunk_file
+                        yield full_path, location.host, location.port, location.http_host, location.http_port, location.chunk_id, overlap
                     else:
                         _log.warn(
                             "Not ingesting %s; it does not exist (probably there is no data for that chunk).",
                             full_path,
                         )
 
-        for _file, host, port in generate_locations():
+        for _file, host, port, http_host, http_port, chunk_id, overlap in generate_locations():
             self.ingest_data_file(
                 transaction_id,
+                chunk_id,
+                overlap,
                 host,
                 port,
+                http_host,
+                http_port,
                 data_file=_file,
                 table=table,
                 load_http=load_http
@@ -531,16 +604,22 @@ def ingest_table_data(
         locations = self.ingest_regular_table(transaction_id)
         for location in locations:
             _log.debug(
-                "Ingesting %s to %s:%s table %s.",
+                "Ingesting %s to %s:%s/%s:%s table %s.",
                 data_file,
                 location.host,
                 location.port,
+                location.http_host,
+                location.http_port,
                 table.table_name,
             )
             self.ingest_data_file(
                 transaction_id,
+                "0",
+                False,
                 location.host,
                 location.port,
+                location.http_host,
+                location.http_port,
                 data_file=data_file,
                 table=table,
                 load_http=load_http,