From 8fa3ec12c4d6528cce6a5349a79aabb0897690dd Mon Sep 17 00:00:00 2001 From: wuziniu Date: Thu, 21 Mar 2024 15:25:13 -0400 Subject: [PATCH] adding brad runner with variable txn clients (run_transactions_variable_clients.py) --- .../run_transactions_variable_clients.py | 762 ++++++++++++++ .../IMDB_extended/run_variable_clients.py | 927 ++++++++---------- 2 files changed, 1181 insertions(+), 508 deletions(-) create mode 100644 workloads/IMDB_extended/run_transactions_variable_clients.py diff --git a/workloads/IMDB_extended/run_transactions_variable_clients.py b/workloads/IMDB_extended/run_transactions_variable_clients.py new file mode 100644 index 00000000..5f4a3fda --- /dev/null +++ b/workloads/IMDB_extended/run_transactions_variable_clients.py @@ -0,0 +1,762 @@ +import asyncio +import argparse +import pathlib +import pickle +import random +import signal +import threading +import time +import numpy as np +import os +import pytz +import multiprocessing as mp +import logging +from datetime import datetime, timedelta +from typing import Optional, List + +from brad.config.engine import Engine +from brad.config.file import ConfigFile +from brad.grpc_client import BradClientError +from brad.provisioning.directory import Directory +from brad.utils.rand_exponential_backoff import RandomizedExponentialBackoff +from brad.utils.time_periods import universal_now +from brad.utils import set_up_logging, create_custom_logger +from workload_utils.connect import connect_to_db +from workload_utils.transaction_worker import TransactionWorker + +logger = logging.getLogger(__name__) +STARTUP_FAILED = "startup_failed" + + +def runner( + args, + worker_idx: int, + directory: Optional[Directory], + start_queue: mp.Queue, + control_semaphore: mp.Semaphore, # type: ignore + pause_semaphore: mp.Semaphore, # type: ignore + resume_semaphore: mp.Semaphore, # type: ignore +) -> None: + """ + Meant to be launched as a subprocess with multiprocessing. + """ + + def noop_handler(_signal, _frame): + pass + + signal.signal(signal.SIGINT, noop_handler) + + set_up_logging() + + worker = TransactionWorker( + worker_idx, + args.seed ^ worker_idx, + args.scale_factor, + args.dataset_type, + args.use_zipfian_ids, + args.zipfian_alpha, + ) + + txn_prng = random.Random(~(args.seed ^ worker_idx)) + transactions = [ + worker.purchase_tickets, + worker.add_new_showing, + worker.edit_movie_note, + ] + transaction_weights = [ + 0.70, + 0.20, + 0.10, + ] + lookup_theatre_id_by_name = 0.8 + txn_indexes = list(range(len(transactions))) + commits = [0 for _ in range(len(transactions))] + aborts = [0 for _ in range(len(transactions))] + + # Connect and set the isolation level. + try: + db = connect_to_db( + args, worker_idx, direct_engine=Engine.Aurora, directory=directory + ) + db.execute_sync( + f"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL {args.isolation_level}" + ) + except BradClientError as ex: + logger.error("[T %d] Failed to connect to BRAD: %s", worker_idx, str(ex)) + start_queue.put_nowait(STARTUP_FAILED) + return + + # For printing out results. 
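+    # When running under Conductor (COND_OUT is set), results are written to the
+    # task's output directory; otherwise they go to the current working directory.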
+ if "COND_OUT" in os.environ: + # pylint: disable-next=import-error + import conductor.lib as cond + + out_dir = cond.get_output_path() + else: + out_dir = pathlib.Path(".") + + verbose_log_dir = out_dir / "verbose_logs" + verbose_log_dir.mkdir(exist_ok=True) + verbose_logger = create_custom_logger( + "txn_runner_verbose", str(verbose_log_dir / f"runner_{worker_idx}.log") + ) + verbose_logger.info("Workload starting...") + + # Signal that we are ready to start and wait for other clients. + start_queue.put("") + control_semaphore.acquire() # type: ignore + + txn_exec_count = 0 + rand_backoff = None + overall_start = time.time() + try: + latency_file = open( + out_dir / "oltp_latency_{}.csv".format(worker_idx), "w", encoding="UTF-8" + ) + print("txn_idx,timestamp,run_time_s", file=latency_file) + + pause = False + while True: + # Note that `False` means to not block. + should_exit = control_semaphore.acquire(False) # type: ignore + if should_exit: + logger.info("T Runner %d is exiting.", worker_idx) + break + should_pause = pause_semaphore.acquire(False) # type: ignore + if should_pause: + pause = True + if pause: + should_resume = resume_semaphore.acquire(False) # type: ignore + if should_resume: + pause = False + else: + time.sleep(1) + continue + + txn_exec_count += 1 + txn_idx = txn_prng.choices(txn_indexes, weights=transaction_weights, k=1)[0] + txn = transactions[txn_idx] + + now = datetime.now().astimezone(pytz.utc) + txn_start = time.time() + try: + # pylint: disable-next=comparison-with-callable + if txn == worker.purchase_tickets: + succeeded = txn( + db, + select_using_name=txn_prng.random() < lookup_theatre_id_by_name, + ) + else: + succeeded = txn(db) + + if rand_backoff is not None: + logger.info("[T %d] Continued after transient errors.", worker_idx) + rand_backoff = None + + except BradClientError as ex: + succeeded = False + if ex.is_transient(): + verbose_logger.warning("Transient txn error: %s", ex.message()) + + if rand_backoff is None: + rand_backoff = RandomizedExponentialBackoff( + max_retries=100, + base_delay_s=0.1, + max_delay_s=timedelta(minutes=1).total_seconds(), + ) + logger.info( + "[T %d] Backing off due to transient errors.", + worker_idx, + ) + + # Delay retrying in the case of a transient error (this + # happens during blueprint transitions). + wait_s = rand_backoff.wait_time_s() + if wait_s is None: + logger.info( + "[T %d] Aborting benchmark. Too many transient errors.", + worker_idx, + ) + break + verbose_logger.info( + "[T %d] Backing off for %.4f seconds...", worker_idx, wait_s + ) + time.sleep(wait_s) + + else: + logger.error( + "[T %d] Encountered an unexpected `BradClientError`. Aborting the workload...", + worker_idx, + ) + raise + except: + succeeded = False + logger.error( + "[T %d] Encountered an unexpected error. Aborting the workload...", + worker_idx, + ) + raise + txn_end = time.time() + + # Record metrics. + if succeeded: + commits[txn_idx] += 1 + else: + aborts[txn_idx] += 1 + + if txn_prng.random() < args.latency_sample_prob: + print( + "{},{},{}".format(txn_idx, now, txn_end - txn_start), + file=latency_file, + ) + if txn_exec_count > 20_000: + latency_file.flush() + txn_exec_count = 0 + + # Warn if the abort rate is high. 
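+            # Transient BRAD errors count as aborts here, so a sustained rate above
+            # 15% usually indicates contention or repeated transient failures (e.g.
+            # during blueprint transitions).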
+ total_aborts = sum(aborts) + total_commits = sum(commits) + abort_rate = total_aborts / (total_aborts + total_commits) + if abort_rate > 0.15: + logger.info( + "[T %d] Abort rate is higher than expected ({%4f}).", + worker_idx, + abort_rate, + ) + + finally: + overall_end = time.time() + logger.info("[T %d] Done running transactions.", worker_idx) + latency_file.close() + + with open( + out_dir / "oltp_stats_{}.csv".format(worker_idx), "w", encoding="UTF-8" + ) as file: + print("stat,value", file=file) + print(f"overall_run_time_s,{overall_end - overall_start}", file=file) + print(f"purchase_commits,{commits[0]}", file=file) + print(f"add_showing_commits,{commits[1]}", file=file) + print(f"edit_note_commits,{commits[2]}", file=file) + print(f"purchase_aborts,{aborts[0]}", file=file) + print(f"add_showing_aborts,{aborts[1]}", file=file) + print(f"edit_note_aborts,{aborts[2]}", file=file) + + db.close_sync() + + +def simulation_runner( + args, + worker_idx: int, + directory: Optional[Directory], + start_queue: mp.Queue, + control_semaphore: mp.Semaphore, # type: ignore + pause_semaphore: mp.Semaphore, # type: ignore + resume_semaphore: mp.Semaphore, # type: ignore +) -> None: + """ + Meant to be launched as a subprocess with multiprocessing. + """ + + def noop_handler(_signal, _frame): + pass + + signal.signal(signal.SIGINT, noop_handler) + + set_up_logging() + + worker = TransactionWorker( + worker_idx, + args.seed ^ worker_idx, + args.scale_factor, + args.dataset_type, + args.use_zipfian_ids, + args.zipfian_alpha, + ) + + txn_prng = random.Random(~(args.seed ^ worker_idx)) + transactions = [ + worker.purchase_tickets, + worker.add_new_showing, + worker.edit_movie_note, + ] + transaction_weights = [ + 0.70, + 0.20, + 0.10, + ] + lookup_theatre_id_by_name = 0.8 + txn_indexes = list(range(len(transactions))) + commits = [0 for _ in range(len(transactions))] + aborts = [0 for _ in range(len(transactions))] + + # For printing out results. + if "COND_OUT" in os.environ: + # pylint: disable-next=import-error + import conductor.lib as cond + + out_dir = cond.get_output_path() + else: + out_dir = pathlib.Path(".") + + verbose_log_dir = out_dir / "verbose_logs" + verbose_log_dir.mkdir(exist_ok=True) + verbose_logger = create_custom_logger( + "txn_runner_verbose", str(verbose_log_dir / f"runner_{worker_idx}.log") + ) + verbose_logger.info("Workload starting...") + + # Signal that we are ready to start and wait for other clients. + start_queue.put("") + control_semaphore.acquire() # type: ignore + + txn_exec_count = 0 + rand_backoff = None + overall_start = time.time() + + latency_file = open( + out_dir / "oltp_latency_{}.csv".format(worker_idx), "w", encoding="UTF-8" + ) + print("txn_idx,timestamp,run_time_s", file=latency_file) + + pause = False + while True: + # Note that `False` means to not block. 
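+        # acquire(False) returns immediately; a successful acquire here means the
+        # controller released this semaphore again, which is the signal to stop.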
+ should_exit = control_semaphore.acquire(False) # type: ignore + if should_exit: + logger.info("T Runner %d is exiting.", worker_idx) + break + should_pause = pause_semaphore.acquire(False) # type: ignore + if should_pause: + pause = True + if pause: + should_resume = resume_semaphore.acquire(False) # type: ignore + if should_resume: + pause = False + else: + time.sleep(1) + continue + + txn_exec_count += 1 + txn_idx = txn_prng.choices(txn_indexes, weights=transaction_weights, k=1)[0] + + now = datetime.now().astimezone(pytz.utc) + + succeeded = np.random.choice([True, False], p=[0.8, 0.2]) + + if rand_backoff is not None: + logger.info("[T %d] Continued after transient errors.", worker_idx) + rand_backoff = None + + time.sleep(0.01) + + # Record metrics. + if succeeded: + commits[txn_idx] += 1 + else: + aborts[txn_idx] += 1 + + if txn_prng.random() < args.latency_sample_prob: + print( + "{},{},{}".format(txn_idx, now, 0.01), + file=latency_file, + ) + if txn_exec_count > 20_000: + latency_file.flush() + txn_exec_count = 0 + + # Warn if the abort rate is high. + total_aborts = sum(aborts) + total_commits = sum(commits) + abort_rate = total_aborts / (total_aborts + total_commits) + if abort_rate > 0.15: + logger.info( + "[T %d] Abort rate is higher than expected ({%4f}).", + worker_idx, + abort_rate, + ) + + logger.info("[T %d] Done running transactions.", worker_idx) + latency_file.close() + + +class PauseController: + # controlling the pause and resume of clients + def __init__( + self, + total_num_clients: int, + pause_semaphore: List[mp.Semaphore], # type: ignore + resume_semaphore: List[mp.Semaphore], # type: ignore + ): + self.total_num_clients = total_num_clients + self.pause_semaphore = pause_semaphore + self.resume_semaphore = resume_semaphore + self.paused_clients: List[int] = [] + self.running_clients: List[int] = list(range(total_num_clients)) + self.num_running_clients: int = total_num_clients + + def adjust_num_running_clients( + self, num_clients: int, verbose: bool = True + ) -> None: + if num_clients > self.total_num_clients: + print( + f"invalid input number of clients {num_clients}, larger than total number of clients" + ) + return + if num_clients == self.num_running_clients: + return + elif num_clients < self.num_running_clients: + pause_clients = np.random.choice( + self.running_clients, + size=self.num_running_clients - num_clients, + replace=False, + ) + for i in pause_clients: + assert ( + i not in self.paused_clients + ), f"trying to pause a client that is already paused: {i}" + if verbose: + print(f"pausing client {i}") + self.pause_semaphore[i].release() + self.pause_semaphore[i].release() + self.paused_clients.append(i) + self.running_clients.remove(i) + self.num_running_clients -= 1 + else: + resume_clients = np.random.choice( + self.paused_clients, + size=num_clients - self.num_running_clients, + replace=False, + ) + for i in resume_clients: + assert ( + i not in self.running_clients + ), f"trying to resume a running client: {i}" + if verbose: + print(f"resuming client {i}") + self.resume_semaphore[i].release() + self.resume_semaphore[i].release() + self.paused_clients.remove(i) + self.running_clients.append(i) + self.num_running_clients += 1 + + +async def get_command_line_input(pause_controller: PauseController) -> None: + while True: + try: + user_input = input() + if user_input.isnumeric(): + num_client = int(user_input) + pause_controller.adjust_num_running_clients(num_client) + elif user_input == "exit": + break + except KeyboardInterrupt: + break + + 
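+# Control protocol: each runner writes to its `start_queue` once it has finished
+# setting up, then blocks on its control semaphore. main() waits on every start
+# queue so clients can begin together, releases a runner's control semaphore to
+# tell it to start, and releases it again to tell it to stop. The pause/resume
+# semaphores let the PauseController suspend and wake individual clients while
+# the workload is running.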
+def main(): + parser = argparse.ArgumentParser( + "Tool used to run IMDB-extended transactions against BRAD or an ODBC database." + ) + parser.add_argument( + "--run-for-s", + type=int, + help="How long to run the workload for. If unset, the experiment will run until Ctrl-C.", + ) + parser.add_argument( + "--run-simulation", + action="store_true", + help="Run the simulation instead of actual execution.", + ) + parser.add_argument( + "--num-clients", + type=int, + default=1, + help="The number of transactional clients.", + ) + parser.add_argument("--client-offset", type=int, default=0) + parser.add_argument( + "--seed", type=int, default=42, help="Random seed for reproducibility." + ) + parser.add_argument( + "--cstr-var", + type=str, + help="Environment variable that holds a ODBC connection string. Set to connect directly (i.e., not through BRAD)", + ) + parser.add_argument( + "--scale-factor", + type=int, + default=1, + help="The scale factor used to generate the dataset.", + ) + parser.add_argument( + "--isolation-level", + type=str, + default="REPEATABLE READ", + help="The isolation level to use when running the transactions.", + ) + parser.add_argument( + "--brad-direct", + action="store_true", + help="Set to connect directly to Aurora via BRAD's config.", + ) + parser.add_argument( + "--config-file", + type=str, + help="The BRAD config file (if --brad-direct is used).", + ) + parser.add_argument( + "--schema-name", + type=str, + help="The schema name to use, if connecting directly.", + ) + parser.add_argument( + "--latency-sample-prob", + type=float, + default=0.01, + help="The probability that a transaction's latency will be recorded.", + ) + parser.add_argument( + "--dataset-type", + choices=["original", "20gb", "100gb"], + default="original", + help="This controls the range of reads the transaction worker performs, " + "depending on the dataset size.", + ) + parser.add_argument( + "--use-zipfian-ids", + action="store_true", + help="Whether the transaction worker should draw movie and theatre IDs " + "from a Zipfian distribution.", + ) + parser.add_argument( + "--zipfian-alpha", + type=float, + default=1.1, + help="The alpha parameter for the Zipfian distribution. Only used if " + "--use-zipfian-ids is `True`. Must be strictly greater than 1. ", + ) + # These three arguments are used for the day long experiment. 
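+    # (--num-client-path supplies the per-period client-count trace,
+    # --num-client-multiplier scales those counts, and --time-scale-factor maps
+    # trace time onto real time.)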
+ parser.add_argument( + "--num-client-path", + type=str, + default=None, + help="Path to the distribution of number of clients for each period of a day", + ) + parser.add_argument( + "--num-client-multiplier", + type=int, + default=1, + help="The multiplier to the number of clients for each period of a day", + ) + parser.add_argument( + "--time-scale-factor", + type=int, + default=100, + help="trace 1s of simulation as X seconds in real-time to match the num-concurrent-query", + ) + parser.add_argument("--brad-host", type=str, default="localhost") + parser.add_argument("--brad-port", type=int, default=6583) + parser.add_argument("--num-front-ends", type=int, default=1) + args = parser.parse_args() + + set_up_logging() + + if ( + args.num_client_path is not None + and os.path.exists(args.num_client_path) + and args.time_scale_factor is not None + ): + # we can only set the num_concurrent_query trace in presence of time_scale_factor + with open(args.num_client_path, "rb") as f: + num_client_trace = pickle.load(f) + logger.info("[T] Preparing to run a time varying workload") + else: + num_client_trace = None + logger.info("[T] Preparing to run a steady workload") + + mgr = mp.Manager() + start_queue = [mgr.Queue() for _ in range(args.num_clients)] + # pylint: disable-next=no-member + control_semaphore = [mgr.Semaphore(value=0) for _ in range(args.num_clients)] + # pylint: disable-next=no-member + pause_semaphore = [mgr.Semaphore(value=0) for _ in range(args.num_clients)] + # pylint: disable-next=no-member + resume_semaphore = [mgr.Semaphore(value=0) for _ in range(args.num_clients)] + + if args.brad_direct: + assert args.config_file is not None + assert args.schema_name is not None + config = ConfigFile.load(args.config_file) + directory = Directory(config) + asyncio.run(directory.refresh()) + else: + directory = None + + clients = [] + if args.run_simulation: + for idx in range(args.num_clients): + p = mp.Process( + target=simulation_runner, + args=( + args, + idx, + directory, + start_queue[idx], + control_semaphore[idx], + pause_semaphore[idx], + resume_semaphore[idx], + ), + ) + p.start() + clients.append(p) + else: + for idx in range(args.num_clients): + p = mp.Process( + target=runner, + args=( + args, + idx, + directory, + start_queue[idx], + control_semaphore[idx], + pause_semaphore[idx], + resume_semaphore[idx], + ), + ) + p.start() + clients.append(p) + + logger.info("[T] Waiting for startup...") + one_startup_failed = False + for i in range(args.num_clients): + msg = start_queue[i].get() + if msg == STARTUP_FAILED: + one_startup_failed = True + + if one_startup_failed: + logger.error( + "At least one transactional runner failed to start up. Aborting the experiment.", + ) + for i in range(args.num_clients): + control_semaphore[i].release() + control_semaphore[i].release() + for p in clients: + p.join() + logger.info("Transactional client abort complete.") + return + + if num_client_trace is not None: + logger.info("[T] Scaling number of clients by %d", args.num_client_multiplier) + for k in num_client_trace.keys(): + num_client_trace[k] *= args.num_client_multiplier + + assert args.time_scale_factor is not None, "Need to set --time-scale-factor" + assert args.run_for_s is not None, "Need to set --run-for-s" + + execute_start_time = universal_now() + num_running_client = 0 + num_client_required = min(num_client_trace[0], args.num_clients) + for add_client in range(num_running_client, num_client_required): + logger.info("[T] Telling client no. 
%d to start.", add_client) + control_semaphore[add_client].release() + num_running_client += 1 + + finished_one_day = True + curr_day_start_time = datetime.now().astimezone(pytz.utc) + for time_of_day, num_expected_clients in num_client_trace.items(): + if time_of_day == 0: + continue + # at this time_of_day start/shut-down more clients + time_in_s = time_of_day / args.time_scale_factor + now = datetime.now().astimezone(pytz.utc) + curr_time_in_s = (now - curr_day_start_time).total_seconds() + total_exec_time_in_s = (now - execute_start_time).total_seconds() + if args.run_for_s <= total_exec_time_in_s: + finished_one_day = False + break + if args.run_for_s - total_exec_time_in_s <= (time_in_s - curr_time_in_s): + wait_time = args.run_for_s - total_exec_time_in_s + if wait_time > 0: + time.sleep(wait_time) + finished_one_day = False + break + time.sleep(time_in_s - curr_time_in_s) + num_client_required = min(num_expected_clients, args.num_clients) + if num_client_required > num_running_client: + # starting additional clients + for add_client in range(num_running_client, num_client_required): + logger.info("[T] Telling client no. %d to start.", add_client) + control_semaphore[add_client].release() + num_running_client += 1 + elif num_running_client > num_client_required: + # shutting down clients + for delete_client in range(num_running_client, num_client_required, -1): + logger.info( + "[T] Telling client no. %d to stop.", (delete_client - 1) + ) + control_semaphore[delete_client - 1].release() + num_running_client -= 1 + now = datetime.now().astimezone(pytz.utc) + total_exec_time_in_s = (now - execute_start_time).total_seconds() + if finished_one_day: + logger.info( + "[T] Finished executing one day of workload in %d s, will ignore the rest of " + "pre-set execution time %d s", + total_exec_time_in_s, + args.run_for_s, + ) + else: + logger.info( + "[T] Executed ended but unable to finish executing the trace of a full day within %d s", + args.run_for_s, + ) + + else: + logger.info("[T] Telling all %d clients to start.", args.num_clients) + for idx in range(args.num_clients): + control_semaphore[idx].release() + + pause_controller = PauseController( + args.num_clients, pause_semaphore, resume_semaphore + ) + + if args.run_for_s is not None and num_client_trace is None: + logger.info("[T] Letting the experiment run for %d seconds...", args.run_for_s) + time.sleep(args.run_for_s) + + elif num_client_trace is None: + logger.info("[T] Waiting until requested to stop... (hit Ctrl-C)") + logger.info( + "type in an integer smaller than total number of clients and press enter to change number of running client, type in exit to stop dynamically adjusting number of clients...", + ) + asyncio.run(get_command_line_input(pause_controller)) + should_shutdown = threading.Event() + + def signal_handler(_signal, _frame): + should_shutdown.set() + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + should_shutdown.wait() + + logger.info("[T] Stopping clients...") + for idx in range(args.num_clients): + # Note that in most cases, one release will have already run. This is OK + # because downstream runners will not hang if there is a unconsumed + # semaphore value. 
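+        # Two releases are enough in both cases: a runner still blocked on its
+        # initial acquire consumes both, while a running runner consumes one and
+        # leaves the other unconsumed.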
+ control_semaphore[idx].release() + control_semaphore[idx].release() + + logger.info("[T] Waiting for clients to terminate...") + for c in clients: + c.join() + logger.info("[T] Done transactions!") + + +if __name__ == "__main__": + # On Unix platforms, the default way to start a process is by forking, which + # is not ideal (we do not want to duplicate this process' file + # descriptors!). + mp.set_start_method("spawn") + main() diff --git a/workloads/IMDB_extended/run_variable_clients.py b/workloads/IMDB_extended/run_variable_clients.py index 72da511c..d1497127 100644 --- a/workloads/IMDB_extended/run_variable_clients.py +++ b/workloads/IMDB_extended/run_variable_clients.py @@ -1,76 +1,91 @@ +import asyncio import argparse -import copy -import multiprocessing as mp -import time -import os -import pickle -import numpy as np import pathlib +import pickle import random -import sys -import threading import signal +import threading +import time +import numpy as np +import os import pytz +import multiprocessing as mp import logging -import asyncio -from typing import List, Optional -import numpy.typing as npt from datetime import datetime, timedelta +from typing import Optional, List -from workload_utils.connect import connect_to_db from brad.config.engine import Engine +from brad.config.file import ConfigFile from brad.grpc_client import BradClientError +from brad.provisioning.directory import Directory from brad.utils.rand_exponential_backoff import RandomizedExponentialBackoff +from brad.utils.time_periods import universal_now from brad.utils import set_up_logging, create_custom_logger +from workload_utils.connect import connect_to_db +from workload_utils.transaction_worker import TransactionWorker logger = logging.getLogger(__name__) -EXECUTE_START_TIME = datetime.now().astimezone(pytz.utc) -ENGINE_NAMES = ["ATHENA", "AURORA", "REDSHIFT"] - STARTUP_FAILED = "startup_failed" -def get_time_of_the_day_unsimulated( - now: datetime, time_scale_factor: Optional[int] -) -> int: - # Get the time of the day in minute in real-time - assert time_scale_factor is not None, "need to specify args.time_scale_factor" - # time_diff in minutes after scaling - time_diff = int((now - EXECUTE_START_TIME).total_seconds() / 60 * time_scale_factor) - time_unsimulated = time_diff % (24 * 60) # time of the day in minutes - return time_unsimulated - - -def time_in_minute_to_datetime_str(time_unsimulated: Optional[int]) -> str: - if time_unsimulated is None: - return "xxx" - hour = time_unsimulated // 60 - assert hour < 24 - minute = time_unsimulated % 60 - hour_str = str(hour) if hour >= 10 else "0" + str(hour) - minute_str = str(minute) if minute >= 10 else "0" + str(minute) - return f"{hour_str}:{minute_str}" - - def runner( - runner_idx: int, + args, + worker_idx: int, + directory: Optional[Directory], start_queue: mp.Queue, control_semaphore: mp.Semaphore, # type: ignore pause_semaphore: mp.Semaphore, # type: ignore resume_semaphore: mp.Semaphore, # type: ignore - args, - query_bank: List[str], - queries: List[int], - query_frequency: Optional[npt.NDArray] = None, - execution_gap_dist: Optional[npt.NDArray] = None, ) -> None: - def noop(_signal, _frame): + """ + Meant to be launched as a subprocess with multiprocessing. 
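+
+    Connects to BRAD (or directly to the database when --brad-direct or
+    --cstr-var is used), runs the ticket-purchase / add-showing / edit-note
+    transaction mix, and records per-transaction latencies plus commit/abort
+    counts.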
+ """ + + def noop_handler(_signal, _frame): pass - signal.signal(signal.SIGINT, noop) + signal.signal(signal.SIGINT, noop_handler) set_up_logging() + worker = TransactionWorker( + worker_idx, + args.seed ^ worker_idx, + args.scale_factor, + args.dataset_type, + args.use_zipfian_ids, + args.zipfian_alpha, + ) + + txn_prng = random.Random(~(args.seed ^ worker_idx)) + transactions = [ + worker.purchase_tickets, + worker.add_new_showing, + worker.edit_movie_note, + ] + transaction_weights = [ + 0.70, + 0.20, + 0.10, + ] + lookup_theatre_id_by_name = 0.8 + txn_indexes = list(range(len(transactions))) + commits = [0 for _ in range(len(transactions))] + aborts = [0 for _ in range(len(transactions))] + + # Connect and set the isolation level. + try: + db = connect_to_db( + args, worker_idx, direct_engine=Engine.Aurora, directory=directory + ) + db.execute_sync( + f"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL {args.isolation_level}" + ) + except BradClientError as ex: + logger.error("[T %d] Failed to connect to BRAD: %s", worker_idx, str(ex)) + start_queue.put_nowait(STARTUP_FAILED) + return + # For printing out results. if "COND_OUT" in os.environ: # pylint: disable-next=import-error @@ -83,74 +98,30 @@ def noop(_signal, _frame): verbose_log_dir = out_dir / "verbose_logs" verbose_log_dir.mkdir(exist_ok=True) verbose_logger = create_custom_logger( - "repeating_analytics_verbose", str(verbose_log_dir / f"runner_{runner_idx}.log") + "txn_runner_verbose", str(verbose_log_dir / f"runner_{worker_idx}.log") ) verbose_logger.info("Workload starting...") - if args.engine is not None: - engine = Engine.from_str(args.engine) - else: - engine = None - - try: - database = connect_to_db( - args, - runner_idx, - direct_engine=engine, - # Ensure we disable the result cache if we are running directly on - # Redshift. - disable_direct_redshift_result_cache=True, - ) - except BradClientError as ex: - logger.error("[RA %d] Failed to connect to BRAD: %s", runner_idx, str(ex)) - start_queue.put_nowait(STARTUP_FAILED) - return - - if query_frequency is not None: - # There are no predictions for query 48 in our test set (query cannot be parsed). - # Set its frequency to 0 so it is never used. - query_frequency[48] = 0.0 - - query_frequency = query_frequency[queries] - query_frequency = query_frequency / np.sum(query_frequency) - - exec_count = 0 - file = open( - out_dir / "repeating_olap_batch_{}.csv".format(runner_idx), - "w", - encoding="UTF-8", - ) + # Signal that we are ready to start and wait for other clients. + start_queue.put("") + control_semaphore.acquire() # type: ignore + txn_exec_count = 0 + rand_backoff = None + overall_start = time.time() try: - print( - "timestamp,time_since_execution_s,time_of_day,query_idx,run_time_s,engine", - file=file, - flush=True, + latency_file = open( + out_dir / "oltp_latency_{}.csv".format(worker_idx), "w", encoding="UTF-8" ) - - prng = random.Random(args.seed ^ runner_idx) - rand_backoff = None - - logger.info( - "[Repeating Analytics Runner %d] Queries to run: %s", - runner_idx, - queries, - ) - query_order_main = queries.copy() - prng.shuffle(query_order_main) - query_order = query_order_main.copy() - - first_run = True - - # Signal that we're ready to start and wait for the controller. - logger.info("[RA] Runner %d is ready to start running.", runner_idx) - start_queue.put_nowait("") - control_semaphore.acquire() # type: ignore + print("txn_idx,timestamp,run_time_s", file=latency_file) pause = False while True: # Note that `False` means to not block. 
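+            # The pause/resume semaphores work the same way: a pause release parks
+            # this runner in a one-second sleep loop until a resume release arrives.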
should_exit = control_semaphore.acquire(False) # type: ignore + if should_exit: + logger.info("T Runner %d is exiting.", worker_idx) + break should_pause = pause_semaphore.acquire(False) # type: ignore if should_pause: pause = True @@ -161,139 +132,161 @@ def noop(_signal, _frame): else: time.sleep(1) continue - if should_exit: - logger.info("[RA] Runner %d is exiting.", runner_idx) - break - - if execution_gap_dist is not None: - now = datetime.now().astimezone(pytz.utc) - time_unsimulated = get_time_of_the_day_unsimulated( - now, args.time_scale_factor - ) - wait_for_s = execution_gap_dist[ - int(time_unsimulated / (60 * 24) * len(execution_gap_dist)) - ] - time.sleep(wait_for_s) - elif args.avg_gap_s is not None: - if first_run: - # We wait a uniformly random amount of time at the beginning - # to stagger queries across all the clients that will run - # (e.g., to avoid having all clients issue queries at the - # same time). - first_run = False - wait_for_s = prng.uniform(0.0, args.avg_gap_s) - time.sleep(wait_for_s) - else: - # Wait times are normally distributed if execution_gap_dist is not provided. - wait_for_s = prng.gauss(args.avg_gap_s, args.avg_gap_std_s) - if wait_for_s < 0.0: - wait_for_s = 0.0 - time.sleep(wait_for_s) - - if query_frequency is not None: - qidx = prng.choices(queries, list(query_frequency))[0] - else: - if len(query_order) == 0: - query_order = query_order_main.copy() - qidx = query_order.pop() - logger.debug("Executing qidx: %d", qidx) - query = query_bank[qidx] + txn_exec_count += 1 + txn_idx = txn_prng.choices(txn_indexes, weights=transaction_weights, k=1)[0] + txn = transactions[txn_idx] + now = datetime.now().astimezone(pytz.utc) + txn_start = time.time() try: - engine = None - now = datetime.now().astimezone(pytz.utc) - if args.time_scale_factor is not None: - time_unsimulated = get_time_of_the_day_unsimulated( - now, args.time_scale_factor - ) - time_unsimulated_str = time_in_minute_to_datetime_str( - time_unsimulated + # pylint: disable-next=comparison-with-callable + if txn == worker.purchase_tickets: + succeeded = txn( + db, + select_using_name=txn_prng.random() < lookup_theatre_id_by_name, ) else: - time_unsimulated_str = "xxx" - - verbose_logger.info("[RA %d] Issuing query %d", runner_idx, qidx) - start = time.time() - _, engine = database.execute_sync_with_engine(query) - end = time.time() - print( - "{},{},{},{},{},{}".format( - now, - (now - EXECUTE_START_TIME).total_seconds(), - time_unsimulated_str, - qidx, - end - start, - engine.value, - ), - file=file, - flush=True, - ) - - if exec_count % 20 == 0: - # To avoid data loss if this script crashes. - os.fsync(file.fileno()) + succeeded = txn(db) - exec_count += 1 if rand_backoff is not None: - logger.info("[RA %d] Continued after transient errors.", runner_idx) + logger.info("[T %d] Continued after transient errors.", worker_idx) rand_backoff = None except BradClientError as ex: + succeeded = False if ex.is_transient(): - verbose_logger.warning("Transient query error: %s", ex.message()) + verbose_logger.warning("Transient txn error: %s", ex.message()) if rand_backoff is None: rand_backoff = RandomizedExponentialBackoff( max_retries=100, - base_delay_s=1.0, + base_delay_s=0.1, max_delay_s=timedelta(minutes=1).total_seconds(), ) logger.info( - "[RA %d] Backing off due to transient errors.", runner_idx + "[T %d] Backing off due to transient errors.", + worker_idx, ) # Delay retrying in the case of a transient error (this # happens during blueprint transitions). 
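+                    # wait_time_s() returns the next randomized delay (0.1 s base,
+                    # capped at one minute) and None once the 100 retries are used
+                    # up, at which point the runner aborts the benchmark.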
wait_s = rand_backoff.wait_time_s() if wait_s is None: - logger.error( - "[RA %d] Aborting benchmark. Too many transient errors.", - runner_idx, + logger.info( + "[T %d] Aborting benchmark. Too many transient errors.", + worker_idx, ) break verbose_logger.info( - "[RA %d] Backing off for %.4f seconds...", runner_idx, wait_s + "[T %d] Backing off for %.4f seconds...", worker_idx, wait_s ) time.sleep(wait_s) else: - logger.error("Unexpected query error: %s", ex.message()) + logger.error( + "[T %d] Encountered an unexpected `BradClientError`. Aborting the workload...", + worker_idx, + ) + raise + except: + succeeded = False + logger.error( + "[T %d] Encountered an unexpected error. Aborting the workload...", + worker_idx, + ) + raise + txn_end = time.time() + + # Record metrics. + if succeeded: + commits[txn_idx] += 1 + else: + aborts[txn_idx] += 1 + + if txn_prng.random() < args.latency_sample_prob: + print( + "{},{},{}".format(txn_idx, now, txn_end - txn_start), + file=latency_file, + ) + if txn_exec_count > 20_000: + latency_file.flush() + txn_exec_count = 0 + + # Warn if the abort rate is high. + total_aborts = sum(aborts) + total_commits = sum(commits) + abort_rate = total_aborts / (total_aborts + total_commits) + if abort_rate > 0.15: + logger.info( + "[T %d] Abort rate is higher than expected ({%4f}).", + worker_idx, + abort_rate, + ) finally: - os.fsync(file.fileno()) - file.close() - database.close_sync() - logger.info("[RA] Runner %d has exited.", runner_idx) + overall_end = time.time() + logger.info("[T %d] Done running transactions.", worker_idx) + latency_file.close() + + with open( + out_dir / "oltp_stats_{}.csv".format(worker_idx), "w", encoding="UTF-8" + ) as file: + print("stat,value", file=file) + print(f"overall_run_time_s,{overall_end - overall_start}", file=file) + print(f"purchase_commits,{commits[0]}", file=file) + print(f"add_showing_commits,{commits[1]}", file=file) + print(f"edit_note_commits,{commits[2]}", file=file) + print(f"purchase_aborts,{aborts[0]}", file=file) + print(f"add_showing_aborts,{aborts[1]}", file=file) + print(f"edit_note_aborts,{aborts[2]}", file=file) + + db.close_sync() def simulation_runner( - all_query_runtime: npt.NDArray, - runner_idx: int, + args, + worker_idx: int, start_queue: mp.Queue, control_semaphore: mp.Semaphore, # type: ignore pause_semaphore: mp.Semaphore, # type: ignore resume_semaphore: mp.Semaphore, # type: ignore - args, - queries: List[int], - query_frequency_original: Optional[npt.NDArray] = None, - execution_gap_dist: Optional[npt.NDArray] = None, - wait_for_execute: bool = False, ) -> None: - def noop(_signal, _frame): + """ + Meant to be launched as a subprocess with multiprocessing. + """ + + def noop_handler(_signal, _frame): pass - signal.signal(signal.SIGINT, noop) + signal.signal(signal.SIGINT, noop_handler) + + set_up_logging() + + worker = TransactionWorker( + worker_idx, + args.seed ^ worker_idx, + args.scale_factor, + args.dataset_type, + args.use_zipfian_ids, + args.zipfian_alpha, + ) + + txn_prng = random.Random(~(args.seed ^ worker_idx)) + transactions = [ + worker.purchase_tickets, + worker.add_new_showing, + worker.edit_movie_note, + ] + transaction_weights = [ + 0.70, + 0.20, + 0.10, + ] + lookup_theatre_id_by_name = 0.8 + txn_indexes = list(range(len(transactions))) + commits = [0 for _ in range(len(transactions))] + aborts = [0 for _ in range(len(transactions))] # For printing out results. 
if "COND_OUT" in os.environ: @@ -304,109 +297,84 @@ def noop(_signal, _frame): else: out_dir = pathlib.Path(".") - if query_frequency_original is not None: - query_frequency = copy.deepcopy(query_frequency_original) - # There are no predictions for query 48 in our test set (query cannot be parsed). - # Set its frequency to 0 so it is never used. - query_frequency[48] = 0.0 - query_frequency = query_frequency[queries] - query_frequency = query_frequency / np.sum(query_frequency) - else: - query_frequency = None - - with open( - out_dir / "repeating_olap_batch_{}.csv".format(runner_idx), - "w", - encoding="UTF-8", - ) as file: - print( - "timestamp,time_since_execution,time_of_day,query_idx,run_time_s,engine", - file=file, - flush=True, - ) + verbose_log_dir = out_dir / "verbose_logs" + verbose_log_dir.mkdir(exist_ok=True) + verbose_logger = create_custom_logger( + "txn_runner_verbose", str(verbose_log_dir / f"runner_{worker_idx}.log") + ) + verbose_logger.info("Workload starting...") - prng = random.Random(args.seed ^ runner_idx) + # Signal that we are ready to start and wait for other clients. + start_queue.put("") + control_semaphore.acquire() # type: ignore - logger.info( - "[Repeating Analytics Runner %d] Queries to run: %s", - runner_idx, - queries, - ) - query_order = queries.copy() - prng.shuffle(query_order) + txn_exec_count = 0 + rand_backoff = None - # Signal that we're ready to start and wait for the controller. - start_queue.put_nowait("") - control_semaphore.acquire() # type: ignore + latency_file = open( + out_dir / "oltp_latency_{}.csv".format(worker_idx), "w", encoding="UTF-8" + ) + print("txn_idx,timestamp,run_time_s", file=latency_file) - pause = False - while True: - should_exit = control_semaphore.acquire(False) # type: ignore - if should_exit: - break - should_pause = pause_semaphore.acquire(False) # type: ignore - if should_pause: - pause = True - if pause: - should_resume = resume_semaphore.acquire(False) # type: ignore - if should_resume: - pause = False - else: - time.sleep(1) - continue - if execution_gap_dist is not None: - now = datetime.now().astimezone(pytz.utc) - time_unsimulated = get_time_of_the_day_unsimulated( - now, args.time_scale_factor - ) - wait_for_s = execution_gap_dist[ - int(time_unsimulated / (60 * 24) * len(execution_gap_dist)) - ] - time.sleep(wait_for_s) - elif args.avg_gap_s is not None: - # Wait times are normally distributed if execution_gap_dist is not provided. - wait_for_s = prng.gauss(args.avg_gap_s, args.avg_gap_std_s) - if wait_for_s < 0.0: - wait_for_s = 0.0 - time.sleep(wait_for_s) - - if query_frequency is not None: - qidx = prng.choices(queries, list(query_frequency))[0] + pause = False + while True: + # Note that `False` means to not block. 
+ should_exit = control_semaphore.acquire(False) # type: ignore + if should_exit: + logger.info("T Runner %d is exiting.", worker_idx) + break + should_pause = pause_semaphore.acquire(False) # type: ignore + if should_pause: + pause = True + if pause: + should_resume = resume_semaphore.acquire(False) # type: ignore + if should_resume: + pause = False else: - if len(query_order) == 0: - query_order = queries.copy() - prng.shuffle(query_order) - - qidx = query_order.pop() - logger.debug("Executing qidx: %d", qidx) - # using the average of the best two engines as approximation of brad runtime - runtime = ( - np.sum(all_query_runtime[qidx]) - np.min(all_query_runtime[qidx]) - ) / 2 - if wait_for_execute: - time.sleep(runtime) - engine = np.argmin(all_query_runtime[qidx]) + time.sleep(1) + continue - now = datetime.now().astimezone(pytz.utc) - if args.time_scale_factor is not None: - time_unsimulated = get_time_of_the_day_unsimulated( - now, args.time_scale_factor - ) - time_unsimulated_str = time_in_minute_to_datetime_str(time_unsimulated) - else: - time_unsimulated_str = "xxx" + txn_exec_count += 1 + txn_idx = txn_prng.choices(txn_indexes, weights=transaction_weights, k=1)[0] + + now = datetime.now().astimezone(pytz.utc) + + succeeded = np.random.choice([True, False], p=[0.95, 0.05]) + + if rand_backoff is not None: + logger.info("[T %d] Continued after transient errors.", worker_idx) + rand_backoff = None + + time.sleep(0.01) + + # Record metrics. + if succeeded: + commits[txn_idx] += 1 + else: + aborts[txn_idx] += 1 + + if txn_prng.random() < args.latency_sample_prob: print( - "{},{},{},{},{},{}".format( - now, - (now - EXECUTE_START_TIME).total_seconds(), - time_unsimulated_str, - qidx, - runtime, - ENGINE_NAMES[engine], - ), - file=file, - flush=True, + "{},{},{}".format(txn_idx, now, 0.01), + file=latency_file, ) + if txn_exec_count > 20_000: + latency_file.flush() + txn_exec_count = 0 + + # Warn if the abort rate is high. + total_aborts = sum(aborts) + total_commits = sum(commits) + abort_rate = total_aborts / (total_aborts + total_commits) + if abort_rate > 0.15: + logger.info( + "[T %d] Abort rate is higher than expected ({%4f}).", + worker_idx, + abort_rate, + ) + + logger.info("[T %d] Done running transactions.", worker_idx) + latency_file.close() class PauseController: @@ -470,86 +438,6 @@ def adjust_num_running_clients( self.num_running_clients += 1 -def run_warmup(args, query_bank: List[str], queries: List[int]): - if args.engine is not None: - engine = Engine.from_str(args.engine) - else: - engine = None - - database = connect_to_db( - args, - worker_index=0, - direct_engine=engine, - # Ensure we disable the result cache if we are running directly on - # Redshift. - disable_direct_redshift_result_cache=True, - ) - - # For printing out results. 
- if "COND_OUT" in os.environ: - # pylint: disable-next=import-error - import conductor.lib as cond - - out_dir = cond.get_output_path() - else: - out_dir = pathlib.Path(".") - - try: - print( - f"Starting warmup pass (will run {args.run_warmup_times} times)...", - file=sys.stderr, - flush=True, - ) - with open( - out_dir / "repeating_olap_batch_warmup.csv", "w", encoding="UTF-8" - ) as file: - print("timestamp,query_idx,run_time_s,engine", file=file) - for _ in range(args.run_warmup_times): - for idx, qidx in enumerate(queries): - try: - engine = None - query = query_bank[qidx] - now = datetime.now().astimezone(pytz.utc) - start = time.time() - _, engine = database.execute_sync_with_engine(query) - end = time.time() - run_time_s = end - start - print( - "Warmed up {} of {}. Run time (s): {}".format( - idx + 1, len(queries), run_time_s - ), - file=sys.stderr, - flush=True, - ) - print( - "{},{},{},{}".format( - now, - qidx, - run_time_s, - engine.value if engine is not None else "unknown", - ), - file=file, - flush=True, - ) - except BradClientError as ex: - if ex.is_transient(): - print( - "Transient query error:", - ex.message(), - flush=True, - file=sys.stderr, - ) - else: - print( - "Unexpected query error:", - ex.message(), - flush=True, - file=sys.stderr, - ) - finally: - database.close_sync() - - async def get_command_line_input(pause_controller: PauseController) -> None: while True: try: @@ -564,114 +452,113 @@ async def get_command_line_input(pause_controller: PauseController) -> None: def main(): - parser = argparse.ArgumentParser() - parser.add_argument("--brad-host", type=str, default="localhost") - parser.add_argument("--brad-port", type=int, default=6583) - parser.add_argument("--seed", type=int, default=42) - parser.add_argument("--num-front-ends", type=int, default=1) - parser.add_argument("--run-warmup", action="store_true") + parser = argparse.ArgumentParser( + "Tool used to run IMDB-extended transactions against BRAD or an ODBC database." + ) + parser.add_argument( + "--run-for-s", + type=int, + help="How long to run the workload for. If unset, the experiment will run until Ctrl-C.", + ) parser.add_argument( "--run-simulation", action="store_true", help="Run the simulation instead of actual execution.", ) parser.add_argument( - "--wait-for-execute-sim", - action="store_true", - help="Waiting for execution in simulation?", + "--num-clients", + type=int, + default=1, + help="The number of transactional clients.", + ) + parser.add_argument("--client-offset", type=int, default=0) + parser.add_argument( + "--seed", type=int, default=42, help="Random seed for reproducibility." ) parser.add_argument( - "--query-runtime-path", + "--cstr-var", type=str, - default=None, - help="path to the query runtime numpy file", + help="Environment variable that holds a ODBC connection string. Set to connect directly (i.e., not through BRAD)", ) parser.add_argument( - "--run-warmup-times", + "--scale-factor", type=int, default=1, - help="Run the warmup query list this many times.", + help="The scale factor used to generate the dataset.", ) parser.add_argument( - "--cstr-var", + "--isolation-level", type=str, - help="Set to connect via ODBC instead of the BRAD client (for use with other baselines).", + default="REPEATABLE READ", + help="The isolation level to use when running the transactions.", ) parser.add_argument( - "--query-bank-file", type=str, required=True, help="Path to a query bank." 
+ "--brad-direct", + action="store_true", + help="Set to connect directly to Aurora via BRAD's config.", ) parser.add_argument( - "--time-scale-factor", + "--config-file", type=str, - required=False, - help="scale the machine time to time of the day", + help="The BRAD config file (if --brad-direct is used).", ) parser.add_argument( - "--query-frequency-path", + "--schema-name", type=str, - default=None, - help="path to the frequency to draw each query in query bank", + help="The schema name to use, if connecting directly.", ) parser.add_argument( - "--num-client-path", - type=str, - default=None, - help="Path to the distribution of number of clients for each period of a day", + "--latency-sample-prob", + type=float, + default=0.01, + help="The probability that a transaction's latency will be recorded.", ) - parser.add_argument("--num-clients", type=int, default=1) - parser.add_argument("--client-offset", type=int, default=0) - parser.add_argument("--avg-gap-s", type=float) - parser.add_argument("--avg-gap-std-s", type=float, default=0.5) - parser.add_argument("--query-indexes", type=str) parser.add_argument( - "--brad-direct", + "--dataset-type", + choices=["original", "20gb", "100gb"], + default="original", + help="This controls the range of reads the transaction worker performs, " + "depending on the dataset size.", + ) + parser.add_argument( + "--use-zipfian-ids", action="store_true", - help="Set to connect directly to Aurora via BRAD's config.", + help="Whether the transaction worker should draw movie and theatre IDs " + "from a Zipfian distribution.", ) parser.add_argument( - "--config-file", - type=str, - help="The BRAD config file (if --brad-direct is used).", + "--zipfian-alpha", + type=float, + default=1.1, + help="The alpha parameter for the Zipfian distribution. Only used if " + "--use-zipfian-ids is `True`. Must be strictly greater than 1. ", ) + # These three arguments are used for the day long experiment. parser.add_argument( - "--schema-name", + "--num-client-path", type=str, - help="The schema name to use, if connecting directly.", + default=None, + help="Path to the distribution of number of clients for each period of a day", ) parser.add_argument( - "--engine", type=str, help="The engine to use, if connecting directly." + "--num-client-multiplier", + type=int, + default=1, + help="The multiplier to the number of clients for each period of a day", ) - parser.add_argument("--run-for-s", type=int, help="If set, run for this long.") parser.add_argument( - "--ff-trace-clients", + "--time-scale-factor", type=int, - help="Start the client trace at the given number of clients. Used for debugging only.", + default=100, + help="trace 1s of simulation as X seconds in real-time to match the num-concurrent-query", ) + parser.add_argument("--brad-host", type=str, default="localhost") + parser.add_argument("--brad-port", type=int, default=6583) + parser.add_argument("--num-front-ends", type=int, default=1) args = parser.parse_args() set_up_logging() - logger.info( - "[Serial RA] Using query bank %s. 
Query indices: %s", - args.query_bank_file, - args.query_indexes, - ) - - with open(args.query_bank_file, "r", encoding="UTF-8") as file: - query_bank = [line.strip() for line in file] - - if args.query_frequency_path is not None and os.path.exists( - args.query_frequency_path - ): - query_frequency = np.load(args.query_frequency_path) - assert len(query_frequency) == len( - query_bank - ), "query_frequency size does not match total number of queries" - else: - query_frequency = None - - execution_gap_dist = None - if ( args.num_client_path is not None and os.path.exists(args.num_client_path) @@ -680,35 +567,13 @@ def main(): # we can only set the num_concurrent_query trace in presence of time_scale_factor with open(args.num_client_path, "rb") as f: num_client_trace = pickle.load(f) + logger.info("[T] Preparing to run a time varying workload") else: num_client_trace = None + logger.info("[T] Preparing to run a steady workload") - if args.query_indexes is None: - queries = list(range(len(query_bank))) - else: - queries = list(map(int, args.query_indexes.split(","))) - - for qidx in queries: - assert qidx < len(query_bank) - assert qidx >= 0 - - if args.run_warmup: - run_warmup(args, query_bank, queries) - return - - # Our control protocol is as follows. - # - Runner processes write to their `start_queue` when they have finished - # setting up and are ready to start running. They then wait on the control - # semaphore. - # - The control process blocks and waits on each `start_queue` to ensure - # runners can start together (if needed). - # - The control process signals the control semaphore twice. Once to tell a - # runner to start, once to tell it to stop. - # - If there is an error, a runner is free to exit as long as they have - # written to `start_queue`. mgr = mp.Manager() start_queue = [mgr.Queue() for _ in range(args.num_clients)] - # N.B. `value = 0` since we use this for synchronization, not mutual exclusion. 
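+    # N.B. `value = 0` since we use these for synchronization, not mutual exclusion.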
# pylint: disable-next=no-member control_semaphore = [mgr.Semaphore(value=0) for _ in range(args.num_clients)] # pylint: disable-next=no-member @@ -716,57 +581,49 @@ def main(): # pylint: disable-next=no-member resume_semaphore = [mgr.Semaphore(value=0) for _ in range(args.num_clients)] + if args.brad_direct: + assert args.config_file is not None + assert args.schema_name is not None + config = ConfigFile.load(args.config_file) + directory = Directory(config) + asyncio.run(directory.refresh()) + else: + directory = None + + clients = [] if args.run_simulation: - assert ( - args.query_runtime_path is not None - ), "must provide query runtime to run simulation" - all_query_runtime = np.load(args.query_runtime_path) - assert all_query_runtime.shape == ( - len(query_bank), - 3, - ), "incorrect query runtime file format" - processes = [] for idx in range(args.num_clients): p = mp.Process( target=simulation_runner, args=( - all_query_runtime, + args, idx, start_queue[idx], control_semaphore[idx], pause_semaphore[idx], resume_semaphore[idx], - args, - queries, - query_frequency, - execution_gap_dist, - args.wait_for_execute_sim, ), ) p.start() - processes.append(p) + clients.append(p) else: - processes = [] for idx in range(args.num_clients): p = mp.Process( target=runner, args=( + args, idx, + directory, start_queue[idx], control_semaphore[idx], pause_semaphore[idx], resume_semaphore[idx], - args, - query_bank, - queries, - query_frequency, - execution_gap_dist, ), ) p.start() - processes.append(p) + clients.append(p) - logger.info("[RA] Waiting for startup...") + logger.info("[T] Waiting for startup...") one_startup_failed = False for i in range(args.num_clients): msg = start_queue[i].get() @@ -775,39 +632,97 @@ def main(): if one_startup_failed: logger.error( - "[RA] At least one runner failed to start up. Aborting the experiment." + "At least one transactional runner failed to start up. Aborting the experiment.", ) for i in range(args.num_clients): - # Ideally we should be able to release twice atomically. control_semaphore[i].release() control_semaphore[i].release() - for p in processes: + for p in clients: p.join() - logger.info("[RA] Overall abort complete.") + logger.info("Transactional client abort complete.") return - global EXECUTE_START_TIME # pylint: disable=global-statement - EXECUTE_START_TIME = datetime.now().astimezone( - pytz.utc - ) # pylint: disable=global-statement + if num_client_trace is not None: + logger.info("[T] Scaling number of clients by %d", args.num_client_multiplier) + for k in num_client_trace.keys(): + num_client_trace[k] *= args.num_client_multiplier + + assert args.time_scale_factor is not None, "Need to set --time-scale-factor" + assert args.run_for_s is not None, "Need to set --run-for-s" + + execute_start_time = universal_now() + num_running_client = 0 + num_client_required = min(num_client_trace[0], args.num_clients) + for add_client in range(num_running_client, num_client_required): + logger.info("[T] Telling client no. 
%d to start.", add_client) + control_semaphore[add_client].release() + num_running_client += 1 + + finished_one_day = True + curr_day_start_time = datetime.now().astimezone(pytz.utc) + for time_of_day, num_expected_clients in num_client_trace.items(): + if time_of_day == 0: + continue + # at this time_of_day start/shut-down more clients + time_in_s = time_of_day / args.time_scale_factor + now = datetime.now().astimezone(pytz.utc) + curr_time_in_s = (now - curr_day_start_time).total_seconds() + total_exec_time_in_s = (now - execute_start_time).total_seconds() + if args.run_for_s <= total_exec_time_in_s: + finished_one_day = False + break + if args.run_for_s - total_exec_time_in_s <= (time_in_s - curr_time_in_s): + wait_time = args.run_for_s - total_exec_time_in_s + if wait_time > 0: + time.sleep(wait_time) + finished_one_day = False + break + time.sleep(time_in_s - curr_time_in_s) + num_client_required = min(num_expected_clients, args.num_clients) + if num_client_required > num_running_client: + # starting additional clients + for add_client in range(num_running_client, num_client_required): + logger.info("[T] Telling client no. %d to start.", add_client) + control_semaphore[add_client].release() + num_running_client += 1 + elif num_running_client > num_client_required: + # shutting down clients + for delete_client in range(num_running_client, num_client_required, -1): + logger.info( + "[T] Telling client no. %d to stop.", (delete_client - 1) + ) + control_semaphore[delete_client - 1].release() + num_running_client -= 1 + now = datetime.now().astimezone(pytz.utc) + total_exec_time_in_s = (now - execute_start_time).total_seconds() + if finished_one_day: + logger.info( + "[T] Finished executing one day of workload in %d s, will ignore the rest of " + "pre-set execution time %d s", + total_exec_time_in_s, + args.run_for_s, + ) + else: + logger.info( + "[T] Executed ended but unable to finish executing the trace of a full day within %d s", + args.run_for_s, + ) - logger.info("[RA] Telling all %d clients to start.", args.num_clients) - for i in range(args.num_clients): - control_semaphore[i].release() + else: + logger.info("[T] Telling all %d clients to start.", args.num_clients) + for idx in range(args.num_clients): + control_semaphore[idx].release() pause_controller = PauseController( args.num_clients, pause_semaphore, resume_semaphore ) if args.run_for_s is not None and num_client_trace is None: - logger.info("[RA] Waiting for %d seconds...", args.run_for_s) - asyncio.run(get_command_line_input(pause_controller)) + logger.info("[T] Letting the experiment run for %d seconds...", args.run_for_s) time.sleep(args.run_for_s) + elif num_client_trace is None: - # Wait until requested to stop. - logger.info( - "Repeating analytics waiting until requested to stop... (hit Ctrl-C)", - ) + logger.info("[T] Waiting until requested to stop... (hit Ctrl-C)") logger.info( "type in an integer smaller than total number of clients and press enter to change number of running client, type in exit to stop dynamically adjusting number of clients...", ) @@ -822,22 +737,18 @@ def signal_handler(_signal, _frame): should_shutdown.wait() - logger.info("[RA] Stopping all clients...") - for i in range(args.num_clients): + logger.info("[T] Stopping clients...") + for idx in range(args.num_clients): # Note that in most cases, one release will have already run. This is OK # because downstream runners will not hang if there is a unconsumed # semaphore value. 
- control_semaphore[i].release() - control_semaphore[i].release() - - logger.info("[RA] Waiting for the clients to complete...") - for p in processes: - p.join() - - for idx, p in enumerate(processes): - logger.info("Runner %d exit code: %d", idx, p.exitcode) + control_semaphore[idx].release() + control_semaphore[idx].release() - logger.info("Done repeating analytics!") + logger.info("[T] Waiting for clients to terminate...") + for c in clients: + c.join() + logger.info("[T] Done transactions!") if __name__ == "__main__":