From ff2924e7bfdc1e8e7d2d1211f89a881e8349b5ad Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Fri, 6 Oct 2023 18:43:38 -0400 Subject: [PATCH] Update scale down scenario, update dataset calibration script (#304) * Update config to support new dataset format * Experiment runner fixes * Additional runner fixes * Add dataset properties --- config/planner.yml | 28 +++++++++++++ config/temp_config_sample.yml | 3 ++ experiments/15-e2e-scenarios-v2/common.sh | 16 +++++--- .../15-e2e-scenarios-v2/scale_down/COND | 6 ++- src/brad/config/temp_config.py | 7 +++- src/brad/daemon/daemon.py | 35 +++++++++++------ src/brad/data_sync/operators/unload_to_s3.py | 18 +++++++-- .../scoring/data_access/precomputed_values.py | 32 +++++++++++++++ .../performance/precomputed_predictions.py | 39 +++++++++++++++++++ tools/calibration/table_sizes.py | 30 +++++++++++--- .../IMDB_extended/run_repeating_analytics.py | 6 +-- .../workload_utils/geospatial_worker.py | 2 +- .../workload_utils/transaction_worker.py | 4 +- 13 files changed, 192 insertions(+), 34 deletions(-) diff --git a/config/planner.yml b/config/planner.yml index f3423389..68d5c8c5 100644 --- a/config/planner.yml +++ b/config/planner.yml @@ -151,6 +151,34 @@ table_extract_bytes_per_row: movie_info: 66.48728137225561 person_info: 133.20102076348735 + imdb_extended_20g: + homes: 20.556198 + theatres: 32.4135 + showings: 46.01808 + ticket_orders: 36.773404 + aka_name: 81.795698 + aka_title: 115.56834554266997 + cast_info: 35.296638 + char_name: 71.25104 + comp_cast_type: 11.25 + company_name: 78.345222 + company_type: 23.0 + complete_cast: 17.87376189982678 + info_type: 17.061946902654867 + keyword: 28.256227174480138 + kind_type: 12.142857142857142 + link_type: 14.5 + movie_companies: 42.636226 + movie_info_idx: 26.61975 + movie_keyword: 18.729882 + movie_link: 21.888322165549887 + name: 78.5697 + role_type: 13.333333333333334 + title: 88.5209 + movie_info: 29.449154 + person_info: 133.458044 + + ### ### Performance metrics scaling modifiers. ### (Used by neighborhood planning.) diff --git a/config/temp_config_sample.yml b/config/temp_config_sample.yml index 307f17c3..743632d9 100644 --- a/config/temp_config_sample.yml +++ b/config/temp_config_sample.yml @@ -1,5 +1,8 @@ latency_ceiling_s: 30.0 +# Use this instead of the individual paths below. 
+std_dataset_path: workloads/IMDB_20GB/regular_test/ + aurora_preds_path: workloads/IMDB/OLAP_queries_new/pred_aurora_300.npy redshift_preds_path: workloads/IMDB/OLAP_queries_new/pred_redshift_300.npy athena_preds_path: workloads/IMDB/OLAP_queries_new/pred_athena_300.npy diff --git a/experiments/15-e2e-scenarios-v2/common.sh b/experiments/15-e2e-scenarios-v2/common.sh index 3b487e3d..fe1dc1f6 100644 --- a/experiments/15-e2e-scenarios-v2/common.sh +++ b/experiments/15-e2e-scenarios-v2/common.sh @@ -1,7 +1,7 @@ function start_brad() { config_file=$1 - pushd ../../ + pushd ../../../ brad daemon \ --config-file $config_file \ --schema-name $schema_name \ @@ -79,13 +79,13 @@ function start_repeating_olap_runner() { mkdir $results_dir log_workload_point "rana_${ra_clients}" - COND_OUT=$results_dir python3 ../../workloads/IMDB_extended/run_repeating_analytics.py \ + COND_OUT=$results_dir python3 ../../../workloads/IMDB_extended/run_repeating_analytics.py \ --num-clients $ra_clients \ --avg-gap-s $ra_gap_s \ --avg-gap-std-s $ra_gap_std_s \ --num-front-ends $num_front_ends \ - --query-indexes $query_indexes \ - --query-bank-file $query_bank_file \ + --query-indexes $ra_query_indexes \ + --query-bank-file $ra_query_bank_file \ & rana_pid=$! } @@ -98,7 +98,7 @@ function start_txn_runner() { mkdir $results_dir log_workload_point "txn_${t_clients}" - COND_OUT=$results_dir python3 ../../workloads/IMDB_extended/run_transactions.py \ + COND_OUT=$results_dir python3 ../../../workloads/IMDB_extended/run_transactions.py \ --num-clients $t_clients \ --num-front-ends $num_front_ends \ & @@ -129,7 +129,7 @@ function extract_named_arguments() { fi if [[ $phys_arg =~ --ra-query-bank-file=.+ ]]; then - ra_query_bank_file=${phys_arg:22} + ra_query_bank_file=${phys_arg:21} fi if [[ $phys_arg =~ --ra-gap-s=.+ ]]; then @@ -152,6 +152,10 @@ function extract_named_arguments() { config_file=${phys_arg:14} fi + if [[ $phys_arg =~ --planner-config-file=.+ ]]; then + planner_config_file=${phys_arg:22} + fi + if [[ $phys_arg =~ --skip-replan=.+ ]]; then skip_replan=${phys_arg:14} fi diff --git a/experiments/15-e2e-scenarios-v2/scale_down/COND b/experiments/15-e2e-scenarios-v2/scale_down/COND index 6f9f6b36..7559acf1 100644 --- a/experiments/15-e2e-scenarios-v2/scale_down/COND +++ b/experiments/15-e2e-scenarios-v2/scale_down/COND @@ -18,10 +18,12 @@ run_experiment( run="./run_workload.sh", options={ # TODO: Ideally, configurations are shared. Only keep AWS secrets separate. 
- "config-file": "config/config_20g.yml", + "config-file": "config/config_large.yml", "planner-config-file": "config/planner.yml", + "schema-name": "imdb_extended_20g", # TODO: Select regular query indexes - "ra-query-indexes": "0,1,2", + "ra-query-indexes": "0,1,25,26,50,51,75,76,27,28", "ra-query-bank-file": IMDB_20GB_REGULAR_QUERY_BANK, + "num-front-ends": 4, }, ) diff --git a/src/brad/config/temp_config.py b/src/brad/config/temp_config.py index d2f11190..4e477065 100644 --- a/src/brad/config/temp_config.py +++ b/src/brad/config/temp_config.py @@ -1,6 +1,6 @@ import pathlib import yaml -from typing import Any, Dict +from typing import Any, Dict, Optional class TempConfig: @@ -19,6 +19,11 @@ def __init__(self, raw: Dict[str, Any]) -> None: def latency_ceiling_s(self) -> float: return float(self._raw["latency_ceiling_s"]) + def std_dataset_path(self) -> Optional[pathlib.Path]: + if "std_dataset_path" not in self._raw: + return None + return pathlib.Path(self._raw["std_dataset_path"]) + def aurora_preds_path(self) -> pathlib.Path: return pathlib.Path(self._raw["aurora_preds_path"]) diff --git a/src/brad/daemon/daemon.py b/src/brad/daemon/daemon.py index c5ecbf43..f51c873c 100644 --- a/src/brad/daemon/daemon.py +++ b/src/brad/daemon/daemon.py @@ -148,17 +148,30 @@ async def _run_setup(self) -> None: if self._temp_config is not None: # TODO: Actually call into the models. We avoid doing so for now to # avoid having to implement model loading, etc. - latency_scorer: AnalyticsLatencyScorer = PrecomputedPredictions.load( - workload_file_path=self._temp_config.query_bank_path(), - aurora_predictions_path=self._temp_config.aurora_preds_path(), - redshift_predictions_path=self._temp_config.redshift_preds_path(), - athena_predictions_path=self._temp_config.athena_preds_path(), - ) - data_access_provider = PrecomputedDataAccessProvider.load( - workload_file_path=self._temp_config.query_bank_path(), - aurora_accessed_pages_path=self._temp_config.aurora_data_access_path(), - athena_accessed_bytes_path=self._temp_config.athena_data_access_path(), - ) + std_dataset_path = self._temp_config.std_dataset_path() + if std_dataset_path is not None: + latency_scorer: AnalyticsLatencyScorer = ( + PrecomputedPredictions.load_from_standard_dataset( + dataset_path=std_dataset_path, + ) + ) + data_access_provider = ( + PrecomputedDataAccessProvider.load_from_standard_dataset( + dataset_path=std_dataset_path, + ) + ) + else: + latency_scorer = PrecomputedPredictions.load( + workload_file_path=self._temp_config.query_bank_path(), + aurora_predictions_path=self._temp_config.aurora_preds_path(), + redshift_predictions_path=self._temp_config.redshift_preds_path(), + athena_predictions_path=self._temp_config.athena_preds_path(), + ) + data_access_provider = PrecomputedDataAccessProvider.load( + workload_file_path=self._temp_config.query_bank_path(), + aurora_accessed_pages_path=self._temp_config.aurora_data_access_path(), + athena_accessed_bytes_path=self._temp_config.athena_data_access_path(), + ) comparator = best_cost_under_p99_latency( max_latency_ceiling_s=self._temp_config.latency_ceiling_s() ) diff --git a/src/brad/data_sync/operators/unload_to_s3.py b/src/brad/data_sync/operators/unload_to_s3.py index 029386d2..1ddabaa5 100644 --- a/src/brad/data_sync/operators/unload_to_s3.py +++ b/src/brad/data_sync/operators/unload_to_s3.py @@ -1,4 +1,5 @@ import logging +from typing import Optional from .operator import Operator from brad.data_sync.execution.context import ExecutionContext @@ -8,7 +9,7 @@ 
_AURORA_UNLOAD_TEMPLATE = """ SELECT * FROM aws_s3.query_export_to_s3( - 'SELECT * FROM {table_name}', + '{query}', aws_commons.create_s3_uri( '{s3_bucket}', '{s3_path}', @@ -34,7 +35,13 @@ class UnloadToS3(Operator): Dumps data from a table onto S3. """ - def __init__(self, table_name: str, relative_s3_path: str, engine: Engine) -> None: + def __init__( + self, + table_name: str, + relative_s3_path: str, + engine: Engine, + limit: Optional[int] = None, + ) -> None: """ NOTE: All S3 paths are relative to the extract path, specified in the configuration. @@ -43,6 +50,7 @@ def __init__(self, table_name: str, relative_s3_path: str, engine: Engine) -> No self._table_name = table_name self._engine = engine self._relative_s3_path = relative_s3_path + self._limit = limit def __repr__(self) -> str: return "".join( @@ -67,8 +75,12 @@ async def execute(self, ctx: ExecutionContext) -> "Operator": raise RuntimeError("Unsupported engine {}".format(self._engine)) async def _execute_aurora(self, ctx: ExecutionContext) -> "Operator": + inner_query = f"SELECT * FROM {self._table_name}" + if self._limit is not None: + inner_query += f" LIMIT {self._limit}" + query = _AURORA_UNLOAD_TEMPLATE.format( - table_name=self._table_name, + query=inner_query, s3_bucket=ctx.s3_bucket(), s3_region=ctx.s3_region(), s3_path="{}{}".format(ctx.s3_path(), self._relative_s3_path), diff --git a/src/brad/planner/scoring/data_access/precomputed_values.py b/src/brad/planner/scoring/data_access/precomputed_values.py index 449ba23b..c8fc6819 100644 --- a/src/brad/planner/scoring/data_access/precomputed_values.py +++ b/src/brad/planner/scoring/data_access/precomputed_values.py @@ -16,6 +16,38 @@ class PrecomputedDataAccessProvider(DataAccessProvider): Used for debugging purposes. """ + @classmethod + def load_from_standard_dataset( + cls, + dataset_path: str | pathlib.Path, + ): + if isinstance(dataset_path, pathlib.Path): + dsp = dataset_path + else: + dsp = pathlib.Path(dataset_path) + + with open(dsp / "queries.sql", "r", encoding="UTF-8") as query_file: + raw_queries = [line.strip() for line in query_file] + + queries_map = {query: idx for idx, query in enumerate(raw_queries)} + queries_map = {} + for idx, query in enumerate(raw_queries): + if query.endswith(";"): + queries_map[query[:-1]] = idx + else: + queries_map[query] = idx + + data_stats = np.load(dsp / "pred-data_accessed-athena-aurora.npy") + # TODO: Maybe we might want a better placeholder. + data_stats[np.isnan(data_stats)] = 0 + + aurora = data_stats[:, 1] + athena = data_stats[:, 0] + assert len(aurora.shape) == 1 + assert len(athena.shape) == 1 + + return cls(queries_map, aurora, athena) + @classmethod def load( cls, diff --git a/src/brad/planner/scoring/performance/precomputed_predictions.py b/src/brad/planner/scoring/performance/precomputed_predictions.py index 3d8595a4..b2d2aa83 100644 --- a/src/brad/planner/scoring/performance/precomputed_predictions.py +++ b/src/brad/planner/scoring/performance/precomputed_predictions.py @@ -17,6 +17,45 @@ class PrecomputedPredictions(AnalyticsLatencyScorer): Used for debugging purposes. 
""" + @classmethod + def load_from_standard_dataset( + cls, dataset_path: str | pathlib.Path + ) -> "PrecomputedPredictions": + if isinstance(dataset_path, pathlib.Path): + dsp = dataset_path + else: + dsp = pathlib.Path(dataset_path) + + with open(dsp / "queries.sql", "r", encoding="UTF-8") as query_file: + raw_queries = [line.strip() for line in query_file] + + queries_map = {} + for idx, query in enumerate(raw_queries): + if query.endswith(";"): + queries_map[query[:-1]] = idx + else: + queries_map[query] = idx + + rt_preds = np.load(dsp / "pred-run_time_s-athena-aurora-redshift.npy") + + # Sanity check. + assert rt_preds.shape[0] == len(raw_queries) + + preds = [np.array([]), np.array([]), np.array([])] + preds[Workload.EngineLatencyIndex[Engine.Aurora]] = rt_preds[:, 1] + preds[Workload.EngineLatencyIndex[Engine.Redshift]] = rt_preds[:, 2] + preds[Workload.EngineLatencyIndex[Engine.Athena]] = rt_preds[:, 0] + + predictions = np.stack(preds, axis=-1) + + # Replace any `inf` / `nan` values in the predictions with this value. + # This prevents a degenerate case in performance estimation. + timeout_value_s = 210.0 + predictions[np.isinf(predictions)] = timeout_value_s + predictions[np.isnan(predictions)] = timeout_value_s + + return cls(queries_map, predictions) + @classmethod def load( cls, diff --git a/tools/calibration/table_sizes.py b/tools/calibration/table_sizes.py index ef058f58..51677f5f 100644 --- a/tools/calibration/table_sizes.py +++ b/tools/calibration/table_sizes.py @@ -3,6 +3,7 @@ import argparse import logging import json +import yaml from typing import Dict from brad.asset_manager import AssetManager @@ -10,6 +11,7 @@ from brad.config.file import ConfigFile from brad.data_sync.execution.context import ExecutionContext from brad.data_sync.operators.unload_to_s3 import UnloadToS3 +from brad.provisioning.directory import Directory from brad.blueprint.manager import BlueprintManager from brad.front_end.engine_connections import EngineConnections from brad.utils.table_sizer import TableSizer @@ -36,11 +38,22 @@ async def main_impl(args) -> None: bp = mgr.get_blueprint() logger.info("Using blueprint: %s", bp) + directory = Directory(config) + await directory.refresh() + engines_sync = EngineConnections.connect_sync( - config, args.schema_name, autocommit=True, specific_engines={Engine.Aurora} + config, + directory, + args.schema_name, + autocommit=True, + specific_engines={Engine.Aurora}, ) engines = await EngineConnections.connect( - config, args.schema_name, autocommit=True, specific_engines={Engine.Aurora} + config, + directory, + args.schema_name, + autocommit=True, + specific_engines={Engine.Aurora}, ) boto_client = boto3.client( @@ -70,16 +83,17 @@ async def main_impl(args) -> None: full_extract_path = f"{config.s3_extract_path}{extract_file}" num_rows = table_sizer.table_size_rows(table.name, Engine.Aurora) - op = UnloadToS3(table.name, extract_file, Engine.Aurora) + extract_limit = min(num_rows, args.max_rows) + op = UnloadToS3(table.name, extract_file, Engine.Aurora, extract_limit) await op.execute(ctx) table_bytes = s3_object_size_bytes( boto_client, config.s3_extract_bucket, full_extract_path ) - b_per_row = table_bytes / num_rows + b_per_row = table_bytes / extract_limit logger.info( - "%s: %d rows, extracted %d B total", table.name, num_rows, table_bytes + "%s: %d rows, extracted %d B total", table.name, extract_limit, table_bytes ) bytes_per_row[table.name] = b_per_row @@ -93,6 +107,9 @@ async def main_impl(args) -> None: for table_name, bpr in 
bytes_per_row.items(): print(f" {table_name}: {bpr}", flush=True) + with open(args.schema_name + "_data.yaml", "w", encoding="UTF-8") as file: + yaml.dump(bytes_per_row, file, default_flow_style=False) + await engines.close() engines_sync.close_sync() @@ -105,6 +122,9 @@ def main(): parser.add_argument("--config-file", type=str, required=True) parser.add_argument("--schema-name", type=str, required=True) parser.add_argument("--debug", action="store_true") + # Unloading is slow - we do not need to unload the entire table to get a + # reasonable idea of the size per row. + parser.add_argument("--max-rows", type=int, default=500_000) args = parser.parse_args() set_up_logging(debug_mode=args.debug) diff --git a/workloads/IMDB_extended/run_repeating_analytics.py b/workloads/IMDB_extended/run_repeating_analytics.py index 44fb54cb..a743cf0a 100644 --- a/workloads/IMDB_extended/run_repeating_analytics.py +++ b/workloads/IMDB_extended/run_repeating_analytics.py @@ -13,7 +13,7 @@ from typing import List from datetime import datetime, timedelta -from .workload_utils.database import Database, BradDatabase, PyodbcDatabase +from workload_utils.database import Database, BradDatabase, PyodbcDatabase from brad.grpc_client import BradGrpcClient, BradClientError from brad.utils.rand_exponential_backoff import RandomizedExponentialBackoff from typing import Dict @@ -208,8 +208,8 @@ def run_warmup(args, query_bank: List[str], queries: List[int]): def main(): parser = argparse.ArgumentParser() - parser.add_argument("--host", type=str, default="localhost") - parser.add_argument("--port", type=int, default=6583) + parser.add_argument("--brad-host", type=str, default="localhost") + parser.add_argument("--brad-port", type=int, default=6583) parser.add_argument("--seed", type=int, default=42) parser.add_argument("--num-front-ends", type=int, default=1) parser.add_argument("--run-warmup", action="store_true") diff --git a/workloads/IMDB_extended/workload_utils/geospatial_worker.py b/workloads/IMDB_extended/workload_utils/geospatial_worker.py index 8267d7d4..4eb27ddf 100644 --- a/workloads/IMDB_extended/workload_utils/geospatial_worker.py +++ b/workloads/IMDB_extended/workload_utils/geospatial_worker.py @@ -1,7 +1,7 @@ import random import logging -from .database import Database +from workload_utils.database import Database logger = logging.getLogger(__name__) diff --git a/workloads/IMDB_extended/workload_utils/transaction_worker.py b/workloads/IMDB_extended/workload_utils/transaction_worker.py index 8db0b22b..d7031512 100644 --- a/workloads/IMDB_extended/workload_utils/transaction_worker.py +++ b/workloads/IMDB_extended/workload_utils/transaction_worker.py @@ -4,8 +4,8 @@ from typing import List, Tuple, Any from brad.grpc_client import RowList, BradClientError -from .database import Database -from .dataset_config import ( +from workload_utils.database import Database +from workload_utils.dataset_config import ( MIN_MOVIE_ID, THEATRES_PER_SF, MIN_THEATRE_ID,
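
Note (not part of the patch): a minimal sketch of how a "standard dataset" directory pointed to by the new std_dataset_path option is laid out and consumed, mirroring PrecomputedPredictions.load_from_standard_dataset and PrecomputedDataAccessProvider.load_from_standard_dataset above. The standalone helper name, the example path, and the printed shapes are illustrative assumptions; only the file names, column order, and NaN/inf handling come from the patch itself.

    import pathlib

    import numpy as np


    def load_standard_dataset(dataset_path: str | pathlib.Path):
        # Assumed layout: <dataset_path>/queries.sql plus the two .npy files below.
        dsp = pathlib.Path(dataset_path)

        # One SQL query per line; trailing semicolons are stripped so queries can
        # be matched against the observed workload regardless of how they were logged.
        with open(dsp / "queries.sql", "r", encoding="UTF-8") as query_file:
            raw_queries = [line.strip() for line in query_file]
        queries_map = {q.rstrip(";"): idx for idx, q in enumerate(raw_queries)}

        # Column order follows the file names (athena, aurora, redshift).
        run_times = np.load(dsp / "pred-run_time_s-athena-aurora-redshift.npy")
        data_stats = np.load(dsp / "pred-data_accessed-athena-aurora.npy")

        # Mirror the patch's handling of missing values: inf/NaN run time
        # predictions become the 210 s timeout placeholder, missing
        # data-access statistics become 0.
        run_times[np.isinf(run_times)] = 210.0
        run_times[np.isnan(run_times)] = 210.0
        data_stats[np.isnan(data_stats)] = 0.0

        return queries_map, run_times, data_stats


    if __name__ == "__main__":
        # Example path taken from config/temp_config_sample.yml above.
        queries, rt, da = load_standard_dataset("workloads/IMDB_20GB/regular_test/")
        print(len(queries), rt.shape, da.shape)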