Kafka Connect: Initial project setup and event data structures #8701
Conversation
Very happy to see this contribution 👍 I just skimmed the files. Will definitely go through design docs and review again.
@@ -200,3 +202,6 @@ if (JavaVersion.current() == JavaVersion.VERSION_1_8) {
  }
}

include ":iceberg-kafka-connect:kafka-connect-events"
Spark/Flink/Hive require that we support multiple versions. Is the KC API stable enough that we don't have to worry about supporting different major/minor versions? I see the 3.x line goes back to 2021, but there are six minor releases. Just wondering if we should structure the project with versioning in mind from the start.
For the most part, the API has been very stable, so I was thinking of not doing this to start; it might be overkill.
Do we need so many modules? If we're just creating a single JAR in the end, I wonder if it is helpful to break it up.
The main reason I have events as a separate module is so that it can be used independently from the sink to read and deserialize events from the control topic. This can be used, for example, to trigger workflows from the control topic rather than having to poll the table metadata. With that, the plan was to have two modules (events, core) plus a runtime module.
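For illustration, here is a minimal sketch of that standalone use case: a plain Kafka consumer reading the control topic and reacting to commit-complete events. The topic name, consumer properties, and the Event.decode/payload() calls are assumptions for the sketch; the events module's actual deserialization entry point may differ.

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.iceberg.connect.events.CommitCompletePayload;
import org.apache.iceberg.connect.events.Event;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ControlTopicWatcher {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
    props.put("group.id", "control-topic-watcher");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(List.of("control-iceberg")); // assumed control topic name
      while (true) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          // Event.decode(...) and payload() stand in for the events module's
          // Avro deserialization API, which may be named differently.
          Event event = Event.decode(record.value());
          if (event.payload() instanceof CommitCompletePayload) {
            // e.g. trigger a downstream workflow instead of polling table metadata
            System.out.println("Commit completed: " + event);
          }
        }
      }
    }
  }
}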
private Long vtts;
private final Schema avroSchema;

private static final Schema AVRO_SCHEMA =
We typically prefer to define Iceberg schemas with field IDs and convert them to Avro schemas. I think those schemas are easier to read and it ensures that we assign field IDs.
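For reference, a minimal sketch of that approach, with illustrative field names and IDs (not the ones used in this PR): define the payload as an Iceberg schema with explicit field IDs, then convert it with AvroSchemaUtil.

import org.apache.avro.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.types.Types;

public class PayloadSchemaSketch {
  // Iceberg schema with explicit field IDs (IDs and names are illustrative)
  static final org.apache.iceberg.Schema ICEBERG_SCHEMA =
      new org.apache.iceberg.Schema(
          Types.NestedField.required(10_500, "commit_id", Types.UUIDType.get()),
          Types.NestedField.optional(10_501, "vtts", Types.TimestampType.withZone()));

  // Converted to an Avro schema for serialization
  static final Schema AVRO_SCHEMA =
      AvroSchemaUtil.convert(ICEBERG_SCHEMA, "commit_complete_payload");
}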
+1 for using the Iceberg schema definition with field IDs and converting them to Avro.
I do agree it is more readable. However, this will require changes to the Avro encoder. For example, both GenericDataFile and GenericDeleteFile use the same Iceberg schema, and the current schema converter only allows a one-to-one mapping of struct type to class name, so you can't include both data files and delete files in the same event.
Also, it becomes cumbersome to redefine the same struct-to-class mapping for every schema; e.g. the TopicPartitionOffset mapping must be defined both for the payloads that use it and for the event container when converting the schema to Avro.
Another downside is the extra overhead of doing the conversion. Schemas are sometimes constructed for each event, as the schema changes depending on the table and partitioning, though that is more minor and could be solved with caching.
I did update the field IDs so they aren't all -1.
import org.apache.avro.specific.SpecificData.SchemaConstructable;
import org.apache.iceberg.avro.AvroSchemaUtil;

public interface Element extends IndexedRecord, SchemaConstructable {
Rather than using SchemaConstructable, Iceberg just checks for a Schema constructor first.
Thanks, I removed SchemaConstructable.
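For context, the constructor pattern being referenced looks roughly like this (a standalone sketch with a hypothetical class, not the exact code in this PR):

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;

// Iceberg's Avro reader checks for a constructor that accepts the Avro Schema,
// so the record class does not need to implement SpecificData.SchemaConstructable.
public class ExamplePayload implements IndexedRecord {
  private Object value; // placeholder single field
  private final Schema avroSchema;

  public ExamplePayload(Schema avroSchema) {
    this.avroSchema = avroSchema;
  }

  @Override
  public Schema getSchema() {
    return avroSchema;
  }

  @Override
  public void put(int i, Object v) {
    if (i == 0) {
      this.value = v;
    } else {
      throw new UnsupportedOperationException("Unknown field ordinal: " + i);
    }
  }

  @Override
  public Object get(int i) {
    if (i == 0) {
      return value;
    }
    throw new UnsupportedOperationException("Unknown field ordinal: " + i);
  }
}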
public class EventTestUtil {
  public static DataFile createDataFile() {
    Ctor<DataFile> ctor =
        DynConstructors.builder(DataFile.class)
Can't this use DataFiles to build a data file instead of using reflection?
This is done.
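For reference, a minimal sketch of building a test DataFile with the builder instead of reflection (the path, format, size, and record count are illustrative):

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;

public class DataFileSketch {
  static DataFile createDataFile() {
    return DataFiles.builder(PartitionSpec.unpartitioned())
        .withPath("/tmp/data/data-001.parquet") // illustrative file path
        .withFormat(FileFormat.PARQUET)
        .withFileSizeInBytes(100L)
        .withRecordCount(5L)
        .build();
  }

  private DataFileSketch() {}
}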
return commitId;
}

public Long vtts() {
Can you please add a comment explaining this field? I can tell it is some kind of timestamp, but the abbreviation isn't clear.
I think we can also add an events.md doc with this PR now.
https://github.com/tabular-io/iceberg-kafka-connect/blob/main/docs/events.md
We can add this info as Javadoc:
VTTS (valid-through timestamp) property indicating through what timestamp records have been fully processed, i.e. all records processed from then on will have a timestamp greater than the VTTS. This is calculated by taking the maximum timestamp of the records processed from each topic partition, and then taking the minimum of these. If any partitions were not processed as part of the commit, then the VTTS is not set.
https://github.com/tabular-io/iceberg-kafka-connect/blob/main/docs/design.md#snapshot-properties
I added a Javadoc for this.
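For reference, a Javadoc along these lines would capture the description above (a sketch of the wording, not the exact text committed):

/**
 * Valid-through timestamp: all records with a timestamp up to this value have been
 * fully processed, i.e. any record processed after this commit will have a greater
 * timestamp. It is calculated by taking the maximum processed timestamp per topic
 * partition and then the minimum across those partitions; it is null if any
 * partition was not processed as part of the commit.
 *
 * @return the valid-through timestamp, or null if not known
 */
public Long vtts() {
  return vtts;
}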
import org.apache.iceberg.types.Types.StructType;
import org.junit.jupiter.api.Test;

public class EventSerializationTest {
nit: The majority of the test cases in Iceberg start with a Test prefix, so maybe we can rename it.
It is pretty mixed actually and I much prefer the specificity of putting the class name first.
ByteBuffer.wrap(new byte[] {0}));
}

private EventTestUtil() {}
nit: Can we move the constructor up?
done
/**
 * A control event payload for events sent by a coordinator that indicates it has completed a commit
 * cycle. Events with this payload are not consumed by the sink, they * are informational and can be
Suggested change:
- * cycle. Events with this payload are not consumed by the sink, they * are informational and can be
+ * cycle. Events with this payload are not consumed by the sink, they are informational and can be
Thanks for catching this, I fixed it.
public class CommitCompletePayload implements Payload {

  private UUID commitId;
  private Long vtts;
The name is somewhat cryptic; would it make sense to rename this to validThroughTimestamp?
Sure, I updated this to validThroughTs.
I'm a +1 on moving forward with this. I think there might still be an open question about Iceberg/Avro schema definitions, but I'm fine with either resolution.
LGTM.
Can we take this forward?
super((id, struct) -> names.get(struct));
}

Map<Type, Schema> getConversionMap() {
Suggested change:
- Map<Type, Schema> getConversionMap() {
+ Map<Type, Schema> conversionMap() {
Since there are no further comments, I'll go ahead and merge this. I would like to express my gratitude to @bryanck for working on this, since it will help so many people in the Kafka community get their data into Iceberg in a fast and reliable way! 🙏 Thanks @ajantha-bhat, @danielcweeks, @rdblue, @jbonofre, and @nastra for the review 🚀
@Fokko awesome, thanks!
Awesome! Thanks all for the feedback and guidance. I'll follow up with PRs for the actual sink portion.
We (Tabular) would like to submit our Iceberg Kafka Connect sink connector to the Iceberg project. Kafka Connect is a popular, efficient, and easy-to-use framework for reading from and writing to Kafka. This sink gives Iceberg users another option for landing data from Kafka into Iceberg tables. Having the backing of the Iceberg community will help it evolve and improve over time.
The sink codebase is on the larger side, so the thought was to break the submission into different PRs to make it easier to review. This initial PR includes the starting build setup and the project for the Avro event data structures.
The original repo can be found at https://github.com/tabular-io/iceberg-kafka-connect. Some design docs can be found in the docs directory, including an explanation of what the events are used for and why Avro was chosen for serialization. The events were put in a separate project so the library can be used independently to read messages from the control topic outside of the connector, for debugging or notification purposes.