Implemented the HTTP-based push mode ingest in the integration test
iagaponenko committed Oct 4, 2024
1 parent 5bcc0dc commit cb23a93
Showing 2 changed files with 122 additions and 39 deletions.
4 changes: 4 additions & 0 deletions src/admin/python/lsst/qserv/admin/cli/script.py
@@ -1027,8 +1027,12 @@ def load_simple(repl_ctrl_uri: str, auth_key: str, load_http: bool) -> None:
chunk_location = repl.ingest_chunk_config(transaction_id, "0")
repl.ingest_data_file(
transaction_id,
"0",
False,
chunk_location.host,
chunk_location.port,
chunk_location.http_host,
chunk_location.http_port,
data_file=data_file,
table=table,
load_http=load_http,
157 changes: 118 additions & 39 deletions src/admin/python/lsst/qserv/admin/replicationInterface.py
@@ -27,6 +27,7 @@
import os
from requests import delete, get, post, put
from requests.exceptions import ConnectionError
from requests_toolbelt.multipart.encoder import MultipartEncoder
import subprocess
from .itest_table import LoadTable
from typing import Any, Callable, Dict, Generator, List, Optional, NamedTuple, Tuple
@@ -47,11 +48,15 @@ class ChunkLocation(NamedTuple):
chunk_id: str
host: str
port: str
http_host: str
http_port: str


class RegularLocation(NamedTuple):
host: str
port: str
http_host: str
http_port: str


def _check(result: Dict[Any, Any], url: str) -> None:
@@ -94,6 +99,30 @@ def _post(url: str, data: str) -> Dict[Any, Any]:
_check(res, url)
return res

@backoff.on_exception(
exception=ConnectionError,
wait_gen=backoff.expo,
on_backoff=on_backoff(log=_log),
max_time=max_backoff_sec,
)
def _post_file_upload(url: str, encoder: MultipartEncoder) -> Dict[Any, Any]:
"""Call requests.post and check the result for success=1.
Parameters
----------
url : `str`
The url to send to `post`.
encoder : `MultipartEncoder`
The multipart form payload (fields and file) to send to `post`.
Returns
-------
result : `dict`
The dict containing the result of calling `post`.
"""
res: Dict[Any, Any] = post(url, data=encoder, headers={'Content-Type': encoder.content_type}).json()
_check(res, url)
return res
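# Illustrative usage (hypothetical values, not part of this commit): build a
# MultipartEncoder whose field names match the /ingest/csv endpoint used by
# ingest_data_file below, then hand it to this helper.
#
#     encoder = MultipartEncoder(
#         fields={
#             "auth_key": (None, "replication-auth-key"),
#             "transaction_id": (None, "42"),
#             "table": (None, "Object"),
#             "chunk": (None, "0"),
#             "overlap": (None, "0"),
#             "file": ("chunk_0.csv", open("chunk_0.csv", "rb"), "text/plain"),
#         }
#     )
#     res = _post_file_upload(
#         url=f"http://{worker_http_host}:{worker_http_port}/ingest/csv",
#         encoder=encoder,
#     )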

@backoff.on_exception(
exception=ConnectionError,
@@ -288,7 +317,8 @@ def ingest_chunk_config(self, transaction_id: int, chunk_id: str) -> ChunkLocation:
data=json.dumps(dict(transaction_id=transaction_id, chunk=chunk_id, auth_key=self.auth_key,
version=self.repl_api_version,)),
)
return ChunkLocation(chunk_id, res["location"]["host"], res["location"]["port"])
return ChunkLocation(chunk_id, res["location"]["host"], str(res["location"]["port"]),
res["location"]["http_host"], str(res["location"]["http_port"]))

def ingest_chunk_configs(self, transaction_id: int, chunk_ids: List[int]) -> List[ChunkLocation]:
"""Get the locations where a list of chunk ids should be ingested.
@@ -310,7 +340,8 @@ def ingest_chunk_configs(self, transaction_id: int, chunk_ids: List[int]) -> List[ChunkLocation]:
data=json.dumps(dict(transaction_id=transaction_id, chunks=chunk_ids, auth_key=self.auth_key,
version=self.repl_api_version,)),
)
return [ChunkLocation(l["chunk"], l["host"], str(l["port"])) for l in res["location"]]
return [ChunkLocation(l["chunk"], l["host"], str(l["port"]),
l["http_host"], str(l["http_port"])) for l in res["location"]]

def ingest_regular_table(self, transaction_id: int) -> List[RegularLocation]:
"""Get the locations where a non-chunk table should be ingested.
@@ -331,13 +362,18 @@ def ingest_regular_table(self, transaction_id: int) -> List[RegularLocation]:
url=f"http://{self.repl_ctrl.hostname}:{self.repl_ctrl.port}/ingest/regular?version={self.repl_api_version}",
data=json.dumps(dict(auth_key=self.auth_key, transaction_id=transaction_id,)),
)
return [RegularLocation(location["host"], str(location["port"])) for location in res["locations"]]
return [RegularLocation(location["host"], str(location["port"]),
location["http_host"], str(location["http_port"])) for location in res["locations"]]

def ingest_data_file(
self,
transaction_id: int,
chunk_id: str,
overlap: bool,
worker_host: str,
worker_port: str,
worker_http_host: str,
worker_http_port: str,
data_file: str,
table: LoadTable,
load_http: bool,
@@ -348,10 +384,18 @@
----------
transaction_id : `int`
The transaction id.
chunk_id : `str`
The chunk id.
overlap : `bool`
The flag indicating whether the file represents the chunk overlap.
worker_host : `str`
The name of the host ingesting the data.
worker_port : `str`
The worker_host port to use.
worker_http_host : `str`
The name of the host ingesting the data (HTTP protocol).
worker_http_port : `str`
The worker_http_host port to use (HTTP protocol).
data_file : `str`
The path to the data file to ingest.
table : `LoadTable`
@@ -362,38 +406,59 @@
if not self.auth_key:
raise RuntimeError("auth_key must be set to ingest a data file.")
if load_http:
raise NotImplementedError("HTTP-based table data loading protocol not implemented.")
args = [
"qserv-replica-file",
"INGEST",
"FILE",
worker_host,
worker_port,
str(transaction_id),
table.table_name,
# app help says P for 'partitioned' and R for 'regular'/non-partitioned.
"P" if table.is_partitioned else "R",
data_file,
"--verbose",
f"--fields-terminated-by={table.fields_terminated_by}",
f"--fields-enclosed-by={table.fields_enclosed_by}",
f"--fields-escaped-by={table.fields_escaped_by}",
f"--auth-key={self.auth_key}",
f"--lines-terminated-by={table.lines_terminated_by}",
]
_log.debug("ingest file args: %s", args)
res = subprocess.run(
args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf-8",
errors="replace",
)
if res.returncode != 0:
raise RuntimeError(
f"Subprocess failed ({res.returncode}) stdout:{res.stdout} stderr:{res.stderr}"
encoder = MultipartEncoder(
fields={
"auth_key": (None, self.auth_key),
"transaction_id": (None, str(transaction_id)),
"table": (None, table.table_name),
"chunk": (None, str(chunk_id)),
"overlap": (None, str("1" if overlap else "0")),
"fields_terminated_by": (None, str(table.fields_terminated_by)),
"fields_enclosed_by": (None, str(table.fields_enclosed_by)),
"fields_escaped_by": (None, str(table.fields_escaped_by)),
"lines_terminated_by": (None, str(table.lines_terminated_by)),
"file": (os.path.basename(data_file), open(data_file, "rb"), "text/plain"),
}
)
_log.debug("encoder: %s", encoder)
res_http = _post_file_upload(
url=f"http://{worker_http_host}:{worker_http_port}/ingest/csv",
encoder=encoder)
if not res_http["success"]:
raise RuntimeError(f"Ingest failed ({res_http})")
_log.debug("ingest file res: %s", res_http)
else:
args = [
"qserv-replica-file",
"INGEST",
"FILE",
worker_host,
worker_port,
str(transaction_id),
table.table_name,
# app help says P for 'partitioned' and R for 'regular'/non-partitioned.
"P" if table.is_partitioned else "R",
data_file,
"--verbose",
f"--fields-terminated-by={table.fields_terminated_by}",
f"--fields-enclosed-by={table.fields_enclosed_by}",
f"--fields-escaped-by={table.fields_escaped_by}",
f"--auth-key={self.auth_key}",
f"--lines-terminated-by={table.lines_terminated_by}",
]
_log.debug("ingest file args: %s", args)
res = subprocess.run(
args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf-8",
errors="replace",
)
_log.debug("ingest file res: %s", res)
if res.returncode != 0:
raise RuntimeError(
f"Subprocess failed ({res.returncode}) stdout:{res.stdout} stderr:{res.stderr}"
)
_log.debug("ingest file res: %s", res)

def build_table_stats(
self,
@@ -475,29 +540,37 @@ def ingest_chunks_data(
# Ingest the chunk files:
# Helpful note: Generator type decl is Generator[yield, send, return],
# see https://www.python.org/dev/peps/pep-0484/#annotating-generator-functions-and-coroutines
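# Here the generator yields (path, host, port, http_host, http_port, chunk_id, overlap)
# tuples, accepts no sent values, and returns None.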
def generate_locations() -> Generator[Tuple[str, str, str], None, None]:
def generate_locations() -> Generator[Tuple[str, str, str, str, str, str, bool], None, None]:
for location in locations:
for chunk_file in (chunk_file_t, chunk_overlap_file_t):
full_path = os.path.join(chunks_folder, chunk_file.format(chunk_id=location.chunk_id))
if os.path.exists(full_path):
_log.debug(
f"Ingesting %s to %s:%s",
f"Ingesting %s to %s:%s/%s:%s chunk %s.",
full_path,
location.host,
location.port,
location.http_host,
location.http_port,
location.chunk_id,
)
yield full_path, location.host, location.port
overlap = "overlap" in chunk_file
yield full_path, location.host, location.port, location.http_host, location.http_port, location.chunk_id, overlap
else:
_log.warn(
"Not ingesting %s; it does not exist (probably there is no data for that chunk).",
full_path,
)

for _file, host, port in generate_locations():
for _file, host, port, http_host, http_port, chunk_id, overlap in generate_locations():
self.ingest_data_file(
transaction_id,
chunk_id,
overlap,
host,
port,
http_host,
http_port,
data_file=_file,
table=table,
load_http=load_http
Expand Down Expand Up @@ -531,16 +604,22 @@ def ingest_table_data(
locations = self.ingest_regular_table(transaction_id)
for location in locations:
_log.debug(
"Ingesting %s to %s:%s table %s.",
"Ingesting %s to %s:%s/%s:%s table %s.",
data_file,
location.host,
location.port,
location.http_host,
location.http_port,
table.table_name,
)
self.ingest_data_file(
transaction_id,
"0",
False,
location.host,
location.port,
location.http_host,
location.http_port,
data_file=data_file,
table=table,
load_http=load_http,

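Taken together, these changes let the integration test push table data to the workers over HTTP instead of the binary qserv-replica-file protocol. A condensed sketch of the new call path, following the load_simple hunk above (repl, transaction_id, data_file, and table are assumed to come from the surrounding test code, and load_http=True selects the new HTTP upload):

    # Condensed push-mode flow from load_simple: locate the worker that
    # should receive chunk "0", then upload the file to it over HTTP.
    chunk_location = repl.ingest_chunk_config(transaction_id, "0")
    repl.ingest_data_file(
        transaction_id,
        "0",                        # chunk id
        False,                      # not an overlap file
        chunk_location.host,        # binary-protocol endpoint
        chunk_location.port,
        chunk_location.http_host,   # HTTP push-mode endpoint
        chunk_location.http_port,
        data_file=data_file,
        table=table,
        load_http=True,             # route through /ingest/csv
    )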