diff --git a/benchmark/feldera-sql/run.py b/benchmark/feldera-sql/run.py index cb4fe4f38..92c162683 100755 --- a/benchmark/feldera-sql/run.py +++ b/benchmark/feldera-sql/run.py @@ -291,6 +291,7 @@ def main(): parser.add_argument('--output', action=argparse.BooleanOptionalAction, help='whether to write query output back to Kafka (default: --no-output)') parser.add_argument('--merge', action=argparse.BooleanOptionalAction, help='whether to merge all the queries into one program (default: --no-merge)') parser.add_argument('--storage', action=argparse.BooleanOptionalAction, help='whether to enable storage (default: --no-storage)') + parser.add_argument("--poller-threads", required=False, type=int, help="Override number of poller threads to use") parser.add_argument('--min-storage-bytes', type=int, help='If storage is enabled, the minimum number of bytes to write a batch to storage.') parser.add_argument('--query', action='append', help='queries to run (by default, all queries), specify one or more of: ' + ','.join(sort_queries(QUERY_SQL.keys()))) parser.add_argument('--input-topic-suffix', help='suffix to apply to input topic names (by default, "")') @@ -313,6 +314,9 @@ def main(): queries = sort_queries(parse_queries(parser.parse_args().query)) cores = int(parser.parse_args().cores) storage = parser.parse_args().storage + poller_threads = parser.parse_args().poller_threads + if poller_threads is not None: + kafka_options["poller_threads"] = poller_threads min_storage_bytes = parser.parse_args().min_storage_bytes if min_storage_bytes is not None: min_storage_bytes = int(min_storage_bytes) diff --git a/benchmark/run-nexmark.sh b/benchmark/run-nexmark.sh index 536b95d52..6f6795121 100755 --- a/benchmark/run-nexmark.sh +++ b/benchmark/run-nexmark.sh @@ -505,6 +505,7 @@ case $runner:$language in --api-url="$api_url" \ -O bootstrap.servers="${kafka_from_feldera:-${kafka_broker}}" \ --cores $cores \ + --poller-threads 10 \ --input-topic-suffix="-$partitions-$events" \ --csv results.csv \ --query $(if test $query = all; then echo all; else echo q$query; fi) diff --git a/scripts/bench.bash b/scripts/bench.bash index a942e4927..ea682c5bc 100644 --- a/scripts/bench.bash +++ b/scripts/bench.bash @@ -47,13 +47,21 @@ fi KAFKA_BROKER=localhost:9092 rpk topic -X brokers=$KAFKA_BROKER delete bid auction person cargo run -p dbsp_nexmark --example generate --features with-kafka -- --max-events ${MAX_EVENTS} -O bootstrap.servers=$KAFKA_BROKER -FELDERA_API=http://localhost:8080 -python3 benchmark/feldera-sql/run.py --api-url $FELDERA_API -O bootstrap.servers=$KAFKA_BROKER --csv crates/nexmark/${NEXMARK_SQL_CSV_FILE} --csv-metrics crates/nexmark/${NEXMARK_SQL_METRICS_CSV_FILE} --metrics-interval 1 -rpk topic -X brokers=$KAFKA_BROKER delete bid auction person -cargo run -p dbsp_nexmark --example generate --features with-kafka -- --max-events ${MAX_EVENTS} -O bootstrap.servers=$KAFKA_BROKER FELDERA_API=http://localhost:8080 -python3 benchmark/feldera-sql/run.py --storage --api-url $FELDERA_API -O bootstrap.servers=$KAFKA_BROKER --csv crates/nexmark/${NEXMARK_SQL_STORAGE_CSV_FILE} --csv-metrics crates/nexmark/${NEXMARK_SQL_STORAGE_METRICS_CSV_FILE} --metrics-interval 1 +nexmark_sql_benchmark() { + local csv=$1 metrics=$2; shift; shift + python3 benchmark/feldera-sql/run.py \ + --api-url $FELDERA_API \ + -O bootstrap.servers=$KAFKA_BROKER \ + --csv "crates/nexmark/$csv" \ + --csv-metrics "crates/nexmark/$metrics" \ + --metrics-interval 1 \ + --poller-threads 10 \ + "$@" +} +nexmark_sql_benchmark "${NEXMARK_SQL_CSV_FILE}" "${NEXMARK_SQL_METRICS_CSV_FILE}" +nexmark_sql_benchmark "${NEXMARK_SQL_STORAGE_CSV_FILE}" "${NEXMARK_SQL_STORAGE_METRICS_CSV_FILE}" --storage # Run galen benchmark cargo bench --bench galen -- --workers 10 --csv ${GALEN_CSV_FILE}