Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
[fix] Recycle fetchRequest records when resultFuture already complete (
Browse files Browse the repository at this point in the history
…#1618)

### Motivation

There is currently a memory leak in the `handleFetchRequest` logic. This
PR fixes that leak.

Steps to observe a memory leak:

1. Connect consumer.
2. Send fetch request to broker, which creates a callback:
https://github.com/streamnative/kop/blob/66efad61c853aa9044fd4b03e7bffc3823a36042/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java#L243
3. Close connection to broker before the callback completes, which
triggers the logic to complete all uncompleted futures:
https://github.com/streamnative/kop/blob/66efad61c853aa9044fd4b03e7bffc3823a36042/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java#L113-L120

Credit to @eolivelli for identifying that the close logic was leading to
a memory leak.

Note: this code contains the only reference to
`ResponseCallbackWrapper`, which indicates that there are no other leaks
with this same pattern currently present in the `master` code base.

### Modifications

* Add logic that releases retained `ByteBuffers` in the event that the
`resultFuture` is already completed.
  • Loading branch information
michaeljmarshall authored Dec 13, 2022
1 parent 6653115 commit 10d1d92
Showing 1 changed file with 12 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1648,15 +1648,21 @@ protected void handleFetchRequest(KafkaHeaderAndRequest fetch,
partitions.put(tp, data.toPartitionData());
});
partitions.putAll(erroneous);
resultFuture.complete(new ResponseCallbackWrapper(new FetchResponse<>(
Errors.NONE,
partitions,
THROTTLE_TIME_MS,
request.metadata().sessionId()), () ->
resultMap.forEach((__, readRecordsResult) -> {
boolean triggeredCompletion = resultFuture.complete(new ResponseCallbackWrapper(
new FetchResponse<>(
Errors.NONE,
partitions,
THROTTLE_TIME_MS,
request.metadata().sessionId()),
() -> resultMap.forEach((__, readRecordsResult) -> {
readRecordsResult.recycle();
})
));
if (!triggeredCompletion) {
resultMap.forEach((__, readRecordsResult) -> {
readRecordsResult.recycle();
});
}
context.recycle();
});
}
Expand Down

0 comments on commit 10d1d92

Please sign in to comment.