How to modify the filename of the S3 object? #154

Open
matias-dls opened this issue Oct 7, 2020 · 2 comments

Comments

@matias-dls

Hi, I've been using the S3 connector for a couple of weeks now, and I want to change how the connector names each file. I am using the HourlyBasedPartition, so the path alone is already enough for me to locate each file, and I'd like every file to have the same generic name, for example just 'Data.json.gzip' (under the respective path from the partitioner).

For example, I want to go from this:
<prefix>/<topic>/<HourlyBasedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>

To this:
<prefix>/<topic>/<HourlyBasedPartition>/Data.<format>

The goal is to download each file later with a single call to S3, instead of first having to look up the filename and then download it.
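To make the two layouts above concrete, here is a minimal standalone sketch that builds both keys from the same inputs. The class and method names are hypothetical, and the sample values (the `topics` prefix, the `year=.../hour=...` path, the 10-digit zero padding, and the `.json` extension) are assumptions for illustration, not values taken from a real deployment.

```java
import java.util.Locale;

// Hypothetical helper, not part of the connector: it only illustrates the two key layouts.
final class S3KeyLayoutSketch {

    // <prefix>/<topic>/<partition path>/<topic>+<kafkaPartition>+<startOffset>.<format>
    static String defaultKey(String prefix, String topic, String encodedPartition,
                             int kafkaPartition, long startOffset, String extension) {
        // Assumption: the start offset is zero-padded to 10 digits.
        String name = topic + "+" + kafkaPartition + "+"
            + String.format(Locale.ROOT, "%010d", startOffset) + extension;
        return prefix + "/" + topic + "/" + encodedPartition + "/" + name;
    }

    // <prefix>/<topic>/<partition path>/Data.<format>
    static String fixedKey(String prefix, String topic, String encodedPartition,
                           String extension) {
        return prefix + "/" + topic + "/" + encodedPartition + "/Data" + extension;
    }

    public static void main(String[] args) {
        String hour = "year=2020/month=10/day=07/hour=13"; // sample partition path
        System.out.println(defaultKey("topics", "orders", hour, 0, 42L, ".json"));
        System.out.println(fixedKey("topics", "orders", hour, ".json"));
    }
}
```

With the fixed layout, a reader can compute the key from the topic and the hour alone, which is what makes the single-call download possible.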

@matias-dls
Author

Searching through the 'kafka-connect-s3' module of the repo, I found this file:
https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java
Near the end, it contains the following functions:

  private RecordWriter getWriter(SinkRecord record, String encodedPartition)
      throws ConnectException {
    if (writers.containsKey(encodedPartition)) {
      return writers.get(encodedPartition);
    }
    String commitFilename = getCommitFilename(encodedPartition);
    log.debug(
        "Creating new writer encodedPartition='{}' filename='{}'",
        encodedPartition,
        commitFilename
    );
    RecordWriter writer = writerProvider.getRecordWriter(connectorConfig, commitFilename);
    writers.put(encodedPartition, writer);
    return writer;
  }

  private String getCommitFilename(String encodedPartition) {
    String commitFile;
    if (commitFiles.containsKey(encodedPartition)) {
      commitFile = commitFiles.get(encodedPartition);
    } else {
      long startOffset = startOffsets.get(encodedPartition);
      String prefix = getDirectoryPrefix(encodedPartition);
      commitFile = fileKeyToCommit(prefix, startOffset);
      commitFiles.put(encodedPartition, commitFile);
    }
    return commitFile;
  }

  private String fileKey(String topicsPrefix, String keyPrefix, String name) {
    String suffix = keyPrefix + dirDelim + name;
    return StringUtils.isNotBlank(topicsPrefix)
           ? topicsPrefix + dirDelim + suffix
           : suffix;
  }

  private String fileKeyToCommit(String dirPrefix, long startOffset) {
    String name = tp.topic()
                      + fileDelim
                      + tp.partition()
                      + fileDelim
                      + String.format(zeroPadOffsetFormat, startOffset)
                      + extension;
    return fileKey(topicsDir, dirPrefix, name);
  }

I don't know whether this can be customized to do what I want, but it seems closely related. Hope it helps.

@OneCricketeer

Worth pointing out that the sink connector does not append to existing files, so you would be overwriting those files if you were to change these lines:

+ String name = "Data"
- String name = tp.topic()
-                      + fileDelim
-                      + tp.partition()
-                      + fileDelim
-                      + String.format(zeroPadOffsetFormat, startOffset)
                      + extension;
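
The overwrite concern can be illustrated with a small simulation. This is only a sketch: `FakeBucket` is a hypothetical in-memory stand-in for a bucket (a put to an existing key replaces the stored object), and the topic, path, and extension are made-up sample values. It assumes two files are committed within the same hour, one per Kafka partition.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical simulation, not connector code.
final class FixedNameOverwriteSketch {

    // In-memory stand-in for a bucket: putting to an existing key replaces the object.
    static final class FakeBucket {
        private final Map<String, String> objects = new LinkedHashMap<>();
        void put(String key, String body) { objects.put(key, body); }
        String get(String key) { return objects.get(key); }
        int size() { return objects.size(); }
    }

    // Name in the style of the original fileKeyToCommit (10-digit padding is an assumption).
    static String uniqueName(String topic, int kafkaPartition, long startOffset, String ext) {
        return topic + "+" + kafkaPartition + "+" + String.format("%010d", startOffset) + ext;
    }

    // Name after the proposed change.
    static String fixedName(String ext) {
        return "Data" + ext;
    }

    // Commits two files into the same hourly directory and returns the resulting bucket.
    static FakeBucket simulate(boolean useFixedName) {
        String dir = "topics/orders/year=2020/month=10/day=07/hour=13/";
        FakeBucket bucket = new FakeBucket();
        for (int kafkaPartition = 0; kafkaPartition < 2; kafkaPartition++) {
            String name = useFixedName
                ? fixedName(".json")
                : uniqueName("orders", kafkaPartition, 0L, ".json");
            bucket.put(dir + name, "records from partition " + kafkaPartition);
        }
        return bucket;
    }

    public static void main(String[] args) {
        System.out.println("unique names: " + simulate(false).size() + " objects");
        System.out.println("fixed name:   " + simulate(true).size() + " object(s)");
    }
}
```

With unique names both files survive; with the fixed name only the last write remains, so the records from the earlier file are no longer retrievable under that key.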
