
Core: use ZSTD compressed parquet by default #8158

Closed · wants to merge 9 commits

Conversation

@dbtsai (Member) commented Jul 26, 2023

In memory of @kbendick, who was working to make zstd Parquet the default at Apple. He conducted extensive benchmarking and internal testing, and his valuable findings recommended adopting zstd Parquet as the default.

This PR modifies the default Iceberg parquet compression codec from gzip to zstd.

Currently, Iceberg uses gzip compression as the default option. However, based on our benchmark results, we have found that zstd-compressed Parquet files consistently exhibit faster compression and decompression speeds than gzip Parquet files. Additionally, zstd Parquet files are generally slightly smaller than their gzip counterparts. Consistent with these findings, Trino already switched from gzip to zstd as its Iceberg Parquet codec in trinodb/trino#10045.
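In effect, the change boils down to flipping the default value of TableProperties.PARQUET_COMPRESSION_DEFAULT. A minimal sketch of the intent (simplified; the surrounding constants in TableProperties may differ from this excerpt):

    // Sketch only: the table property key and its new default.
    public static final String PARQUET_COMPRESSION = "write.parquet.compression-codec";
    // Previously: public static final String PARQUET_COMPRESSION_DEFAULT = "gzip";
    public static final String PARQUET_COMPRESSION_DEFAULT = "zstd";

Tables that explicitly set write.parquet.compression-codec keep whatever codec they configured; only the fallback used when the property is absent changes.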

@RussellSpitzer (Member) left a comment

I think this is a good idea; we do recommend this internally.

@dbtsai (Member, Author) commented Jul 27, 2023

It looks like there are issues when using zstd with nested data in both Flink and Spark. This can be reproduced with

./gradlew -DsparkVersions=3.4 -DscalaVersion=2.13 -DhiveVersions= -DflinkVersions= \
    :iceberg-spark:iceberg-spark-3.4_2.13:test \
    --tests "org.apache.iceberg.spark.source.TestMetadataTableReadableMetrics" \
    -Pquick=true -x javadoc

for Spark, and

./gradlew -DsparkVersions= -DhiveVersions= -DflinkVersions=1.15 \
    :iceberg-flink:iceberg-flink-1.15:test \
    --tests "org.apache.iceberg.flink.source.TestMetadataTableReadableMetrics" \
    -Pquick=true -x javadoc

for Flink.

Will look into the issue soon.

@@ -291,7 +291,7 @@ public void testSelectNestedValues() throws Exception {
   public void testNestedValues() throws Exception {
     createNestedTable();
 
-    Row leafDoubleCol = Row.of(53L, 3L, 1L, 1L, 0.0D, 0.0D);
+    Row leafDoubleCol = Row.of(46L, 3L, 1L, 1L, 0.0D, 0.0D);
@dbtsai (Member, Author):

@szehon-ho @RussellSpitzer do you know what leafDoubleCol is? Is it some kind of metric that could change if the compression codec is changed?

@szehon-ho (Collaborator):

Yeah, I think it includes the size; sorry for the lack of a comment. Let's disable zstd in:

    Table table =
        catalog.createTable(
            TableIdentifier.of(Namespace.of(database()), tableName()),
            PRIMITIVE_SCHEMA,
            PartitionSpec.unpartitioned(),
            ImmutableMap.of());
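
A rough sketch of that suggestion, assuming the test pins the codec through the table properties map (using the TableProperties.PARQUET_COMPRESSION key, i.e. write.parquet.compression-codec):

    // Hypothetical sketch: pin the codec for this test so the expected size
    // metrics do not depend on the table-level default.
    Table table =
        catalog.createTable(
            TableIdentifier.of(Namespace.of(database()), tableName()),
            PRIMITIVE_SCHEMA,
            PartitionSpec.unpartitioned(),
            ImmutableMap.of(TableProperties.PARQUET_COMPRESSION, "uncompressed"));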

@dbtsai (Member, Author):

Yeah, I can confirm it's a size metric. Because of the change to zstd, the sizes are reduced overall. I updated the test to the new values. We can have a follow-up PR to change those metric-related tests to use uncompressed Parquet.

@szehon-ho (Collaborator) commented Jul 28, 2023

@dbtsai, I looked into it. I think we have to put it where we write the data files, but it's a bigger change. There's a test helper we use called FileHelpers.writeDataFile(), and we need to change the code to take in a map of properties.

Then,

  public static DataFile writeDataFile(Table table, OutputFile out, List<Record> rows, Map<String, String> properties)
      throws IOException {
    FileFormat format = defaultFormat(table.properties());
    GenericAppenderFactory factory = new GenericAppenderFactory(table.schema());
    properties.forEach(factory::set);

but it's OK to do it in a separate PR if you want.
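
A hypothetical usage of the extended helper, assuming a test that wants stable file sizes regardless of the default codec (temp, table, and records are placeholders for the test's existing fixtures):

    // Sketch only: pass an explicit codec so expected metrics stay stable.
    DataFile dataFile =
        FileHelpers.writeDataFile(
            table,
            Files.localOutput(temp.newFile()),
            records,
            ImmutableMap.of(TableProperties.PARQUET_COMPRESSION, "uncompressed"));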

@dbtsai (Member, Author):

Let's do it in a follow-up PR to keep the changes small.

@szehon-ho (Collaborator) commented Jul 28, 2023

Chatted offline with @dbtsai; I will make a follow-up so the expected metrics are taken from DataFile.fileSizeInBytes().

@RussellSpitzer (Member):

Since this is a change to defaults, I do want to give other folks a chance to chime in, but I think this is a very safe change for new tables and won't affect older ones, so I'm inclined to get this in for the next minor Iceberg release.

@szehon-ho (Collaborator):

I also mentioned the same thing, to wait a day or two for any other comments.

this is a very safe change for new tables and won't affect older ones

Wouldn't this affect all existing tables where this property is not explicitly set to GZIP/UNCOMPRESSED?

@RussellSpitzer (Member):

I also mentioned the same thing, to wait a day or two for any other comments.

this is a very safe change for new tables and won't affect older ones

Wouldn't this affect all existing tables where this property is not explicitly set to GZIP/UNCOMPRESSED?

There was a bug where default values were not getting persisted, but they are supposed to be.


  public static final String PARQUET_COMPRESSION_LEVEL = "write.parquet.compression-level";
  public static final String DELETE_PARQUET_COMPRESSION_LEVEL =
      "write.delete.parquet.compression-level";
- public static final String PARQUET_COMPRESSION_LEVEL_DEFAULT = null;
+ public static final String PARQUET_COMPRESSION_LEVEL_DEFAULT =
+     null; // For zstd, it is default to "3"
A reviewer (Contributor) commented:

How valuable is this comment, given that it is specific to the underlying codec and may change in the future?

Another reviewer (Contributor) commented:

Agreed; for example, we don't mention that gzip defaults to "6".

@dbtsai (Member, Author):

Removed.

@aokolnychyi (Contributor):

I generally support switching to zstd by default, but I think @szehon-ho is right that it will affect not only new tables but also existing tables where the codec value was not set explicitly. Is that a problem? Maybe. We could consider doing that only for new tables, but I am not sure it is worth the extra complexity. If someone did not configure the codec, most likely it does not matter to them, so they won't even notice this change?

Thoughts, @rdblue @jackye1995 @stevenzwu @nastra @danielcweeks?

@stevenzwu (Contributor):

it will affect not only new tables but also existing tables where the codec value was not set explicitly. Is that a problem?

My main question is about the dependency: do all runtime environments have zstd available? We know gzip is.

@manuzhang (Contributor) commented Aug 1, 2023

If someone did not configure the codec, most likely it does not matter for them so they won't even notice this change?

End users won't notice, even if we only switch the default for new tables. Platform admins need to be aware of the change and handle any dependency issues or side effects for end users. IMO, it's more about how we communicate it to users.

@dbtsai (Member, Author) commented Aug 1, 2023

To avoid surprising our users, I agree with the above comments. We should address the following two items in follow-up PRs.

  1. Persist the default compression codec value in write.parquet.compression-codec for newly created tables.
  2. For existing tables that don't set write.parquet.compression-codec, default to gzip and set it explicitly (a rough sketch follows below).

@stevenzwu I believe most modern runtimes support zstd Parquet. Trino switched to zstd Parquet as the default for Iceberg tables almost two years ago, and we have never heard of a compatibility issue.
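
For item 2, a rough sketch of what pinning existing tables could look like through the table API (assuming table is an org.apache.iceberg.Table handle; this is not part of this PR):

    // Hypothetical follow-up sketch: if an existing table never set a codec,
    // pin it to the old default so the new zstd default only applies to new tables.
    if (!table.properties().containsKey(TableProperties.PARQUET_COMPRESSION)) {
      table
          .updateProperties()
          .set(TableProperties.PARQUET_COMPRESSION, "gzip")
          .commit();
    }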

@rustyconover (Contributor):

I'd suggest that the compression level of ZSTD is important to consider.

ZSTD has a wide range of compression levels, and the choice of the default level is important for write speed (and file size). Decompression speed isn't as sensitive to the choice of compression level.

@pan3793 (Member) commented Aug 2, 2023

I'd suggest that the compression level of ZSTD is important to consider.

ZSTD has a wide range of compression levels, and the choice of the default level is important for write speed (and file size). Decompression speed isn't as sensitive to the choice of compression level.

+1, I think using 3 as the default compression level is not a good idea.

Based on my experience, levels 0 to 3 are suitable for transient data, e.g. Spark shuffle data. For persisted data, e.g. a warehouse, a level around 9 is typically recommended.

@dbtsai (Member, Author) commented Aug 2, 2023

It's a very nice property of zstd that decompression time is roughly constant across compression levels, so users can pay a one-time cost for archival data. The current default level of 3 already produces smaller files than gzip with a significant speedup. We can evaluate whether to raise the default compression level in a follow-up PR with more experiments.
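
For users who do want a higher ratio on specific tables, the level can already be set per table. A hypothetical example (the table name, schema, and catalog handle are placeholders), using the write.parquet.compression-level property mentioned above:

    // Hypothetical: create a table that opts into a higher zstd level up front,
    // trading slower writes for smaller files on archival data.
    Table archive =
        catalog.createTable(
            TableIdentifier.of("db", "events_archive"),
            schema,
            PartitionSpec.unpartitioned(),
            ImmutableMap.of(
                TableProperties.PARQUET_COMPRESSION, "zstd",
                TableProperties.PARQUET_COMPRESSION_LEVEL, "9"));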

  - code: "java.field.constantValueChanged"
    old: "field org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT"
    new: "field org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT"
    justification: "{Changing the default compression codec from gzip to zstd}"
A reviewer (Contributor) commented:

Suggested change:
-    justification: "{Changing the default compression codec from gzip to zstd}"
+    justification: "Changing the default compression codec from gzip to zstd"

@dbtsai (Member, Author):

Addressed. Thanks.

@dramaticlly (Contributor) left a comment

Thank you, @dbtsai.

I did some tests in the past; the size reduction and write speedup are an obvious win, but reads might need a bit of tuning on the compression level. I am wondering if there are any quantitative metrics/benchmarks you can share on how large the improvement was?

@dbtsai (Member, Author) commented Aug 6, 2023

With some of our datasets, we saw that zstd Parquet is around 5% smaller than gzip Parquet, about 1.5x faster for writes, and 1.13x faster for reads. This was done a while ago using the Spark Parquet reader/writer instead of the Iceberg Parquet reader/writer, but I believe we will see similar gains with Iceberg as well.

@manuzhang (Contributor):

@dbtsai did both use the default compression levels? Uber also did some benchmarking on their data, but that was two years ago.

@dbtsai (Member, Author) commented Aug 7, 2023

@manuzhang we use the default zstd compression level, 3.

@aokolnychyi (Contributor):

Let's discuss this change during the community sync tomorrow.

@Fokko (Contributor) commented Sep 28, 2023

This has been added by @aokolnychyi in #8593 🥳

@Fokko closed this on Sep 28, 2023.