Skip to content

Commit

Permalink
KAFKA-17871: avoid blocking the herder thread when producer flushing …
Browse files Browse the repository at this point in the history
…hangs

The call to `backingStore.get()` (called by connector task threads through
`OffsetStorageReaderImpl.offsets()`) can block for long time waiting for data flush to complete
(`KafkaProducer.flush()`).

This change moves that call outside the synchronized clause that holds `offsetReadFutures`,
so that if `backingStore.get()` hangs then it does not keep `offsetReadFutures` locked.
The access to `closed` flag (`closed.get()`) is kept inside the synchronize clause to avoid race
condition with `close()`.

This is important because `OffsetStorageReaderImpl.close()` needs to lock `offsetReadFutures` as
well in order to cancel the futures.
Since the herder thread calls `OffsetStorageReaderImpl.close()` when attempting to stops a task,
before this change this was resulting in the herder thread hanging indefinetely waiting for
`backingStore.get()` to complete.
  • Loading branch information
davide-armand committed Dec 15, 2024
1 parent 4d25c54 commit 896e467
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,18 @@ public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<Strin
Map<ByteBuffer, ByteBuffer> raw;
try {
Future<Map<ByteBuffer, ByteBuffer>> offsetReadFuture;

// Note: this call can block for long time waiting for data flush to complete (`KafkaProducer.flush()`).
offsetReadFuture = backingStore.get(serializedToOriginal.keySet());

synchronized (offsetReadFutures) {
if (closed.get()) {
offsetReadFuture.cancel(true);
throw new ConnectException(
"Offset reader is closed. This is likely because the task has already been "
+ "scheduled to stop but has taken longer than the graceful shutdown "
+ "period to do so.");
}
offsetReadFuture = backingStore.get(serializedToOriginal.keySet());
offsetReadFutures.add(offsetReadFuture);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.storage;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class OffsetStorageReaderTest {

@Mock
private Converter taskKeyConverter;

@Mock
private Converter taskValueConverter;

@Mock
private OffsetBackingStore offsetBackingStore;

@Test(timeout = 60 * 1000)
public void testClosingOffsetReaderWhenOffsetStoreHangs() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
OffsetStorageReaderImpl offsetStorageReaderImpl = new OffsetStorageReaderImpl(
offsetBackingStore, "namespace", taskKeyConverter, taskValueConverter);

doAnswer(invocation -> {
// Sleep for a long time to simulate a hanging offset store
Thread.sleep(9999 * 1000);
throw new RuntimeException("Should never get here");
}).when(offsetBackingStore).get(any());

// Connector task thread hanging
executor.submit(() -> {
// Does call offsetBackingStore.get() and hangs
offsetStorageReaderImpl.offsets(Collections.emptyList());
});
Thread.sleep(3000);

verify(offsetBackingStore, times(1)).get(any());

// The herder thread should not block when trying to close `offsetStorageReaderImpl`
// and complete before test timeout
offsetStorageReaderImpl.close();
}

@Test(timeout = 60 * 1000)
public void testClosingOffsetReaderWhenOffsetStoreHangsAndHasIncompleteFutures() throws Exception {
// Test similar to `testClosingOffsetReaderWhenOffsetStoreHangs` above, but in this case
// `OffsetStorageReaderImpl.offsetReadFutures` contains a future when `offsetStorageReaderImpl.close()` is called.

ExecutorService executor = Executors.newFixedThreadPool(2);
CompletableFuture<?> hangingFuture = mock(CompletableFuture.class);

OffsetStorageReaderImpl offsetStorageReaderImpl = new OffsetStorageReaderImpl(
offsetBackingStore, "namespace", taskKeyConverter, taskValueConverter);

// Mock hanging future
doAnswer(invocation -> {
Thread.sleep(9999 * 1000);
throw new RuntimeException("Should never get here");
}).when(hangingFuture).get();

// Mock `offsetBackingStore.get()`
doAnswer(new Answer<Object>() {
int callCount = 0;

@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
if (callCount == 0) {
callCount += 1;
// First connector task
return hangingFuture;
} else {
// Second connector task
Thread.sleep(9999 * 1000);
throw new RuntimeException("Should never get here");
}
}
}
).when(offsetBackingStore).get(any());


// Connector task thread calls `offsets()` --> hangs on `hangingFuture.get()`
// --> the future is added to `offsetStorageReaderImpl.offsetReadFutures` and never removed
executor.submit(() -> {
offsetStorageReaderImpl.offsets(Collections.emptyList());
});
Thread.sleep(3000);

verify(offsetBackingStore, times(1)).get(any());
verify(hangingFuture, times(1)).get();

// Another connector task thread calls `offsets()` --> hangs on offsetBackingStore.get()
// --> the future is never added to `offsetStorageReaderImpl.offsetReadFutures`
executor.submit(() -> {
offsetStorageReaderImpl.offsets(Collections.emptyList());
});
Thread.sleep(3000);

verify(offsetBackingStore, times(2)).get(any());

// The herder thread should not block when trying to close `offsetStorageReaderImpl` and should complete
// before the test timeout
offsetStorageReaderImpl.close();

// The hanging future should be cancelled by `close()`
verify(hangingFuture, times(1)).cancel(true);
}
}

0 comments on commit 896e467

Please sign in to comment.