S3 source connector only following the last partition #1366

Open
warmuuh opened this issue Sep 13, 2024 · 0 comments

warmuuh commented Sep 13, 2024

I saw that issue #903 was closed, but I am still seeing the same behavior.
I have a topic with 6 partitions and wrote a few events to it; the first event was written to partition 0. This is what was written to the topic (and correctly backed up to S3 via the S3 sink connector):

(format: #<partition>#<key>#<timestamp>#<message>)

#1#id4#2024-09-13T10:31:04+02:00#hello world 4
#4#id2#2024-09-13T10:14:51+02:00#hello world
#2#id7#2024-09-13T10:31:05+02:00#hello world 7
#1#id6#2024-09-13T10:31:05+02:00#hello world 6
#4#id14#2024-09-13T10:53:29+02:00#hello world 14
#1#id13#2024-09-13T10:53:29+02:00#hello world 13
#4#id20#2024-09-13T10:53:32+02:00#hello world 20
#2#id19#2024-09-13T10:53:31+02:00#hello world 19
#1#id15#2024-09-13T10:53:29+02:00#hello world 15
#1#id18#2024-09-13T10:53:31+02:00#hello world 18
#0#id1#2024-09-13T09:34:41+02:00#test-event
#0#id5#2024-09-13T10:31:04+02:00#hello world 5
#5#id1#2024-09-13T09:48:11+02:00#test-event
#5#id3#2024-09-13T10:31:03+02:00#hello world 3
#5#id8#2024-09-13T10:31:06+02:00#hello world 8
#5#id9#2024-09-13T10:31:06+02:00#hello world 9
#3#id10#2024-09-13T10:31:07+02:00#hello world 10
#3#id11#2024-09-13T10:53:28+02:00#hello world 11
#3#id16#2024-09-13T10:53:30+02:00#hello world 16
#5#id12#2024-09-13T10:53:28+02:00#hello world 12
#5#id17#2024-09-13T10:53:30+02:00#hello world 17

And this is what was restored:

#5#id1#2024-09-13T09:48:11+02:00#test-event
#5#id3#2024-09-13T10:31:03+02:00#hello world 3
#0#id1#2024-09-12T13:30:37+02:00#test-event
#5#id8#2024-09-13T10:31:06+02:00#hello world 8
#5#id9#2024-09-13T10:31:06+02:00#hello world 9
#5#id12#2024-09-13T10:53:28+02:00#hello world 12
#5#id17#2024-09-13T10:53:30+02:00#hello world 17

Only one entry was restored to partition 0: the very first one. At the time it was written, no other partition existed yet, so partition 0 was the "last" partition. After more events were written to the topic, the "last" partition became partition 5.
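To make the gap concrete, the two lists above can be compared per partition. The following Python sketch does only that, using the records exactly as posted in this issue; the helper names `parse` and `count_per_partition` are hypothetical and not part of Stream Reactor or Kafka.

```python
from collections import Counter

# Records copied from this issue, one per line, in the
# "#<partition>#<key>#<timestamp>#<message>" format.
BACKED_UP = """\
#1#id4#2024-09-13T10:31:04+02:00#hello world 4
#4#id2#2024-09-13T10:14:51+02:00#hello world
#2#id7#2024-09-13T10:31:05+02:00#hello world 7
#1#id6#2024-09-13T10:31:05+02:00#hello world 6
#4#id14#2024-09-13T10:53:29+02:00#hello world 14
#1#id13#2024-09-13T10:53:29+02:00#hello world 13
#4#id20#2024-09-13T10:53:32+02:00#hello world 20
#2#id19#2024-09-13T10:53:31+02:00#hello world 19
#1#id15#2024-09-13T10:53:29+02:00#hello world 15
#1#id18#2024-09-13T10:53:31+02:00#hello world 18
#0#id1#2024-09-13T09:34:41+02:00#test-event
#0#id5#2024-09-13T10:31:04+02:00#hello world 5
#5#id1#2024-09-13T09:48:11+02:00#test-event
#5#id3#2024-09-13T10:31:03+02:00#hello world 3
#5#id8#2024-09-13T10:31:06+02:00#hello world 8
#5#id9#2024-09-13T10:31:06+02:00#hello world 9
#3#id10#2024-09-13T10:31:07+02:00#hello world 10
#3#id11#2024-09-13T10:53:28+02:00#hello world 11
#3#id16#2024-09-13T10:53:30+02:00#hello world 16
#5#id12#2024-09-13T10:53:28+02:00#hello world 12
#5#id17#2024-09-13T10:53:30+02:00#hello world 17
"""

RESTORED = """\
#5#id1#2024-09-13T09:48:11+02:00#test-event
#5#id3#2024-09-13T10:31:03+02:00#hello world 3
#0#id1#2024-09-12T13:30:37+02:00#test-event
#5#id8#2024-09-13T10:31:06+02:00#hello world 8
#5#id9#2024-09-13T10:31:06+02:00#hello world 9
#5#id12#2024-09-13T10:53:28+02:00#hello world 12
#5#id17#2024-09-13T10:53:30+02:00#hello world 17
"""


def parse(block):
    """Yield (partition, key, timestamp, message) for each non-empty line."""
    for line in block.splitlines():
        if not line.strip():
            continue
        # The leading '#' produces an empty first field; maxsplit=4 leaves
        # any '#' inside the message untouched.
        _, partition, key, timestamp, message = line.split("#", 4)
        yield int(partition), key, timestamp, message


def count_per_partition(block):
    """Count how many records each topic partition holds."""
    return Counter(partition for partition, _, _, _ in parse(block))


backed_up = count_per_partition(BACKED_UP)
restored = count_per_partition(RESTORED)

# Counter returns 0 for partitions that never appear in the restored data.
for partition in sorted(backed_up):
    print(f"partition {partition}: backed up {backed_up[partition]}, "
          f"restored {restored[partition]}")
```

With the data from this issue, it reports zero restored records for partitions 1 to 4, one of the two records for partition 0, and all six records for partition 5.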

This can also be seen in the internal Kafka Connect topic that stores the source connector offsets:

["s3-source-connector",{"container":"backup-bucket","prefix":"kafka-test/"}]#{"path":"kafka-test/test-topic-backup/0/000000000000_1726140637726_1726140637726.json","line":"0","ts":"1726140698000"}
["s3-source-connector",{"container":"backup-bucket","prefix":"kafka-test/"}]#{"path":"kafka-test/test-topic-backup/5/000000000000_1726213691895_1726213691895.json","line":"0","ts":"1726215063000"}
["s3-source-connector",{"container":"backup-bucket","prefix":"kafka-test/"}]#{"path":"kafka-test/test-topic-backup/5/000000000001_1726216263714_1726216263714.json","line":"0","ts":"1726216265000"}
["s3-source-connector",{"container":"backup-bucket","prefix":"kafka-test/"}]#{"path":"kafka-test/test-topic-backup/5/000000000003_1726216266104_1726216266560.json","line":"1","ts":"1726216287000"}
["s3-source-connector",{"container":"backup-bucket","prefix":"kafka-test/"}]#{"path":"kafka-test/test-topic-backup/5/000000000004_1726217608683_1726217608683.json","line":"0","ts":"1726217610000"}
["s3-source-connector",{"container":"backup-bucket","prefix":"kafka-test/"}]#{"path":"kafka-test/test-topic-backup/5/000000000005_1726217610907_1726217610907.json","line":"0","ts":"1726217668000"}

You can see that it tracked partition 0 first, but afterwards it only pointed to partition 5.
I don't know much about the internals, but how does the source connector store the offsets for each partition? Shouldn't the key of each offset-topic entry also contain a task ID or partition ID? As it stands, it looks like something is overwriting the per-partition data and only the last entry survives.
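The overwriting hypothesis can be illustrated with a short Python sketch. Kafka Connect keeps the most recent offset per key, and all six entries above share the same key (connector name plus container and prefix), so a last-write-wins map over them keeps only the final partition-5 entry. The second half models a hypothetical key that also includes the partition directory from the S3 path; that scheme is an assumption for illustration only, not how the connector is known to work.

```python
import json

# Entries from the source connector's offset topic, copied from this issue,
# in the "<key JSON>#<value JSON>" format shown above.
OFFSET_RECORDS = """\
["s3-source-connector",{"container":"backup-bucket","prefix":"kafka-test/"}]#{"path":"kafka-test/test-topic-backup/0/000000000000_1726140637726_1726140637726.json","line":"0","ts":"1726140698000"}
["s3-source-connector",{"container":"backup-bucket","prefix":"kafka-test/"}]#{"path":"kafka-test/test-topic-backup/5/000000000000_1726213691895_1726213691895.json","line":"0","ts":"1726215063000"}
["s3-source-connector",{"container":"backup-bucket","prefix":"kafka-test/"}]#{"path":"kafka-test/test-topic-backup/5/000000000001_1726216263714_1726216263714.json","line":"0","ts":"1726216265000"}
["s3-source-connector",{"container":"backup-bucket","prefix":"kafka-test/"}]#{"path":"kafka-test/test-topic-backup/5/000000000003_1726216266104_1726216266560.json","line":"1","ts":"1726216287000"}
["s3-source-connector",{"container":"backup-bucket","prefix":"kafka-test/"}]#{"path":"kafka-test/test-topic-backup/5/000000000004_1726217608683_1726217608683.json","line":"0","ts":"1726217610000"}
["s3-source-connector",{"container":"backup-bucket","prefix":"kafka-test/"}]#{"path":"kafka-test/test-topic-backup/5/000000000005_1726217610907_1726217610907.json","line":"0","ts":"1726217668000"}
"""


def parse(block):
    """Yield (key, value) pairs, decoding both halves of each line as JSON."""
    for line in block.splitlines():
        if line.strip():
            key_json, value_json = line.split("#", 1)
            yield json.loads(key_json), json.loads(value_json)


# Kafka Connect keeps the latest offset per key; plain dict assignment
# (last write wins) models that here.
latest_by_key = {}
for key, value in parse(OFFSET_RECORDS):
    latest_by_key[json.dumps(key, sort_keys=True)] = value

print(len(latest_by_key))  # all six entries share one key
print(next(iter(latest_by_key.values()))["path"])

# Hypothetical alternative, NOT the connector's known behavior: add the
# partition directory from the S3 path (".../test-topic-backup/<dir>/...")
# to the key, so each directory keeps its own offset.
latest_by_key_and_dir = {}
for key, value in parse(OFFSET_RECORDS):
    partition_dir = value["path"].split("/")[2]
    latest_by_key_and_dir[json.dumps(key + [partition_dir], sort_keys=True)] = value

for value in latest_by_key_and_dir.values():
    print(value["path"])
```

With a single shared key, only the `.../5/000000000005_...` offset survives; with the hypothetical per-directory key, the partition-0 offset would be retained alongside it.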

What version of the Stream Reactor are you reporting this issue for?

kafka-connect-aws-s3-assembly-7.4.5.jar

Are you running the correct version of Kafka/Confluent for the Stream reactor release?

Kafka 3.8.0

What is the expected behaviour?

The source connector should mirror all partitions.

What was observed?

It only follows the last partition.

What is your Connect cluster configuration (connect-avro-distributed.properties)?

Using Strimzi, so the format is slightly different:

    config:
      group.id: connect-test-connect
      ....
      key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
      value.converter: org.apache.kafka.connect.converters.ByteArrayConverter

What is your connector properties configuration (my-connector.properties)?

  class: io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
  tasksMax: 1
  config:
    topics: test-topic-backup
    # 10 second flush.interval for testing
    connect.s3.kcql: >-
      INSERT INTO backup-bucket:kafka-test
      SELECT * FROM test-topic-backup 
      STOREAS `JSON` 
      PROPERTIES ('store.envelope'=true, 'flush.interval'=10)
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    connect.s3.aws.auth.mode: Credentials
    connect.s3.aws.access.key: ...
    connect.s3.aws.secret.key: ...
    connect.s3.aws.region: eu-central-1

  class: io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
  tasksMax: 1
  config:
    connect.s3.kcql: >-
      INSERT INTO test-topic-restore
      SELECT * FROM backup-bucket:kafka-test
      STOREAS `JSON` 
      PROPERTIES ('store.envelope'=true)
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    connect.s3.aws.auth.mode: Credentials
    connect.s3.aws.access.key: ...
    connect.s3.aws.secret.key: ...
    connect.s3.aws.region: eu-central-1
    connect.s3.source.partition.extractor.type: hierarchical
    connect.partition.search.continuous: true
    connect.s3.source.partition.search.interval: 1000 # check every second for tests