Core: use ZSTD compressed parquet by default #8158
Conversation
I think this is a good idea, we do recommend this internally.
Looks like there are issues in using Zstd with nested data in both Flink and Spark. This can be reproduced in both Spark and Flink. Will look into the issue soon.
@@ -291,7 +291,7 @@ public void testSelectNestedValues() throws Exception {
   public void testNestedValues() throws Exception {
     createNestedTable();

-    Row leafDoubleCol = Row.of(53L, 3L, 1L, 1L, 0.0D, 0.0D);
+    Row leafDoubleCol = Row.of(46L, 3L, 1L, 1L, 0.0D, 0.0D);
@szehon-ho @RussellSpitzer do you know what leafDoubleCol is? Is it some kind of metric that could change if the compression codec is changed?
Yeah, I think it includes the size; sorry for the lack of a comment. Let's disable zstd in:
Table table =
    catalog.createTable(
        TableIdentifier.of(Namespace.of(database()), tableName()),
        PRIMITIVE_SCHEMA,
        PartitionSpec.unpartitioned(),
        ImmutableMap.of());
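For illustration, a sketch of what "disable zstd" could look like here, assuming the test can reference TableProperties and pin any fixed codec (the choice of "uncompressed" below is an assumption, not part of this PR):

// Sketch: pin the codec for this test so size-based metrics don't depend on the table default.
Table table =
    catalog.createTable(
        TableIdentifier.of(Namespace.of(database()), tableName()),
        PRIMITIVE_SCHEMA,
        PartitionSpec.unpartitioned(),
        ImmutableMap.of(TableProperties.PARQUET_COMPRESSION, "uncompressed"));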
Yeah, I can confirm it's a size metric. Because of the change to zstd, the sizes are reduced overall. I updated the test to the new metrics. We can have a follow-up PR to change those metric-related tests to use uncompressed parquet.
@dbtsai, I looked into it. I think we have to put it where we write data files, but it's a bigger change. There's a test helper we use called FileHelpers.writeDataFile(), and we need to change the code to take in a map of properties.
Then,
public static DataFile writeDataFile(
    Table table, OutputFile out, List<Record> rows, Map<String, String> properties)
    throws IOException {
  FileFormat format = defaultFormat(table.properties());
  GenericAppenderFactory factory = new GenericAppenderFactory(table.schema());
  // apply caller-supplied properties (e.g. compression) to the appender factory
  properties.forEach(factory::set);
But it's okay to do in a separate PR if you want.
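For illustration, a sketch of how a test might call the proposed overload once it accepts a properties map; `table`, `out`, and `records` are assumed test fixtures, and "uncompressed" is just one possible fixed codec:

// Sketch: write a data file with an explicit codec so the test doesn't depend on the default.
// Assumes the writeDataFile(Table, OutputFile, List<Record>, Map<String, String>) overload above exists.
DataFile dataFile =
    FileHelpers.writeDataFile(
        table,
        out, // an OutputFile pointing at the test's temp directory
        records,
        ImmutableMap.of(TableProperties.PARQUET_COMPRESSION, "uncompressed"));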
Let's do it in a follow-up PR to reduce the changes.
Chatted offline with @dbtsai; will make a follow-up to take the expected metrics from DataFile.fileSizeInBytes().
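A rough sketch of that follow-up idea, assuming the test keeps a handle on the written DataFile:

// Sketch: read expected values from the written DataFile instead of hard-coding them,
// so size-based assertions don't break when the default codec changes.
long expectedFileSize = dataFile.fileSizeInBytes();
Map<Integer, Long> expectedColumnSizes = dataFile.columnSizes();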
Since this is a change to defaults I do want to give other folks a chance to chime in, but I think this is a very safe change for new tables and won't affect older ones, so I'm inclined to get this in for the next minor Iceberg release.
I also mentioned the same thing: wait a day or two for any other comments.
Wouldn't this affect all existing tables where this property is not explicitly set to GZIP/UNCOMPRESSED?
There was a bug where default values were not getting persisted, but they are supposed to be.
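For tables that should keep their current codec regardless of the new default, one option (a sketch, assuming a Table handle from whatever catalog is in use) is to set the property explicitly:

// Sketch: pin gzip on an existing table so the new zstd default does not apply to it.
// TableProperties.PARQUET_COMPRESSION is "write.parquet.compression-codec", the property whose default this PR changes.
table
    .updateProperties()
    .set(TableProperties.PARQUET_COMPRESSION, "gzip")
    .commit();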
  public static final String PARQUET_COMPRESSION_LEVEL = "write.parquet.compression-level";
  public static final String DELETE_PARQUET_COMPRESSION_LEVEL =
      "write.delete.parquet.compression-level";
- public static final String PARQUET_COMPRESSION_LEVEL_DEFAULT = null;
+ public static final String PARQUET_COMPRESSION_LEVEL_DEFAULT =
+     null; // For zstd, it is default to "3"
How valuable is this comment, given that it is specific to the underlying codec and may change in the future?
Agree; similarly, we don't mention that gzip defaults to "6".
Removed.
I generally support switching to zstd. Thoughts, @rdblue @jackye1995 @stevenzwu @nastra @danielcweeks?
My main question is on the dependency: do all runtime environments have zstd support?
End users won't notice, even if we only switch the default for new tables. Platform admins need to be aware of the change and handle any dependency issues or side effects for end users. IMO, it's more about how we communicate it to users.
To avoid surprises to our users, I agree with the above comments. We should address the following two items as follow-up PRs.
@stevenzwu I believe most modern runtimes support zstd parquet. Trino switched to zstd parquet as the default for Iceberg tables almost 2 years ago, and we have never heard of any compatibility issue.
I'd suggest that the compression level of ZSTD is important to consider. ZSTD has a wide range of compression levels, and the choice of the default level matters for write speed (and file size). Decompression speed isn't as sensitive to the compression level.
+1, I think using 3 as the default compression level is not a good idea. Based on my experience, 0~3 is suitable for transient data, e.g. Spark shuffle data. For persisted data, e.g. a warehouse, ~9 is typically recommended.
It's a very nice property of zstd that decompression time is roughly constant across compression levels, so users pay a one-time cost for archival data. The current default level of 3 already produces smaller files than gzip with a significant speedup. We can evaluate whether to change the default to a higher compression level in a follow-up PR with more experiments.
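In the meantime, users who want a higher level can override it per table; a sketch using the properties discussed above, with level "9" chosen only as an example:

// Sketch: opt a single table into a higher zstd level than the codec's built-in default of 3.
// Higher levels trade write speed for smaller files; decompression speed is largely unaffected.
table
    .updateProperties()
    .set(TableProperties.PARQUET_COMPRESSION, "zstd")
    .set(TableProperties.PARQUET_COMPRESSION_LEVEL, "9")
    .commit();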
.palantir/revapi.yml
- code: "java.field.constantValueChanged"
  old: "field org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT"
  new: "field org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT"
  justification: "{Changing the default compression codec from gzip to zstd}"
Suggested change:
-  justification: "{Changing the default compression codec from gzip to zstd}"
+  justification: "Changing the default compression codec from gzip to zstd"
Addressed. Thanks.
Thank you @dbtsai.
I did some tests in the past; the size reduction and write speedup are obvious wins, but reads might need a bit of tuning on the compression level. I am wondering if there are any quantitative metrics/benchmarks you can share on how much the improvement was?
With some of our datasets, we saw that ZSTD parquet is around 5% smaller than GZIP parquet, 1.5x faster for writes, and 1.13x faster for reads. This was done a while ago using the Spark Parquet reader/writer instead of the Iceberg Parquet reader/writer, and I believe we will see similar gains with Iceberg as well.
@dbtsai did both use default compression levels? Uber also did some benchmarks on their data, but that was two years ago.
@manuzhang we use the default zstd compression level, 3.
Let's discuss this change during the community sync tomorrow.
This has been added by @aokolnychyi in #8593 🥳
In memory of @kbendick, who was making zstd parquet the default at Apple. He conducted extensive benchmarking and internal testing, and his valuable findings recommended adopting zstd parquet as the default.
This PR modifies the default Iceberg parquet compression codec from gzip to zstd.
Currently, Iceberg employs gzip compression as the default option. However, based on our benchmark results, we have found that zstd-compressed parquet files consistently exhibit faster compression and decompression speeds compared to gzip parquet files. Additionally, zstd parquet files are generally slightly smaller than their gzip counterparts. As a result of these findings, Trino has already made the switch from gzip to zstd as its Iceberg parquet codec in trinodb/trino#10045.
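At its core, the change is the one-line flip of the constant referenced by the revapi entry above; roughly:

- public static final String PARQUET_COMPRESSION_DEFAULT = "gzip";
+ public static final String PARQUET_COMPRESSION_DEFAULT = "zstd";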