# Post-Processing for Datalake Cloud Source Connectors (#161)
This adds support for deleting or moving files after they have been processed and committed by the S3/GCP Storage source connectors.

# New KCQL Configuration Options for Datalake Cloud Connectors

The following configuration options introduce post-processing capabilities for the AWS S3, GCP Storage, and (coming soon) Azure Datalake Gen 2 **source connectors**. These options allow the connector to manage source files after they are successfully processed, either by deleting the file or moving it to a new location in cloud storage.

In Kafka Connect, post-processing is triggered when the framework calls the `commitRecord` method after a source record is successfully processed. The configured action then determines how the source file is handled.

If no `post.process.action` is configured, **no post-processing will occur**, and the file will remain in its original location.
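
For illustration, the flow can be sketched as follows. Apart from Kafka Connect's `commitRecord` signature, the names below (`PostProcessingSourceTask`, `sourceLocation`, and the simplified `PostProcessAction` trait) are hypothetical stand-ins, not the connector's actual internals:

```scala
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.connect.source.{SourceRecord, SourceTask}

// Hypothetical, simplified model of a configured action; the connector's real
// type lives in io.lenses.streamreactor.connect.cloud.common.source.config.PostProcessAction.
sealed trait PostProcessAction {
  def run(bucket: String, path: String): Unit
}

abstract class PostProcessingSourceTask extends SourceTask {

  // None when no `post.process.action` is configured in the KCQL PROPERTIES.
  protected def postProcessAction: Option[PostProcessAction]

  // Recovers the source file's (bucket, path) from the record's source partition.
  protected def sourceLocation(record: SourceRecord): (String, String)

  // Kafka Connect calls this after a source record has been successfully
  // produced to Kafka; the configured action (if any) is applied here.
  override def commitRecord(record: SourceRecord, metadata: RecordMetadata): Unit = {
    val (bucket, path) = sourceLocation(record)
    postProcessAction.foreach(_.run(bucket, path)) // no action => the file stays put
  }
}
```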

---

## KCQL Configuration Options

### 1. `post.process.action`
- **Description**: Defines the action to perform on a file after it has been processed.
- **Options**:
  - `DELETE` – Removes the file after processing.
  - `MOVE` – Relocates the file to a new location after processing.

### 2. `post.process.action.bucket`
- **Description**: Specifies the target bucket for files when using the `MOVE` action.
- **Applicability**: Only applies to the `MOVE` action.
- **Notes**: This field is **mandatory** when `post.process.action` is set to `MOVE`.

### 3. `post.process.action.prefix`
- **Description**: Specifies a new prefix to replace the existing one for the file’s location when using the `MOVE` action. The file's path will remain unchanged except for the prefix.
- **Applicability**: Only applies to the `MOVE` action.
- **Notes**: This field is **mandatory** when `post.process.action` is set to `MOVE`.
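
Taken together, these properties map naturally onto a small sum type. The sketch below is illustrative only; it loosely mirrors the `PostProcessAction` type referenced in the diffs further down, but the names, parsing, and error messages are assumptions:

```scala
// Illustrative only: names, parsing, and error messages are assumptions that
// loosely mirror the PostProcessAction type referenced in the diffs below.
sealed trait PostProcessAction

object PostProcessAction {
  case object Delete extends PostProcessAction

  // MOVE needs both a destination bucket and a replacement prefix.
  final case class Move(destinationBucket: String, destinationPrefix: String) extends PostProcessAction

  // Resolve the optional action from the KCQL PROPERTIES; absence means no post-processing.
  def from(props: Map[String, String]): Either[String, Option[PostProcessAction]] =
    props.get("post.process.action").map(_.trim.toUpperCase) match {
      case None           => Right(None)
      case Some("DELETE") => Right(Some(Delete))
      case Some("MOVE") =>
        for {
          bucket <- props.get("post.process.action.bucket")
            .toRight("post.process.action.bucket is mandatory when the action is MOVE")
          prefix <- props.get("post.process.action.prefix")
            .toRight("post.process.action.prefix is mandatory when the action is MOVE")
        } yield Some(Move(bucket, prefix))
      case Some(other) => Left(s"Unsupported post.process.action: $other")
    }
}
```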

---

## Key Use Cases
- **DELETE**: Automatically removes source files to free up storage space and prevent redundant data from remaining in the bucket.
- **MOVE**: Organizes processed source files by relocating them to a different bucket or prefix, which is useful for archiving, categorizing, or preparing files for other workflows.

---

## Examples

### Example 1: Deleting Files After Processing
To configure the source connector to delete files after processing, use the following KCQL:

```kcql
INSERT INTO `my-topic`
SELECT * FROM `my-bucket`
PROPERTIES (
    'post.process.action'='DELETE'
)
```


### Example 2: Moving Files After Processing
To configure the source connector to move files to a different bucket and prefix, use the following KCQL:

```kcql
INSERT INTO `my-topic`
SELECT * FROM `my-bucket:data/`
PROPERTIES (
    'post.process.action'='MOVE',
    'post.process.action.bucket'='archive-bucket',
    'post.process.action.prefix'='archive/'
)
```

In this example:
* The connector reads files from `my-bucket` under the `data/` prefix.
* After processing, each file is moved to `archive-bucket`.
* The prefix `archive/` replaces the source prefix `data/`, while the rest of the file's path is kept unchanged.
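
To make the rewrite concrete, here is a minimal sketch of the prefix substitution, assuming (as an illustration) that the prefix configured on the KCQL source location (`data/` above) is what gets replaced and that every key starts with it:

```scala
object PrefixRewriteExample extends App {
  // Swap the source prefix for the configured destination prefix, keeping the
  // remainder of the object key intact.
  def replacePrefix(key: String, sourcePrefix: String, destinationPrefix: String): String =
    destinationPrefix + key.stripPrefix(sourcePrefix)

  // A file read from `my-bucket` at data/my-topic/9/0.json is recreated in
  // `archive-bucket` as archive/my-topic/9/0.json.
  assert(replacePrefix("data/my-topic/9/0.json", "data/", "archive/") == "archive/my-topic/9/0.json")
}
```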

## Important Considerations

* Both `post.process.action.bucket` and `post.process.action.prefix` are mandatory when using the `MOVE` action.
* For the `DELETE` action, no additional configuration is required.
* If no `post.process.action` is configured, no post-processing will be applied, and the file will remain in its original location.




## Commit history

* Configuration for Burn-After-Reading
* Implementing actions and storage interfaces. Tests still need to be added; the file-move logic needs testing where it resolves the path (is this even the best configuration?)
* Storage interface tests
* Address comment from review referencing this page on moving items in GCP: https://cloud.google.com/storage/docs/samples/storage-move-file
* Adding temporary logging; fixing a bug with the Map equality not enabling prefixes to map to each other
* Fix Move action
* Fix prefix replace behaviour
* Changes to ensure the error handling approach is correct
* Review fixes: remove S3 references
* Avoid variable shadowing
* Avoid variable shadowing
* Add documentation
* CopyObjectResponse
davidsloan authored Nov 29, 2024
1 parent 4a16747 · commit c272cd5
Showing 44 changed files with 1,139 additions and 149 deletions.
### `S3SourceTask.scala`

```diff
@@ -33,18 +33,18 @@ class S3SourceTask
S3FileMetadata,
S3SourceConfig,
S3Client,
]
]("/aws-s3-source-ascii.txt")
with LazyLogging {

val validator: CloudLocationValidator = S3LocationValidator

override def createStorageInterface(
connectorTaskId: ConnectorTaskId,
config: S3SourceConfig,
s3Client: S3Client,
client: S3Client,
): AwsS3StorageInterface =
new AwsS3StorageInterface(connectorTaskId = connectorTaskId,
s3Client = s3Client,
s3Client = client,
batchDelete = config.batchDelete,
extensionFilter = config.extensionFilter,
)
@@ -59,6 +59,6 @@ class S3SourceTask

override def connectorPrefix: String = CONNECTOR_PREFIX

override def createDirectoryLister(connectorTaskId: ConnectorTaskId, s3Client: S3Client): DirectoryLister =
new AwsS3DirectoryLister(connectorTaskId, s3Client)
override def createDirectoryLister(connectorTaskId: ConnectorTaskId, client: S3Client): DirectoryLister =
new AwsS3DirectoryLister(connectorTaskId, client)
}
```

### `AwsS3StorageInterface.scala`

```diff
@@ -25,6 +25,7 @@ import io.lenses.streamreactor.connect.cloud.common.storage.FileCreateError
import io.lenses.streamreactor.connect.cloud.common.storage.FileDeleteError
import io.lenses.streamreactor.connect.cloud.common.storage.FileListError
import io.lenses.streamreactor.connect.cloud.common.storage.FileLoadError
import io.lenses.streamreactor.connect.cloud.common.storage.FileMoveError
import io.lenses.streamreactor.connect.cloud.common.storage.ListOfKeysResponse
import io.lenses.streamreactor.connect.cloud.common.storage.ListOfMetadataResponse
import io.lenses.streamreactor.connect.cloud.common.storage.ListResponse
@@ -295,4 +296,33 @@ class AwsS3StorageInterface(
* @return
*/
override def system(): String = "S3"

override def mvFile(
oldBucket: String,
oldPath: String,
newBucket: String,
newPath: String,
): Either[FileMoveError, Unit] = {
val headObjectRequest = HeadObjectRequest.builder().bucket(oldBucket).key(oldPath).build()
Try(s3Client.headObject(headObjectRequest)) match {
case Failure(ex: NoSuchKeyException) =>
logger.warn("Object ({}/{}) doesn't exist to move", oldBucket, oldPath, ex)
().asRight
case Failure(ex) =>
logger.error("Object ({}/{}) could not be retrieved", ex)
FileMoveError(ex, oldPath, newPath).asLeft
case Success(_) =>
Try {
s3Client.copyObject(
CopyObjectRequest.builder().sourceKey(oldPath).destinationKey(newPath).sourceBucket(
oldBucket,
).destinationBucket(
newBucket,
).build(),
)
s3Client.deleteObject(DeleteObjectRequest.builder().bucket(oldBucket).key(oldPath).build())
}.toEither.leftMap(FileMoveError(_, oldPath, newPath)).void
}
}

}
```

### `ReaderManagerTest.scala`

```diff
@@ -25,9 +25,11 @@ import io.lenses.streamreactor.connect.aws.s3.model.location.S3LocationValidator
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator
import io.lenses.streamreactor.connect.cloud.common.source.config.PostProcessAction
import io.lenses.streamreactor.connect.cloud.common.source.files.SourceFileQueue
import io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager
import io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.source.SourceRecord
import org.mockito.MockitoSugar
@@ -46,12 +48,16 @@ class ReaderManagerTest extends AnyFlatSpec with MockitoSugar with Matchers with
private val bucketAndPrefix = CloudLocation("test", "ing".some)
private val firstFileBucketAndPath = bucketAndPrefix.withPath("test:ing/topic/9/0.json")
private val firstFileBucketAndPathAndLine = firstFileBucketAndPath.atLine(0).withTimestamp(Instant.now)
private val noPostProcessAction = Option.empty[PostProcessAction]
private val storageInterface = mock[StorageInterface[_]]

"poll" should "be empty when no results found" in {
val fileQueueProcessor: SourceFileQueue = mock[SourceFileQueue]

var locationFnCalls = 0
val target = new ReaderManager(
bucketAndPrefix,
bucketAndPrefix,
recordsLimit,
fileQueueProcessor,
_ =>
@@ -61,6 +67,8 @@ class ReaderManagerTest extends AnyFlatSpec with MockitoSugar with Matchers with
},
connectorTaskId,
Ref[IO].of(Option.empty[ResultReader]).unsafeRunSync(),
storageInterface,
noPostProcessAction,
)

when(fileQueueProcessor.next()).thenReturn(None.asRight)
@@ -100,6 +108,8 @@ class ReaderManagerTest extends AnyFlatSpec with MockitoSugar with Matchers with
).thenReturn(firstFileBucketAndPath)

val target = new ReaderManager(
bucketAndPrefix,
bucketAndPrefix,
recordsLimit,
fileQueueProcessor,
location => {
Expand All @@ -108,6 +118,8 @@ class ReaderManagerTest extends AnyFlatSpec with MockitoSugar with Matchers with
},
connectorTaskId,
Ref[IO].of(Option.empty[ResultReader]).unsafeRunSync(),
storageInterface,
noPostProcessAction,
)

target.poll().unsafeRunSync() should be(pollResults)
```

### `S3PartitionDiscoveryTest.scala`

```diff
@@ -29,13 +29,15 @@ import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocationValidator
import io.lenses.streamreactor.connect.cloud.common.source.config.PartitionSearcherOptions
import io.lenses.streamreactor.connect.cloud.common.source.config.PartitionSearcherOptions.ExcludeIndexes
import io.lenses.streamreactor.connect.cloud.common.source.config.PostProcessAction
import io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher
import io.lenses.streamreactor.connect.cloud.common.source.distribution.PartitionSearcherResponse
import io.lenses.streamreactor.connect.cloud.common.source.files.SourceFileQueue
import io.lenses.streamreactor.connect.cloud.common.source.reader.PartitionDiscovery
import io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager
import io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManagerState
import io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
import org.mockito.MockitoSugar
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers
@@ -44,9 +46,12 @@ import scala.concurrent.duration.DurationInt

class S3PartitionDiscoveryTest extends AnyFlatSpecLike with Matchers with MockitoSugar {
private implicit val cloudLocationValidator: CloudLocationValidator = S3LocationValidator
private val connectorTaskId: ConnectorTaskId = ConnectorTaskId("sinkName", 1, 1)
private val root = CloudLocation("bucket", "path".some)
private val connectorTaskId: ConnectorTaskId = ConnectorTaskId("sinkName", 1, 1)

private val fFilesLimit: CloudLocation => Either[Throwable, Int] = _ => 1000.asRight
private val storageInterface = mock[StorageInterface[_]]
private val noPostProcessAction = Option.empty[PostProcessAction]

"PartitionDiscovery" should "discover all partitions" in {
val fileQueueProcessor: SourceFileQueue = mock[SourceFileQueue]
@@ -78,12 +83,19 @@ class S3PartitionDiscoveryTest extends AnyFlatSpecLike with Matchers with Mockit
connectorTaskId,
).find,
(_, _) =>
IO(new ReaderManager(limit,
fileQueueProcessor,
_ => Left(new RuntimeException()),
connectorTaskId,
readerRef,
)),
IO(
new ReaderManager(
root,
root,
limit,
fileQueueProcessor,
_ => Left(new RuntimeException()),
connectorTaskId,
readerRef,
storageInterface,
noPostProcessAction,
),
),
state,
cancelledRef,
).start
@@ -146,12 +158,18 @@ class S3PartitionDiscoveryTest extends AnyFlatSpecLike with Matchers with Mockit
connectorTaskId,
).find,
(_, _) =>
IO(new ReaderManager(limit,
fileQueueProcessor,
_ => Left(new RuntimeException()),
connectorTaskId,
readerRef,
)),
IO(
new ReaderManager(root,
root,
limit,
fileQueueProcessor,
_ => Left(new RuntimeException()),
connectorTaskId,
readerRef,
storageInterface,
noPostProcessAction,
),
),
state,
cancelledRef,
).start
@@ -206,12 +224,18 @@ class S3PartitionDiscoveryTest extends AnyFlatSpecLike with Matchers with Mockit
connectorTaskId,
).find,
(_, _) =>
IO(new ReaderManager(limit,
fileQueueProcessor,
_ => Left(new RuntimeException()),
connectorTaskId,
readerRef,
)),
IO(
new ReaderManager(root,
root,
limit,
fileQueueProcessor,
_ => Left(new RuntimeException()),
connectorTaskId,
readerRef,
storageInterface,
noPostProcessAction,
),
),
state,
cancelledRef,
).start
@@ -271,7 +295,19 @@ class S3PartitionDiscoveryTest extends AnyFlatSpecLike with Matchers with Mockit
(
_,
_,
) => IO(new ReaderManager(limit, fileQueueProcessor, _ => Left(new RuntimeException()), taskId, readerRef)),
) =>
IO(
new ReaderManager(root,
root,
limit,
fileQueueProcessor,
_ => Left(new RuntimeException()),
taskId,
readerRef,
storageInterface,
noPostProcessAction,
),
),
state,
cancelledRef,
).start
```

### `ReaderManagerBuilderTest.scala`

```diff
@@ -55,10 +55,12 @@ class ReaderManagerBuilderTest extends AsyncFlatSpec with AsyncIOSpec with Match
None,
OrderingType.LastModified,
false,
Option.empty,
)
val taskId = ConnectorTaskId("test", 3, 1)
ReaderManagerBuilder(root, path, si, taskId, contextF, _ => Some(sbo))
.asserting(_ => rootValue shouldBe Some(root.copy(prefix = Some(path))))
val taskId = ConnectorTaskId("test", 3, 1)
val pathLocation = root.withPath(path)
ReaderManagerBuilder(root, pathLocation, si, taskId, contextF, _ => Some(sbo))
.asserting(_ => rootValue shouldBe Some(pathLocation))
}

}
```

### `AwsS3StorageInterfaceTest.scala` (new file)

```diff
@@ -0,0 +1,105 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.aws.s3.storage

import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
import io.lenses.streamreactor.connect.cloud.common.storage.FileMoveError
import org.mockito.ArgumentMatchersSugar
import org.mockito.MockitoSugar
import org.scalatest.EitherValues
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.CopyObjectRequest
import software.amazon.awssdk.services.s3.model.CopyObjectResponse
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse
import software.amazon.awssdk.services.s3.model.HeadObjectRequest
import software.amazon.awssdk.services.s3.model.HeadObjectResponse
import software.amazon.awssdk.services.s3.model.NoSuchKeyException

class AwsS3StorageInterfaceTest
extends AnyFlatSpecLike
with Matchers
with MockitoSugar
with ArgumentMatchersSugar
with EitherValues {

"mvFile" should "move a file from one bucket to another successfully" in {
val s3Client = mock[S3Client]
val storageInterface = new AwsS3StorageInterface(mock[ConnectorTaskId], s3Client, batchDelete = false, None)

val copyObjectResponse: CopyObjectResponse = CopyObjectResponse.builder().build()
when(s3Client.copyObject(any[CopyObjectRequest])).thenReturn(copyObjectResponse)
val deleteObjectResponse: DeleteObjectResponse = DeleteObjectResponse.builder().build()
when(s3Client.deleteObject(any[DeleteObjectRequest])).thenReturn(deleteObjectResponse)

val result = storageInterface.mvFile("oldBucket", "oldPath", "newBucket", "newPath")

result shouldBe Right(())
verify(s3Client).copyObject(any[CopyObjectRequest])
verify(s3Client).deleteObject(any[DeleteObjectRequest])
}

it should "return a FileMoveError if copyObject fails" in {
val s3Client = mock[S3Client]
val storageInterface = new AwsS3StorageInterface(mock[ConnectorTaskId], s3Client, batchDelete = false, None)

when(s3Client.copyObject(any[CopyObjectRequest])).thenThrow(new RuntimeException("Copy failed"))

val result = storageInterface.mvFile("oldBucket", "oldPath", "newBucket", "newPath")

result.isLeft shouldBe true
result.left.value shouldBe a[FileMoveError]
verify(s3Client).copyObject(any[CopyObjectRequest])
verify(s3Client, never).deleteObject(any[DeleteObjectRequest])
}

it should "return a FileMoveError if deleteObject fails" in {
val s3Client = mock[S3Client]
val storageInterface = new AwsS3StorageInterface(mock[ConnectorTaskId], s3Client, batchDelete = false, None)

val headObjectResponse: HeadObjectResponse = HeadObjectResponse.builder().build()
when(s3Client.headObject(any[HeadObjectRequest])).thenReturn(headObjectResponse)
val copyObjectResponse: CopyObjectResponse = CopyObjectResponse.builder().build()
when(s3Client.copyObject(any[CopyObjectRequest])).thenReturn(copyObjectResponse)
when(s3Client.deleteObject(any[DeleteObjectRequest])).thenThrow(new RuntimeException("Delete failed"))

val result = storageInterface.mvFile("oldBucket", "oldPath", "newBucket", "newPath")

result.isLeft shouldBe true
result.left.value shouldBe a[FileMoveError]
verify(s3Client).copyObject(any[CopyObjectRequest])
verify(s3Client).deleteObject(any[DeleteObjectRequest])
}

it should "pass if no source object exists" in {
val s3Client = mock[S3Client]
val storageInterface = new AwsS3StorageInterface(mock[ConnectorTaskId], s3Client, batchDelete = false, None)

when(s3Client.headObject(any[HeadObjectRequest])).thenThrow(NoSuchKeyException.builder().build())
val copyObjectResponse: CopyObjectResponse = CopyObjectResponse.builder().build()
when(s3Client.copyObject(any[CopyObjectRequest])).thenReturn(copyObjectResponse)
when(s3Client.deleteObject(any[DeleteObjectRequest])).thenThrow(new RuntimeException("Delete failed"))

val result = storageInterface.mvFile("oldBucket", "oldPath", "newBucket", "newPath")

result.isRight shouldBe true
verify(s3Client).headObject(any[HeadObjectRequest])
verify(s3Client, never).copyObject(any[CopyObjectRequest])
verify(s3Client, never).deleteObject(any[DeleteObjectRequest])
}
}
```

### `DatalakeStorageInterface.scala`

```diff
@@ -227,4 +227,14 @@ class DatalakeStorageInterface(connectorTaskId: ConnectorTaskId, client: DataLak
* @return
*/
override def system(): String = "Azure Datalake"

override def mvFile(
oldBucket: String,
oldPath: String,
newBucket: String,
newPath: String,
): Either[FileMoveError, Unit] =
Try(client.getFileSystemClient(oldBucket).getFileClient(oldPath).rename(newBucket, newPath)).toEither.leftMap(
FileMoveError(_, oldPath, newPath),
).void
}
```