benchmark: Make poller threads configurable for SQL benchmarks.
Also, set the default to 10 for the benchmark runners, because
that produces the best results.

Signed-off-by: Ben Pfaff <[email protected]>
blp committed Jul 7, 2024
1 parent dcb83db commit f30129c
Showing 3 changed files with 18 additions and 5 deletions.
4 changes: 4 additions & 0 deletions benchmark/feldera-sql/run.py
@@ -291,6 +291,7 @@ def main():
parser.add_argument('--output', action=argparse.BooleanOptionalAction, help='whether to write query output back to Kafka (default: --no-output)')
parser.add_argument('--merge', action=argparse.BooleanOptionalAction, help='whether to merge all the queries into one program (default: --no-merge)')
parser.add_argument('--storage', action=argparse.BooleanOptionalAction, help='whether to enable storage (default: --no-storage)')
parser.add_argument("--poller-threads", required=False, type=int, help="Override number of poller threads to use")
parser.add_argument('--min-storage-bytes', type=int, help='If storage is enabled, the minimum number of bytes to write a batch to storage.')
parser.add_argument('--query', action='append', help='queries to run (by default, all queries), specify one or more of: ' + ','.join(sort_queries(QUERY_SQL.keys())))
parser.add_argument('--input-topic-suffix', help='suffix to apply to input topic names (by default, "")')
@@ -313,6 +314,9 @@ def main():
queries = sort_queries(parse_queries(parser.parse_args().query))
cores = int(parser.parse_args().cores)
storage = parser.parse_args().storage
poller_threads = parser.parse_args().poller_threads
if poller_threads is not None:
kafka_options["poller_threads"] = poller_threads
min_storage_bytes = parser.parse_args().min_storage_bytes
if min_storage_bytes is not None:
min_storage_bytes = int(min_storage_bytes)
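
For context, a hedged sketch of how the new flag might be used when invoking the benchmark driver directly. The flag names come from the diffs in this commit; the core count and output file name are illustrative, not part of the change:

    python3 benchmark/feldera-sql/run.py \
        --api-url http://localhost:8080 \
        -O bootstrap.servers=localhost:9092 \
        --cores 16 \
        --poller-threads 10 \
        --csv results.csv

When --poller-threads is omitted, run.py leaves kafka_options untouched, so the pipeline presumably falls back to its built-in default.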
1 change: 1 addition & 0 deletions benchmark/run-nexmark.sh
@@ -505,6 +505,7 @@ case $runner:$language in
--api-url="$api_url" \
-O bootstrap.servers="${kafka_from_feldera:-${kafka_broker}}" \
--cores $cores \
--poller-threads 10 \
--input-topic-suffix="-$partitions-$events" \
--csv results.csv \
--query $(if test $query = all; then echo all; else echo q$query; fi)
18 changes: 13 additions & 5 deletions scripts/bench.bash
@@ -47,13 +47,21 @@ fi
KAFKA_BROKER=localhost:9092
rpk topic -X brokers=$KAFKA_BROKER delete bid auction person
cargo run -p dbsp_nexmark --example generate --features with-kafka -- --max-events ${MAX_EVENTS} -O bootstrap.servers=$KAFKA_BROKER
FELDERA_API=http://localhost:8080
python3 benchmark/feldera-sql/run.py --api-url $FELDERA_API -O bootstrap.servers=$KAFKA_BROKER --csv crates/nexmark/${NEXMARK_SQL_CSV_FILE} --csv-metrics crates/nexmark/${NEXMARK_SQL_METRICS_CSV_FILE} --metrics-interval 1

rpk topic -X brokers=$KAFKA_BROKER delete bid auction person
cargo run -p dbsp_nexmark --example generate --features with-kafka -- --max-events ${MAX_EVENTS} -O bootstrap.servers=$KAFKA_BROKER
FELDERA_API=http://localhost:8080
python3 benchmark/feldera-sql/run.py --storage --api-url $FELDERA_API -O bootstrap.servers=$KAFKA_BROKER --csv crates/nexmark/${NEXMARK_SQL_STORAGE_CSV_FILE} --csv-metrics crates/nexmark/${NEXMARK_SQL_STORAGE_METRICS_CSV_FILE} --metrics-interval 1
nexmark_sql_benchmark() {
local csv=$1 metrics=$2; shift; shift
python3 benchmark/feldera-sql/run.py \
--api-url $FELDERA_API \
-O bootstrap.servers=$KAFKA_BROKER \
--csv "crates/nexmark/$csv" \
--csv-metrics "crates/nexmark/$metrics" \
--metrics-interval 1 \
--poller-threads 10 \
"$@"
}
nexmark_sql_benchmark "${NEXMARK_SQL_CSV_FILE}" "${NEXMARK_SQL_METRICS_CSV_FILE}"
nexmark_sql_benchmark "${NEXMARK_SQL_STORAGE_CSV_FILE}" "${NEXMARK_SQL_STORAGE_METRICS_CSV_FILE}" --storage

# Run galen benchmark
cargo bench --bench galen -- --workers 10 --csv ${GALEN_CSV_FILE}
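
The new nexmark_sql_benchmark helper forwards any extra arguments to run.py via "$@", so each variant only names its CSV outputs plus whatever flags differ. A hedged example of adding another variant; the file names and byte threshold below are illustrative, not part of this commit:

    nexmark_sql_benchmark my_run.csv my_run_metrics.csv --storage --min-storage-bytes 1048576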
