Commit

Update scale down scenario, update dataset calibration script (#304)
* Update config to support new dataset format

* Experiment runner fixes

* Additional runner fixes

* Add dataset properties
geoffxy authored Oct 6, 2023
1 parent d4888b0 commit ff2924e
Showing 13 changed files with 192 additions and 34 deletions.
28 changes: 28 additions & 0 deletions config/planner.yml
@@ -151,6 +151,34 @@ table_extract_bytes_per_row:
movie_info: 66.48728137225561
person_info: 133.20102076348735

imdb_extended_20g:
homes: 20.556198
theatres: 32.4135
showings: 46.01808
ticket_orders: 36.773404
aka_name: 81.795698
aka_title: 115.56834554266997
cast_info: 35.296638
char_name: 71.25104
comp_cast_type: 11.25
company_name: 78.345222
company_type: 23.0
complete_cast: 17.87376189982678
info_type: 17.061946902654867
keyword: 28.256227174480138
kind_type: 12.142857142857142
link_type: 14.5
movie_companies: 42.636226
movie_info_idx: 26.61975
movie_keyword: 18.729882
movie_link: 21.888322165549887
name: 78.5697
role_type: 13.333333333333334
title: 88.5209
movie_info: 29.449154
person_info: 133.458044


###
### Performance metrics scaling modifiers.
### (Used by neighborhood planning.)
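
The imdb_extended_20g entries above are per-row extract sizes in bytes, produced by the calibration script updated later in this commit (tools/calibration/table_sizes.py). A minimal sketch of how these values could be turned into a table-extract size estimate is below; the nesting follows the hunk header (table_extract_bytes_per_row, then imdb_extended_20g), and the row counts are made-up inputs, not figures from this commit.

# Hypothetical sketch: estimating extract sizes from the calibrated
# bytes-per-row values above. Row counts here are illustrative only.
import yaml

with open("config/planner.yml", "r", encoding="UTF-8") as f:
    planner_config = yaml.safe_load(f)

bytes_per_row = planner_config["table_extract_bytes_per_row"]["imdb_extended_20g"]

def estimated_extract_bytes(table_name: str, num_rows: int) -> float:
    # Extract size is assumed to scale linearly with the number of rows.
    return bytes_per_row[table_name] * num_rows

print(estimated_extract_bytes("title", 2_000_000))   # ~177 MB
print(estimated_extract_bytes("showings", 500_000))  # ~23 MB
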
3 changes: 3 additions & 0 deletions config/temp_config_sample.yml
@@ -1,5 +1,8 @@
latency_ceiling_s: 30.0

# Use this instead of the individual paths below.
std_dataset_path: workloads/IMDB_20GB/regular_test/

aurora_preds_path: workloads/IMDB/OLAP_queries_new/pred_aurora_300.npy
redshift_preds_path: workloads/IMDB/OLAP_queries_new/pred_redshift_300.npy
athena_preds_path: workloads/IMDB/OLAP_queries_new/pred_athena_300.npy
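
The new std_dataset_path points at a directory in the "standard dataset" format consumed by the loaders added later in this commit. Based on those loaders, the directory is expected to contain at least queries.sql, pred-run_time_s-athena-aurora-redshift.npy, and pred-data_accessed-athena-aurora.npy. The small check below is a hypothetical helper, not part of this commit:

# Hypothetical helper: verify a standard-dataset directory contains the
# files that the new load_from_standard_dataset methods expect.
import pathlib

EXPECTED_FILES = [
    "queries.sql",
    "pred-run_time_s-athena-aurora-redshift.npy",
    "pred-data_accessed-athena-aurora.npy",
]

def check_std_dataset(path: str) -> None:
    dsp = pathlib.Path(path)
    missing = [name for name in EXPECTED_FILES if not (dsp / name).exists()]
    if missing:
        raise FileNotFoundError(f"{path} is missing: {', '.join(missing)}")

check_std_dataset("workloads/IMDB_20GB/regular_test/")
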
16 changes: 10 additions & 6 deletions experiments/15-e2e-scenarios-v2/common.sh
@@ -1,7 +1,7 @@
function start_brad() {
config_file=$1

pushd ../../
pushd ../../../
brad daemon \
--config-file $config_file \
--schema-name $schema_name \
@@ -79,13 +79,13 @@ function start_repeating_olap_runner() {
mkdir $results_dir

log_workload_point "rana_${ra_clients}"
COND_OUT=$results_dir python3 ../../workloads/IMDB_extended/run_repeating_analytics.py \
COND_OUT=$results_dir python3 ../../../workloads/IMDB_extended/run_repeating_analytics.py \
--num-clients $ra_clients \
--avg-gap-s $ra_gap_s \
--avg-gap-std-s $ra_gap_std_s \
--num-front-ends $num_front_ends \
--query-indexes $query_indexes \
--query-bank-file $query_bank_file \
--query-indexes $ra_query_indexes \
--query-bank-file $ra_query_bank_file \
&
rana_pid=$!
}
@@ -98,7 +98,7 @@ function start_txn_runner() {
mkdir $results_dir

log_workload_point "txn_${t_clients}"
COND_OUT=$results_dir python3 ../../workloads/IMDB_extended/run_transactions.py \
COND_OUT=$results_dir python3 ../../../workloads/IMDB_extended/run_transactions.py \
--num-clients $t_clients \
--num-front-ends $num_front_ends \
&
@@ -129,7 +129,7 @@ function extract_named_arguments() {
fi

if [[ $phys_arg =~ --ra-query-bank-file=.+ ]]; then
ra_query_bank_file=${phys_arg:22}
ra_query_bank_file=${phys_arg:21}
fi

if [[ $phys_arg =~ --ra-gap-s=.+ ]]; then
@@ -152,6 +152,10 @@ function extract_named_arguments() {
config_file=${phys_arg:14}
fi

if [[ $phys_arg =~ --planner-config-file=.+ ]]; then
planner_config_file=${phys_arg:22}
fi

if [[ $phys_arg =~ --skip-replan=.+ ]]; then
skip_replan=${phys_arg:14}
fi
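
The substring offsets in extract_named_arguments strip a "--flag=" prefix from each argument, so each offset must equal the length of that prefix, "=" included. That is the bug fixed above: "--ra-query-bank-file=" is 21 characters, so the old ${phys_arg:22} silently dropped the first character of the file path, while the new "--planner-config-file=" prefix really is 22 characters. A quick check of the arithmetic (written in Python purely for illustration):

# The bash expansion ${phys_arg:N} drops the first N characters, so N must
# equal the length of the "--name=" prefix being stripped.
for prefix in ("--ra-query-bank-file=", "--planner-config-file=", "--config-file="):
    print(prefix, len(prefix))
# --ra-query-bank-file= 21
# --planner-config-file= 22
# --config-file= 14
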
6 changes: 4 additions & 2 deletions experiments/15-e2e-scenarios-v2/scale_down/COND
@@ -18,10 +18,12 @@ run_experiment(
run="./run_workload.sh",
options={
# TODO: Ideally, configurations are shared. Only keep AWS secrets separate.
"config-file": "config/config_20g.yml",
"config-file": "config/config_large.yml",
"planner-config-file": "config/planner.yml",
"schema-name": "imdb_extended_20g",
# TODO: Select regular query indexes
"ra-query-indexes": "0,1,2",
"ra-query-indexes": "0,1,25,26,50,51,75,76,27,28",
"ra-query-bank-file": IMDB_20GB_REGULAR_QUERY_BANK,
"num-front-ends": 4,
},
)
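
These options reach run_workload.sh as the --key=value flags that extract_named_arguments in common.sh parses; exactly how the experiment runner renders them is an assumption here, but a sketch of the resulting invocation looks like this (IMDB_20GB_REGULAR_QUERY_BANK is resolved inside the COND file and omitted below):

# Hypothetical rendering of the COND options into the --key=value flags
# consumed by extract_named_arguments in common.sh.
options = {
    "config-file": "config/config_large.yml",
    "planner-config-file": "config/planner.yml",
    "schema-name": "imdb_extended_20g",
    "ra-query-indexes": "0,1,25,26,50,51,75,76,27,28",
    "num-front-ends": 4,
}
flags = " ".join(f"--{key}={value}" for key, value in options.items())
print(f"./run_workload.sh {flags}")
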
7 changes: 6 additions & 1 deletion src/brad/config/temp_config.py
@@ -1,6 +1,6 @@
import pathlib
import yaml
from typing import Any, Dict
from typing import Any, Dict, Optional


class TempConfig:
@@ -19,6 +19,11 @@ def __init__(self, raw: Dict[str, Any]) -> None:
def latency_ceiling_s(self) -> float:
return float(self._raw["latency_ceiling_s"])

def std_dataset_path(self) -> Optional[pathlib.Path]:
if "std_dataset_path" not in self._raw:
return None
return pathlib.Path(self._raw["std_dataset_path"])

def aurora_preds_path(self) -> pathlib.Path:
return pathlib.Path(self._raw["aurora_preds_path"])

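
std_dataset_path() returns None when the key is absent, which lets callers fall back to the individual prediction paths, exactly as the daemon change below does. A minimal usage sketch (constructing TempConfig directly from the parsed YAML matches the __init__ signature shown above, though how it is normally instantiated is not part of this diff):

# Minimal sketch: prefer the standard dataset path when it is present,
# mirroring the fallback logic added to the daemon.
import yaml
from brad.config.temp_config import TempConfig

with open("config/temp_config_sample.yml", "r", encoding="UTF-8") as f:
    temp_config = TempConfig(yaml.safe_load(f))

std_dataset_path = temp_config.std_dataset_path()
if std_dataset_path is not None:
    print("Using standard dataset:", std_dataset_path)
else:
    print("Falling back to:", temp_config.aurora_preds_path())
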
35 changes: 24 additions & 11 deletions src/brad/daemon/daemon.py
@@ -148,17 +148,30 @@ async def _run_setup(self) -> None:
if self._temp_config is not None:
# TODO: Actually call into the models. We avoid doing so for now to
# avoid having to implement model loading, etc.
latency_scorer: AnalyticsLatencyScorer = PrecomputedPredictions.load(
workload_file_path=self._temp_config.query_bank_path(),
aurora_predictions_path=self._temp_config.aurora_preds_path(),
redshift_predictions_path=self._temp_config.redshift_preds_path(),
athena_predictions_path=self._temp_config.athena_preds_path(),
)
data_access_provider = PrecomputedDataAccessProvider.load(
workload_file_path=self._temp_config.query_bank_path(),
aurora_accessed_pages_path=self._temp_config.aurora_data_access_path(),
athena_accessed_bytes_path=self._temp_config.athena_data_access_path(),
)
std_dataset_path = self._temp_config.std_dataset_path()
if std_dataset_path is not None:
latency_scorer: AnalyticsLatencyScorer = (
PrecomputedPredictions.load_from_standard_dataset(
dataset_path=std_dataset_path,
)
)
data_access_provider = (
PrecomputedDataAccessProvider.load_from_standard_dataset(
dataset_path=std_dataset_path,
)
)
else:
latency_scorer = PrecomputedPredictions.load(
workload_file_path=self._temp_config.query_bank_path(),
aurora_predictions_path=self._temp_config.aurora_preds_path(),
redshift_predictions_path=self._temp_config.redshift_preds_path(),
athena_predictions_path=self._temp_config.athena_preds_path(),
)
data_access_provider = PrecomputedDataAccessProvider.load(
workload_file_path=self._temp_config.query_bank_path(),
aurora_accessed_pages_path=self._temp_config.aurora_data_access_path(),
athena_accessed_bytes_path=self._temp_config.athena_data_access_path(),
)
comparator = best_cost_under_p99_latency(
max_latency_ceiling_s=self._temp_config.latency_ceiling_s()
)
18 changes: 15 additions & 3 deletions src/brad/data_sync/operators/unload_to_s3.py
@@ -1,4 +1,5 @@
import logging
from typing import Optional

from .operator import Operator
from brad.data_sync.execution.context import ExecutionContext
@@ -8,7 +9,7 @@

_AURORA_UNLOAD_TEMPLATE = """
SELECT * FROM aws_s3.query_export_to_s3(
'SELECT * FROM {table_name}',
'{query}',
aws_commons.create_s3_uri(
'{s3_bucket}',
'{s3_path}',
@@ -34,7 +35,13 @@ class UnloadToS3(Operator):
Dumps data from a table onto S3.
"""

def __init__(self, table_name: str, relative_s3_path: str, engine: Engine) -> None:
def __init__(
self,
table_name: str,
relative_s3_path: str,
engine: Engine,
limit: Optional[int] = None,
) -> None:
"""
NOTE: All S3 paths are relative to the extract path, specified in the
configuration.
@@ -43,6 +50,7 @@ def __init__(self, table_name: str, relative_s3_path: str, engine: Engine) -> None:
self._table_name = table_name
self._engine = engine
self._relative_s3_path = relative_s3_path
self._limit = limit

def __repr__(self) -> str:
return "".join(
@@ -67,8 +75,12 @@ async def execute(self, ctx: ExecutionContext) -> "Operator":
raise RuntimeError("Unsupported engine {}".format(self._engine))

async def _execute_aurora(self, ctx: ExecutionContext) -> "Operator":
inner_query = f"SELECT * FROM {self._table_name}"
if self._limit is not None:
inner_query += f" LIMIT {self._limit}"

query = _AURORA_UNLOAD_TEMPLATE.format(
table_name=self._table_name,
query=inner_query,
s3_bucket=ctx.s3_bucket(),
s3_region=ctx.s3_region(),
s3_path="{}{}".format(ctx.s3_path(), self._relative_s3_path),
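
With the optional limit, the inner query becomes SELECT * FROM <table> LIMIT <n>, and it is this full query, rather than just a table name, that is substituted into the Aurora aws_s3.query_export_to_s3 template. A short sketch of the string being built (the table name and limit are illustrative values):

# Sketch of the inner query UnloadToS3 builds for Aurora when a limit is set.
table_name = "title"   # example table
limit = 500_000        # e.g. the calibration script's --max-rows cap
inner_query = f"SELECT * FROM {table_name}"
if limit is not None:
    inner_query += f" LIMIT {limit}"
# inner_query is substituted for '{query}' in _AURORA_UNLOAD_TEMPLATE,
# so Aurora exports at most `limit` rows to S3.
print(inner_query)  # SELECT * FROM title LIMIT 500000
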
32 changes: 32 additions & 0 deletions src/brad/planner/scoring/data_access/precomputed_values.py
@@ -16,6 +16,38 @@ class PrecomputedDataAccessProvider(DataAccessProvider):
Used for debugging purposes.
"""

@classmethod
def load_from_standard_dataset(
cls,
dataset_path: str | pathlib.Path,
):
if isinstance(dataset_path, pathlib.Path):
dsp = dataset_path
else:
dsp = pathlib.Path(dataset_path)

with open(dsp / "queries.sql", "r", encoding="UTF-8") as query_file:
raw_queries = [line.strip() for line in query_file]

queries_map = {}
for idx, query in enumerate(raw_queries):
if query.endswith(";"):
queries_map[query[:-1]] = idx
else:
queries_map[query] = idx

data_stats = np.load(dsp / "pred-data_accessed-athena-aurora.npy")
# TODO: We may want a better placeholder.
data_stats[np.isnan(data_stats)] = 0

aurora = data_stats[:, 1]
athena = data_stats[:, 0]
assert len(aurora.shape) == 1
assert len(athena.shape) == 1

return cls(queries_map, aurora, athena)

@classmethod
def load(
cls,
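
The loader pairs row i of pred-data_accessed-athena-aurora.npy with line i of queries.sql, stripping a trailing semicolon from each query before using it as a dictionary key; column 0 holds the Athena bytes-accessed estimates, column 1 the Aurora pages-accessed estimates, and NaNs are zeroed. A minimal load sketch (the dataset path is the one from the sample temp config):

# Minimal sketch: build the data access provider from a standard dataset
# directory, as the daemon now does when std_dataset_path is configured.
from brad.planner.scoring.data_access.precomputed_values import (
    PrecomputedDataAccessProvider,
)

data_access_provider = PrecomputedDataAccessProvider.load_from_standard_dataset(
    dataset_path="workloads/IMDB_20GB/regular_test/"
)
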
39 changes: 39 additions & 0 deletions src/brad/planner/scoring/performance/precomputed_predictions.py
@@ -17,6 +17,45 @@ class PrecomputedPredictions(AnalyticsLatencyScorer):
Used for debugging purposes.
"""

@classmethod
def load_from_standard_dataset(
cls, dataset_path: str | pathlib.Path
) -> "PrecomputedPredictions":
if isinstance(dataset_path, pathlib.Path):
dsp = dataset_path
else:
dsp = pathlib.Path(dataset_path)

with open(dsp / "queries.sql", "r", encoding="UTF-8") as query_file:
raw_queries = [line.strip() for line in query_file]

queries_map = {}
for idx, query in enumerate(raw_queries):
if query.endswith(";"):
queries_map[query[:-1]] = idx
else:
queries_map[query] = idx

rt_preds = np.load(dsp / "pred-run_time_s-athena-aurora-redshift.npy")

# Sanity check.
assert rt_preds.shape[0] == len(raw_queries)

preds = [np.array([]), np.array([]), np.array([])]
preds[Workload.EngineLatencyIndex[Engine.Aurora]] = rt_preds[:, 1]
preds[Workload.EngineLatencyIndex[Engine.Redshift]] = rt_preds[:, 2]
preds[Workload.EngineLatencyIndex[Engine.Athena]] = rt_preds[:, 0]

predictions = np.stack(preds, axis=-1)

# Replace any `inf` / `nan` values in the predictions with this value.
# This prevents a degenerate case in performance estimation.
timeout_value_s = 210.0
predictions[np.isinf(predictions)] = timeout_value_s
predictions[np.isnan(predictions)] = timeout_value_s

return cls(queries_map, predictions)

@classmethod
def load(
cls,
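
Here the standard dataset's run-time file stores columns in Athena, Aurora, Redshift order (hence the file name), and the loader reorders them via Workload.EngineLatencyIndex before stacking; any inf or NaN prediction is clamped to a 210 s timeout placeholder so performance estimation never sees degenerate values. A minimal load sketch, mirroring the daemon change above:

# Minimal sketch: build the latency scorer from a standard dataset directory.
from brad.planner.scoring.performance.precomputed_predictions import (
    PrecomputedPredictions,
)

latency_scorer = PrecomputedPredictions.load_from_standard_dataset(
    dataset_path="workloads/IMDB_20GB/regular_test/"
)
# pred-run_time_s-athena-aurora-redshift.npy columns: 0 = Athena, 1 = Aurora,
# 2 = Redshift; inf/NaN entries are replaced with a 210 s timeout value.
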
30 changes: 25 additions & 5 deletions tools/calibration/table_sizes.py
@@ -3,13 +3,15 @@
import argparse
import logging
import json
import yaml
from typing import Dict

from brad.asset_manager import AssetManager
from brad.config.engine import Engine
from brad.config.file import ConfigFile
from brad.data_sync.execution.context import ExecutionContext
from brad.data_sync.operators.unload_to_s3 import UnloadToS3
from brad.provisioning.directory import Directory
from brad.blueprint.manager import BlueprintManager
from brad.front_end.engine_connections import EngineConnections
from brad.utils.table_sizer import TableSizer
@@ -36,11 +38,22 @@ async def main_impl(args) -> None:
bp = mgr.get_blueprint()
logger.info("Using blueprint: %s", bp)

directory = Directory(config)
await directory.refresh()

engines_sync = EngineConnections.connect_sync(
config, args.schema_name, autocommit=True, specific_engines={Engine.Aurora}
config,
directory,
args.schema_name,
autocommit=True,
specific_engines={Engine.Aurora},
)
engines = await EngineConnections.connect(
config, args.schema_name, autocommit=True, specific_engines={Engine.Aurora}
config,
directory,
args.schema_name,
autocommit=True,
specific_engines={Engine.Aurora},
)

boto_client = boto3.client(
@@ -70,16 +83,17 @@ async def main_impl(args) -> None:
full_extract_path = f"{config.s3_extract_path}{extract_file}"

num_rows = table_sizer.table_size_rows(table.name, Engine.Aurora)
op = UnloadToS3(table.name, extract_file, Engine.Aurora)
extract_limit = min(num_rows, args.max_rows)
op = UnloadToS3(table.name, extract_file, Engine.Aurora, extract_limit)
await op.execute(ctx)

table_bytes = s3_object_size_bytes(
boto_client, config.s3_extract_bucket, full_extract_path
)

b_per_row = table_bytes / num_rows
b_per_row = table_bytes / extract_limit
logger.info(
"%s: %d rows, extracted %d B total", table.name, num_rows, table_bytes
"%s: %d rows, extracted %d B total", table.name, extract_limit, table_bytes
)
bytes_per_row[table.name] = b_per_row

@@ -93,6 +107,9 @@ for table_name, bpr in bytes_per_row.items():
for table_name, bpr in bytes_per_row.items():
print(f" {table_name}: {bpr}", flush=True)

with open(args.schema_name + "_data.yaml", "w", encoding="UTF-8") as file:
yaml.dump(bytes_per_row, file, default_flow_style=False)

await engines.close()
engines_sync.close_sync()

Expand All @@ -105,6 +122,9 @@ def main():
parser.add_argument("--config-file", type=str, required=True)
parser.add_argument("--schema-name", type=str, required=True)
parser.add_argument("--debug", action="store_true")
# Unloading is slow - we do not need to unload the entire table to get a
# reasonable idea of the size per row.
parser.add_argument("--max-rows", type=int, default=500_000)
args = parser.parse_args()

set_up_logging(debug_mode=args.debug)
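
With the --max-rows cap, the script unloads at most 500,000 rows per table and divides the extracted S3 object's size by the number of rows actually unloaded, so the bytes-per-row figure stays valid even for a truncated extract. The results are also dumped to <schema_name>_data.yaml, which is presumably the source of the imdb_extended_20g block added to config/planner.yml above. The arithmetic, in brief (the byte count below is an illustrative value chosen to line up with the title entry in planner.yml):

# Illustrative bytes-per-row calculation matching the updated script: divide
# the extract size by the rows actually unloaded, not the full table size.
num_rows = 2_500_000      # rows in the table (example)
max_rows = 500_000        # --max-rows default
table_bytes = 44_260_450  # size of the S3 extract in bytes (example)

extract_limit = min(num_rows, max_rows)
b_per_row = table_bytes / extract_limit
print(b_per_row)  # 88.5209, comparable to the `title` entry in planner.yml
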