diff --git a/src/kafka-net/Consumer.cs b/src/kafka-net/Consumer.cs index 99b9afcb..57eb8ebe 100644 --- a/src/kafka-net/Consumer.cs +++ b/src/kafka-net/Consumer.cs @@ -267,6 +267,7 @@ public void Dispose() _options.Log.DebugFormat("Consumer: Disposing..."); _disposeToken.Cancel(); + _fetchResponseQueue.CompleteAdding(); //wait for all threads to unwind foreach (var task in _partitionPollingIndex.Values.Where(task => task != null)) diff --git a/src/kafka-tests/Unit/ConsumerTests.cs b/src/kafka-tests/Unit/ConsumerTests.cs index 9ee33fff..7582f683 100644 --- a/src/kafka-tests/Unit/ConsumerTests.cs +++ b/src/kafka-tests/Unit/ConsumerTests.cs @@ -45,6 +45,28 @@ public void CancellationShouldInterruptConsumption() } } + [Test] + public void DisposeShouldInterruptConsumption() + { + var routerProxy = new BrokerRouterProxy(new MoqMockingKernel()); + routerProxy.BrokerConn0.FetchResponseFunction = () => { return new FetchResponse(); }; + + var router = routerProxy.Create(); + + var options = CreateOptions(router); + + Task consumeTask; + using (var consumer = new Consumer(options)) + { + consumeTask = Task.Run(() => consumer.Consume().FirstOrDefault()); + + //wait until the fake broker is running and requesting fetches + TaskTest.WaitFor(() => routerProxy.BrokerConn0.FetchRequestCallCount > 10); + } + + Assert.True(consumeTask.Wait(TimeSpan.FromSeconds(1))); + } + [Test] public void ConsumerWhitelistShouldOnlyConsumeSpecifiedPartition() {