diff --git a/examples/run_benchmarks.sh b/examples/run_benchmarks.sh index 1724d62..dc0d909 100755 --- a/examples/run_benchmarks.sh +++ b/examples/run_benchmarks.sh @@ -9,57 +9,19 @@ set -euo pipefail SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" cd "${SCRIPT_DIR}/" -./terasort/build.sh ./sql/build.sh +./terasort/build.sh REPEAT=${REPEAT:-20} export CHECKSUM_ENABLED=${CHECKSUM_ENABLED:-false} +export USE_FALLBACK_FETCH=false export USE_S3_SHUFFLE=0 export USE_NFS_SHUFFLE=0 -# TeraSort experiments -export INSTANCES=4 -TERASORT_SIZES=( - 1g - 10g - 100g +BLOCK_SIZES=( + 32 + 128 ) -for size in "${TERASORT_SIZES[@]}"; -do - for ((i = 0 ; i < ${REPEAT} ; i++)); - do - export SIZE=$size - export USE_FALLBACK_FETCH=false - - export USE_S3_SHUFFLE=0 - export USE_NFS_SHUFFLE=0 - ./terasort/run.sh || true - mc rm -r --force zac/zrlio-tmp - - export USE_S3_SHUFFLE=0 - export USE_NFS_SHUFFLE=1 - ./terasort/run.sh || true - mc rm -r --force zac/zrlio-tmp - - export USE_S3_SHUFFLE=1 - export USE_NFS_SHUFFLE=0 - ./terasort/run.sh || true - mc rm -r --force zac/zrlio-tmp - - # # Enable fallback fetch - # export USE_FALLBACK_FETCH=true - - # export USE_S3_SHUFFLE=0 - # export USE_NFS_SHUFFLE=1 - # ./terasort/run.sh || true - # mc rm -r --force zac/zrlio-tmp - - # export USE_S3_SHUFFLE=1 - # export USE_NFS_SHUFFLE=0 - # ./terasort/run.sh || true - # mc rm -r --force zac/zrlio-tmp - done -done # SQL experiments export INSTANCES=12 @@ -71,32 +33,52 @@ SQL_QUERIES=( q67 # 66 GB shuffle data ) -for ((i = 0 ; i < ${REPEAT} ; i++)); -do - for query in "${SQL_QUERIES[@]}"; do - export USE_FALLBACK_FETCH=false - - export USE_S3_SHUFFLE=0 - export USE_NFS_SHUFFLE=0 - ./sql/run_single_query.sh $query || true - - export USE_S3_SHUFFLE=0 - export USE_NFS_SHUFFLE=1 - ./sql/run_single_query.sh $query || true - - export USE_S3_SHUFFLE=1 - export USE_NFS_SHUFFLE=0 - ./sql/run_single_query.sh $query || true - - # # Enable fallback fetch. - # export USE_FALLBACK_FETCH=true +for ((i = 0 ; i < ${REPEAT} ; i++)); do + for blockSize in "${BLOCK_SIZES[@]}"; do + export BLOCK_SIZE=${blockSize} + for query in "${SQL_QUERIES[@]}"; do + export USE_S3_SHUFFLE=0 + export USE_NFS_SHUFFLE=0 + ./sql/run_single_query.sh $query || true + + export USE_S3_SHUFFLE=0 + export USE_NFS_SHUFFLE=1 + ./sql/run_single_query.sh $query || true + + export USE_S3_SHUFFLE=1 + export USE_NFS_SHUFFLE=0 + ./sql/run_single_query.sh $query || true + done + done +done - # export USE_S3_SHUFFLE=0 - # export USE_NFS_SHUFFLE=1 - # ./sql/run_single_query.sh $query || true +# TeraSort experiments +export INSTANCES=4 +TERASORT_SIZES=( + 1g + 10g + 100g +) - # export USE_S3_SHUFFLE=1 - # export USE_NFS_SHUFFLE=0 - # ./sql/run_single_query.sh $query || true +for size in "${TERASORT_SIZES[@]}"; do + for ((i = 0 ; i < ${REPEAT} ; i++)); do + for blockSize in "${BLOCK_SIZES[@]}"; do + export BLOCK_SIZE=${blockSize} + export SIZE=$size + export USE_FALLBACK_FETCH=false + export USE_TRANSFER_TO=1 + + export USE_S3_SHUFFLE=0 + export USE_NFS_SHUFFLE=0 + ./terasort/run.sh || true + + export USE_S3_SHUFFLE=0 + export USE_NFS_SHUFFLE=1 + ./terasort/run.sh || true + + export USE_S3_SHUFFLE=1 + export USE_NFS_SHUFFLE=0 + ./terasort/run.sh || true + done done -done +done \ No newline at end of file diff --git a/examples/sql/run_benchmark.sh b/examples/sql/run_benchmark.sh index b8a5b82..f70ff96 100755 --- a/examples/sql/run_benchmark.sh +++ b/examples/sql/run_benchmark.sh @@ -35,6 +35,12 @@ export SPARK_EXECUTOR_CORES=$EXECUTOR_CPU export SPARK_DRIVER_MEMORY=$DRIVER_MEM export SPARK_EXECUTOR_MEMORY=$EXECUTOR_MEM +BLOCK_SIZE=${BLOCK_SIZE:-128} +LOGGING=( + --conf spark.eventLog.enabled=true + --conf spark.eventLog.dir=file:///spark-logs +) + SPARK_HADOOP_S3A_CONFIG=( # Required --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem @@ -44,19 +50,17 @@ SPARK_HADOOP_S3A_CONFIG=( --conf spark.hadoop.fs.s3a.endpoint=${S3A_ENDPOINT} --conf spark.hadoop.fs.s3a.path.style.access=true --conf spark.hadoop.fs.s3a.fast.upload=true - --conf spark.hadoop.fs.s3a.block.size=$((128*1024*1024)) + --conf spark.hadoop.fs.s3a.block.size=$(($BLOCK_SIZE * 1024 * 1024)) + --conf spark.hadoop.fs.s3a.fast.upload.buffer=array ) SPARK_S3_SHUFFLE_CONFIG=( - --conf spark.hadoop.fs.s3a.access.key=${S3A_ACCESS_KEY} - --conf spark.hadoop.fs.s3a.secret.key=${S3A_SECRET_KEY} - --conf spark.hadoop.fs.s3a.endpoint=${S3A_ENDPOINT} --conf spark.shuffle.manager="org.apache.spark.shuffle.sort.S3ShuffleManager" --conf spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.S3ShuffleDataIO --conf spark.shuffle.checksum.enabled=${CHECKSUM_ENABLED} --conf spark.shuffle.s3.rootDir=${SHUFFLE_DESTINATION} - --conf spark.shuffle.s3.useSparkShuffleFetch=${USE_FALLBACK_FETCH} - --conf spark.storage.decommission.fallbackStorage.path=${SHUFFLE_DESTINATION} + # --conf spark.shuffle.s3.useSparkShuffleFetch=${USE_FALLBACK_FETCH} + # --conf spark.storage.decommission.fallbackStorage.path=${SHUFFLE_DESTINATION} ) if (( "$USE_S3_SHUFFLE" == 0 )); then @@ -83,8 +87,8 @@ if (( "$USE_NFS_SHUFFLE" == 1 )); then --conf spark.kubernetes.executor.podTemplateFile=${SCRIPT_DIR}/../templates/executor_nfs.yml --conf spark.kubernetes.driver.podTemplateFile=${SCRIPT_DIR}/../templates/driver_nfs.yml --conf spark.hadoop.fs.file.block.size=$((128*1024*1024)) - --conf spark.shuffle.s3.useSparkShuffleFetch=${USE_FALLBACK_FETCH} - --conf spark.storage.decommission.fallbackStorage.path=file:///nfs/ + # --conf spark.shuffle.s3.useSparkShuffleFetch=${USE_FALLBACK_FETCH} + # --conf spark.storage.decommission.fallbackStorage.path=file:///nfs/ ) SPARK_KUBERNETES_TEMPLATES=( @@ -97,6 +101,15 @@ if [[ "${USE_FALLBACK_FETCH}" == "true" ]]; then PROCESS_TAG="${PROCESS_TAG}-fallback" fi +EXTRA_OPTIONS=( +) +if (( "${USE_TRANSFER_TO:-1}" == 0 )); then + PROCESS_TAG="${PROCESS_TAG}-transferTo_false" + EXTRA_OPTIONS=( + --conf spark.file.transferTo=false \ + ) +fi + USE_PROFILER=${USE_PROFILER:-0} if (( "${USE_PROFILER}" == 1 )); then PROFILER_CONFIG="reporter=com.uber.profiling.reporters.InfluxDBOutputReporter,configProvider=com.uber.profiling.YamlConfigProvider,configFile=/profiler_config.yml,metricInterval=5000,sampleInterval=5000,ioProfiling=true" @@ -117,10 +130,12 @@ ${SPARK_HOME}/bin/spark-submit \ --conf "spark.driver.extraJavaOptions=${DRIVER_JAVA_OPTIONS}" \ --conf "spark.executor.extraJavaOptions=${EXECUTOR_JAVA_OPTIONS}" \ \ - --name ce-${PROCESS_TAG}-${INSTANCES}x${EXECUTOR_CPU}--${EXECUTOR_MEM} \ + --name ce-${PROCESS_TAG}-bs${BLOCK_SIZE}MiB-${INSTANCES}x${EXECUTOR_CPU}--${EXECUTOR_MEM} \ --conf spark.serializer="org.apache.spark.serializer.KryoSerializer" \ --conf spark.kryoserializer.buffer=128mb \ --conf spark.executor.instances=$INSTANCES \ + "${EXTRA_OPTIONS[@]}" \ + "${LOGGING[@]}" \ "${SPARK_HADOOP_S3A_CONFIG[@]}" \ "${SPARK_S3_SHUFFLE_CONFIG[@]}" \ "${SPARK_KUBERNETES_TEMPLATES[@]}" \ diff --git a/examples/templates/driver.yml b/examples/templates/driver.yml index 218e4fe..67ebec4 100644 --- a/examples/templates/driver.yml +++ b/examples/templates/driver.yml @@ -14,6 +14,14 @@ spec: image: will-be-overwritten resources: limits: - ephemeral-storage: 16G + ephemeral-storage: 32G requests: - ephemeral-storage: 16G + ephemeral-storage: 32G + volumeMounts: + - name: spark-logs + mountPath: /spark-logs + volumes: + - name: spark-logs + nfs: + server: 10.40.1.32 + path: /spark-logs \ No newline at end of file diff --git a/examples/templates/driver_nfs.yml b/examples/templates/driver_nfs.yml index 1a42077..6ae2703 100644 --- a/examples/templates/driver_nfs.yml +++ b/examples/templates/driver_nfs.yml @@ -20,8 +20,14 @@ spec: volumeMounts: - name: nfs mountPath: /nfs + - name: spark-logs + mountPath: /spark-logs volumes: - name: nfs nfs: server: 10.40.1.32 path: /shuffle + - name: spark-logs + nfs: + server: 10.40.1.32 + path: /spark-logs diff --git a/examples/templates/executor.yml b/examples/templates/executor.yml index 218e4fe..7194de0 100644 --- a/examples/templates/executor.yml +++ b/examples/templates/executor.yml @@ -14,6 +14,14 @@ spec: image: will-be-overwritten resources: limits: - ephemeral-storage: 16G + ephemeral-storage: 32G requests: - ephemeral-storage: 16G + ephemeral-storage: 32G + volumeMounts: + - name: spark-logs + mountPath: /spark-logs + volumes: + - name: spark-logs + nfs: + server: 10.40.1.32 + path: /spark-logs diff --git a/examples/templates/executor_nfs.yml b/examples/templates/executor_nfs.yml index 1a42077..6ae2703 100644 --- a/examples/templates/executor_nfs.yml +++ b/examples/templates/executor_nfs.yml @@ -20,8 +20,14 @@ spec: volumeMounts: - name: nfs mountPath: /nfs + - name: spark-logs + mountPath: /spark-logs volumes: - name: nfs nfs: server: 10.40.1.32 path: /shuffle + - name: spark-logs + nfs: + server: 10.40.1.32 + path: /spark-logs diff --git a/examples/terasort/run.sh b/examples/terasort/run.sh index 194e94a..f91454b 100755 --- a/examples/terasort/run.sh +++ b/examples/terasort/run.sh @@ -35,6 +35,12 @@ export SPARK_EXECUTOR_CORES=$EXECUTOR_CPU export SPARK_DRIVER_MEMORY=$DRIVER_MEM export SPARK_EXECUTOR_MEMORY=$EXECUTOR_MEM +LOGGING=( + --conf spark.eventLog.enabled=true + --conf spark.eventLog.dir=file:///spark-logs +) + +BLOCK_SIZE=${BLOCK_SIZE:-128} SPARK_HADOOP_S3A_CONFIG=( # Required @@ -45,20 +51,18 @@ SPARK_HADOOP_S3A_CONFIG=( --conf spark.hadoop.fs.s3a.endpoint=${S3A_ENDPOINT} --conf spark.hadoop.fs.s3a.path.style.access=true --conf spark.hadoop.fs.s3a.fast.upload=true - --conf spark.hadoop.fs.s3a.block.size=$((128*1024*1024)) + --conf spark.hadoop.fs.s3a.block.size=$(($BLOCK_SIZE * 1024 * 1024)) + --conf spark.hadoop.fs.s3a.fast.upload.buffer=array ) SPARK_S3_SHUFFLE_CONFIG=( - --conf spark.hadoop.fs.s3a.access.key=${S3A_ACCESS_KEY} - --conf spark.hadoop.fs.s3a.secret.key=${S3A_SECRET_KEY} - --conf spark.hadoop.fs.s3a.endpoint=${S3A_ENDPOINT} --conf spark.shuffle.manager="org.apache.spark.shuffle.sort.S3ShuffleManager" --conf spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.S3ShuffleDataIO --conf spark.shuffle.checksum.enabled=${CHECKSUM_ENABLED} --conf spark.shuffle.s3.rootDir=${SHUFFLE_DESTINATION} - --conf spark.shuffle.s3.useSparkShuffleFetch=${USE_FALLBACK_FETCH} - --conf spark.storage.decommission.fallbackStorage.path=${SHUFFLE_DESTINATION} + # --conf spark.shuffle.s3.useSparkShuffleFetch=${USE_FALLBACK_FETCH} + # --conf spark.storage.decommission.fallbackStorage.path=${SHUFFLE_DESTINATION} ) if (( "$USE_S3_SHUFFLE" == 0 )); then @@ -82,9 +86,10 @@ if (( "$USE_NFS_SHUFFLE" == 1 )); then --conf spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.S3ShuffleDataIO --conf spark.shuffle.checksum.enabled=${CHECKSUM_ENABLED} --conf spark.shuffle.s3.rootDir=file:///nfs/ - --conf spark.hadoop.fs.file.block.size=$((128*1024*1024)) - --conf spark.shuffle.s3.useSparkShuffleFetch=${USE_FALLBACK_FETCH} - --conf spark.storage.decommission.fallbackStorage.path=file:///nfs/ + --conf spark.hadoop.fs.file.block.size=$(($BLOCK_SIZE*1024*1024)) + # --conf spark.shuffle.s3.useSparkShuffleFetch=${USE_FALLBACK_FETCH} + # --conf spark.storage.decommission.fallbackStorage.path=file:///nfs/ + # --conf spark.shuffle.s3.maxConcurrencyTask=5 ) SPARK_KUBERNETES_TEMPLATES=( @@ -97,6 +102,15 @@ if [[ "${USE_FALLBACK_FETCH}" == "true" ]]; then PROCESS_TAG="${PROCESS_TAG}-fallback" fi +EXTRA_OPTIONS=( +) +if (( "${USE_TRANSFER_TO:-1}" == 0 )); then + PROCESS_TAG="${PROCESS_TAG}-transferTo_false" + EXTRA_OPTIONS=( + --conf spark.file.transferTo=false \ + ) +fi + USE_PROFILER=${USE_PROFILER:-0} if (( "${USE_PROFILER}" == 1 )); then PROFILER_CONFIG="reporter=com.uber.profiling.reporters.InfluxDBOutputReporter,configProvider=com.uber.profiling.YamlConfigProvider,configFile=/profiler_config.yml,metricInterval=5000,sampleInterval=5000,ioProfiling=true" @@ -117,10 +131,11 @@ ${SPARK_HOME}/bin/spark-submit \ --conf "spark.driver.extraJavaOptions=${DRIVER_JAVA_OPTIONS}" \ --conf "spark.executor.extraJavaOptions=${EXECUTOR_JAVA_OPTIONS}" \ \ - --name ce-terasort-${SIZE}${PROCESS_TAG}-${INSTANCES}x${EXECUTOR_CPU}--${EXECUTOR_MEM} \ + --name ce-terasort-${SIZE}${PROCESS_TAG}-bs${BLOCK_SIZE}MiB-${INSTANCES}x${EXECUTOR_CPU}--${EXECUTOR_MEM} \ --conf spark.serializer="org.apache.spark.serializer.KryoSerializer" \ - --conf spark.kryoserializer.buffer=128mb \ --conf spark.executor.instances=$INSTANCES \ + "${EXTRA_OPTIONS[@]}" \ + "${LOGGING[@]}" \ "${SPARK_HADOOP_S3A_CONFIG[@]}" \ "${SPARK_S3_SHUFFLE_CONFIG[@]}" \ "${SPARK_KUBERNETES_TEMPLATES[@]}" \ @@ -154,7 +169,6 @@ ${SPARK_HOME}/bin/spark-submit \ --conf spark.executor.instances=$INSTANCES \ --conf spark.jars.ivy=/tmp/.ivy \ --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ - --conf spark.kryoserializer.buffer=128mb \ "${SPARK_HADOOP_S3A_CONFIG[@]}" \ --conf spark.ui.prometheus.enabled=true \ --conf spark.network.timeout=10000 \