Data: Add a util to read write partition stats #10176
Conversation
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GeneratePartitionStats {
The iceberg-core module cannot write stats as it doesn't depend on Parquet code. So, the iceberg-data module introduces an API to write it, and the core module just registers it to the table.
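The module split being described can be sketched as a simple registration pattern: core declares a writer hook, and the data module plugs in the concrete implementation at setup time. This is only an illustrative sketch using JDK types; `StatsWriterRegistry` and the `Function<String, String>` writer shape are hypothetical, not Iceberg API.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical sketch of the module split: "core" cannot depend on Parquet,
// so it only declares a writer hook; the "data" side registers the real
// implementation at setup time.
class StatsWriterRegistry {
    private static final AtomicReference<Function<String, String>> WRITER =
        new AtomicReference<>(partition -> {
            throw new IllegalStateException("no stats writer registered");
        });

    // called by the data module to plug in a concrete writer
    static void register(Function<String, String> writer) {
        WRITER.set(writer);
    }

    // called by core when a stats file must be written
    static String writeStats(String partitionData) {
        return WRITER.get().apply(partitionData);
    }
}
```

The point of the pattern is the dependency direction: core never references the Parquet-backed writer directly, it only invokes whatever was registered.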
public static CloseableIterable<Record> readPartitionStatsFile(
    Schema schema, InputFile inputFile) {
  // TODO: support other formats or introduce GenericFileReader
We don't have a GenericFileReader; all reading is based on table scans currently. For this we may need a GenericFileReader, similar to the writer we used.
Types.NestedField.required(3, Column.DATA_RECORD_COUNT.name(), Types.LongType.get()),
Types.NestedField.required(4, Column.DATA_FILE_COUNT.name(), Types.IntegerType.get()),
Types.NestedField.required(5, Column.DATA_FILE_SIZE_IN_BYTES.name(), Types.LongType.get()),
Types.NestedField.optional(
Observe the optional and required fields, as per the spec.
Question: We made these counts nullable because we think not all implementations will populate these values? We will still write 0 if needed in the current implementation, though? Except the total record count, which may be expensive to compute.
The last computed snapshot may be unavailable at the time of computing the stats (due to expired snapshots). So, LAST_UPDATED_SNAPSHOT_ID and LAST_UPDATED_AT can be null. Hence, they are optional. Also, it doesn't make sense to fill them with 0, as snapshot id 0 is not meaningful in Iceberg.
To maintain uniform behaviour across all optional fields, stats like TOTAL_RECORD_COUNT, EQUALITY_DELETE_FILE_COUNT, EQUALITY_DELETE_RECORD_COUNT, POSITION_DELETE_FILE_COUNT, and POSITION_DELETE_RECORD_COUNT will be null when absent, instead of 0.
I have added a testcase which validates all of this now.
Also, the schema (optional & required) is as per the spec: https://iceberg.apache.org/spec/#partition-statistics-file
If we decide to change optional to required now, it has to go through a vote :(
I understand why we mark LAST_UPDATED_SNAPSHOT_ID and LAST_UPDATED_AT as optional. My question was about the counts. In my view, null means unknown and 0 means absent. If I read a partition stats file and see null as the number of position deletes, it won't be informative.
It makes sense to keep fields optional but I would expect implementations to write 0 for the position delete record count if a partition has no deletes.
done.
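The convention the thread settled on, that known-absent delete counts are written as 0 while genuinely unknown values stay null, can be sketched with boxed vs primitive types. The class and field names below are illustrative only, not the PR's actual PartitionStats schema:

```java
// Illustrative sketch of null-vs-0 semantics for partition stats fields:
// null = unknown, 0 = known to be absent.
class StatsEntrySketch {
    Long totalRecordCount;          // null: expensive to compute, skipped by default
    long positionDeleteRecordCount; // 0: partition is known to have no deletes
    Long lastUpdatedSnapshotId;     // null: last snapshot expired; 0 is never a valid id

    static StatsEntrySketch forPartitionWithoutDeletes() {
        StatsEntrySketch e = new StatsEntrySketch();
        e.totalRecordCount = null;       // unknown, not computed
        e.positionDeleteRecordCount = 0; // absent, so write 0 per the review outcome
        e.lastUpdatedSnapshotId = null;  // unknown after snapshot expiry
        return e;
    }
}
```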
record.set(Column.SPEC_ID.ordinal(), entry.file().specId());

Snapshot snapshot = table.snapshot(entry.snapshotId());
if (snapshot != null) {
If the snapshot is expired, the last updated time and last updated snapshot will be null.
    "Unsupported file content type: " + entry.file().content());
}

// TODO: optionally compute TOTAL_RECORD_COUNT based on the flag
The idea of this field is to apply the equality or position delete filters and give the effective results. Need to add logic to compute it in a follow-up.
That would be tricky; I'm not sure we can do it without actually scanning rows.
True. But we decided we need to have this optional field during spec design.
The plan is to have a follow-up PR with a flag that enables computing this. By default it won't be computed, as it is an expensive operation.
I am not sure we would want to extend this path to compute deletes. We may also reconsider how position deletes are handled in V3, so it may be possible to compute this without scanning the data.
I'd probably drop this TODO for now.
True. Maybe we didn't think this field through during spec design. We can drop the TODO (add a note instead) and deprecate this field later if required.
data/src/main/java/org/apache/iceberg/data/PartitionStatsWriterUtil.java (resolved)
ping @aokolnychyi
cc: @RussellSpitzer, @szehon-ho: Can you please take a look? I need reviews and I want to take it forward.
@RussellSpitzer, @szehon-ho: It seems Anton is on holiday for 2-3 weeks. Is it possible for either of you to support the review? cc: @jbonofre
}

public static void updateRecord(Record toUpdate, Record fromRecord) {
  toUpdate.set(
Does this mean we allow merging records with different spec IDs? Suppose we start with Spec1: {Field#1}, and we write a partition Field#1 = 1. Then we change the spec to Spec2: {Field#1, Field#2} and write another partition Field#1 = 1 / Field#2 = null. Will we only have one entry in the partition stats file, with Partition_Data: {Field#1 = 1 / Field#2 = null}, Spec_ID: 2?
Yes. Please refer to the last point of #9437 (comment).
Also, it is similar to how the partitions metadata table is implemented currently.
Hmm.. but how can such a stats entry be useful? From a user's perspective, there's no way to tell if an entry is merged. Suppose the entry says we have 10 records for partition {Field#1 = 1 / Field#2 = null}. This doesn't mean there are 10 records satisfying the filter Field#1 = 1 and Field#2 is null, right?
Let us go with an example:
the table is partitioned by col1, and we insert 5 rows for the partition col1 = x.
Now the spec evolves to partition by col1/col2, and we insert 3 rows for col1 = x / col2 = y and 7 rows for col1 = x / col2 = null.
So, when a query is run with col1 = x and col2 = null, it scans 12 (5+7) rows. So, the partition stats will also have stats from 12 rows for col1 = x / col2 = null (the unified tuple). So, I think it is fine.
Can you elaborate on why you think a unified tuple is a problem?
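The merge in the example above can be sketched as keying stats only on the coerced partition tuple, so rows written under the old spec (col1 = x, which coerces to col2 = null) and the new spec add up to 12. This is a JDK-only illustration of the unified-tuple behaviour discussed here; the class name and string-keyed tuples are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of unified-tuple merging: entries from different specs are coerced
// to the widest partition tuple, so counts for the same coerced tuple add up.
class UnifiedTupleSketch {
    @SafeVarargs
    static Map<String, Long> mergeByUnifiedTuple(Map<String, Long>... perSpecCounts) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> counts : perSpecCounts) {
            counts.forEach((tuple, rows) -> merged.merge(tuple, rows, Long::sum));
        }
        return merged;
    }
}
```

With spec1 contributing {col1=x,col2=null: 5} after coercion and spec2 contributing {col1=x,col2=y: 3, col1=x,col2=null: 7}, the merged map holds 12 for col1=x,col2=null.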
We're planning to use partition-level stats to accelerate queries fetching partition metadata. I think the unified tuple can be confusing for users. For example, if a query says there are 12 rows in partition col1 = x / col2 = null, a user cannot tell whether we inserted 12 rows into that partition with spec2, or 5 and 7 rows with spec1 and spec2 respectively.
So maybe my question should be whether the partition stats are only intended for query optimizers, and not suitable for our use case? I thought it was user-facing because it has information such as last_updated_at, which optimizers usually don't care about.
So maybe my question should be whether the partition stats are only intended for query optimizers, and not suitable for our use case
It was designed mainly with query planners in mind. The spec was approved with the unified tuple. The reason is that the query planner has to scan both folders in this case [(c1 = 1), (c1 = 1 and c2 = null)], so the unified tuple includes stats from both specs. Keeping stats per individual spec might slow down planning, because the unified stats would have to be computed at plan time.
Stats like file count and record count are mainly used for CBO; last modified time and snapshot info were added later so users can decide whether or not to run compaction.
@ajantha-bhat Thanks for clarifying! Personally I still think this feature can be used to improve user experience for investigating partitions. Maybe we just need to explain to users about the ambiguity for partitions with null values.
@lirui-apache: You were right that we shouldn't combine the stats for different specs. It's just that we didn't have a strong use case then.
Anton mentioned that
If we combine delete stats across specs, they will not be accurate. A delete file from one spec never applies to a data file in another spec.
So, now we are not combining them.
More details in #11146
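The revised behaviour, one stats entry per (spec, partition) pair so delete stats from one spec never bleed into another, can be sketched as a composite-key map. This is a JDK-only illustration; the class name and string-keyed tuples are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the revised behaviour: stats are keyed on (specId, partition
// tuple), so entries from different specs are never combined.
class PerSpecStatsSketch {
    static void add(Map<String, Long> stats, int specId, String tuple, long rows) {
        // composite key keeps each spec's counts (and delete stats) separate
        stats.merge(specId + "/" + tuple, rows, Long::sum);
    }
}
```

Under this keying, the earlier example yields two entries (one for spec1's col1=x, one for spec2's col1=x/col2=null) instead of a single merged tuple.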
OK, thanks for the pointer
data/src/main/java/org/apache/iceberg/data/GeneratePartitionStats.java (resolved)
data/src/test/java/org/apache/iceberg/data/TestComputePartitionStats.java (resolved)
data/src/test/java/org/apache/iceberg/data/TestPartitionStatsWriterUtil.java (resolved)
private static final int PARTITION_COUNT = 10000;

// one data file manifest
private static final int DATA_FILES_PER_PARTITION_COUNT = 1;
Seems like this would be very unrealistic for benchmarking? Shouldn't we have multiple data files in each manifest (and also multiple partitions in the same manifest)?
Updated now.
2 million data file entries.
100 partitions * 20 data files per partition = 2k data files per manifest.
10K manifests.
It took around 10 seconds to write, commit, and read the partition stats file.
data/src/jmh/java/org/apache/iceberg/PartitionStatsGeneratorBenchmark.java (resolved)
data/src/main/java/org/apache/iceberg/data/PartitionStatsGenerator.java (resolved)
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionStatsGenerator {
Could we document this and the writer util? I'm mostly interested in the separation of these two classes and what their responsibilities should be.
Updated the javadocs.
PartitionStatsGenerator generates the stats file, using the util classes; the util class is a helper for it.
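The generator/util separation described above can be sketched as two collaborators: a generator that aggregates metadata entries into stats records, and a stateless writer util that only persists a finished list. The class names, method signatures, and string-based records below are hypothetical, not the PR's actual API:

```java
import java.util.List;
import java.util.stream.Collectors;

// Illustrative split of responsibilities (names are hypothetical):
// the writer util only persists an already-computed list of stats records.
class StatsWriterUtilSketch {
    static String write(String outputPath, List<String> records) {
        // real code would serialize to Parquet; here we just report the action
        return "wrote " + records.size() + " records to " + outputPath;
    }
}

// The generator walks table metadata, aggregates per-partition records,
// and delegates persistence to the util.
class StatsGeneratorSketch {
    static String generate(List<String> manifestEntries, String outputPath) {
        List<String> records =
            manifestEntries.stream().distinct().collect(Collectors.toList());
        return StatsWriterUtilSketch.write(outputPath, records);
    }
}
```

The design point is that all aggregation logic lives in the generator, so the util stays reusable for any caller that already has records in hand.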
Thanks @RussellSpitzer for the review. I have addressed the comments.
force-pushed from bbe1940 to 827cb20
I took a detailed look at the implementation/spec today. Here are my high-level points:
I'd consider restructuring the code as below (names are suggestions):
What do you think, @ajantha-bhat?
force-pushed from 00fe16e to 521c2d7
import org.apache.iceberg.types.Types.StructType;

public class PartitionStatsRecord implements Record, StructLike {
  private static final LoadingCache<StructType, Map<String, Integer>> NAME_MAP_CACHE =
Similar to the existing GenericRecord class, but it wraps the existing PartitionStats and is used only for partition stats.
force-pushed from 521c2d7 to 0d2be51
force-pushed from 0d2be51 to 38942cd
Let me take a look, hopefully over the weekend or on Monday.
Okay, I played around with this a bit over the weekend. I suggest we split this PR into two parts. While we look into the writing part, it would be nice to get
Could you please point me to that work?
Based on the conclusion from #9437, we are going with a local implementation using multiple threads, instead of a distributed algorithm.
The iceberg-core module cannot write stats as it doesn't depend on Parquet code. So, the iceberg-data module introduces an API to write it, and the core module just registers it to the table. Syntax looks like this.