From 0f146cdc1a27c4aec36c2efeae4ce01fb0a18c1b Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Fri, 5 Apr 2024 14:24:45 +0200 Subject: [PATCH] always re-create client upon an error --- .../java/io/questdb/kafka/QuestDBSinkTask.java | 14 ++++++++------ .../kafka/QuestDBSinkConnectorEmbeddedTest.java | 4 +++- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index bdff4be..8542bb5 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ -164,13 +164,13 @@ public void put(Collection collection) { private void onSenderException(Exception e) { if (httpTransport) { + closeSenderSilently(); throw new ConnectException("Failed to send data to QuestDB", e); } batchesSinceLastError = 0; if (--remainingRetries > 0) { closeSenderSilently(); - sender = null; log.debug("Sender exception, retrying in {} ms", config.getRetryBackoffMs()); context.timeout(config.getRetryBackoffMs()); throw new RetriableException(e); @@ -180,12 +180,14 @@ private void onSenderException(Exception e) { } private void closeSenderSilently() { - try { - if (sender != null) { + if (sender != null) { + try { sender.close(); + } catch (Exception ex) { + log.warn("Failed to close sender", ex); + } finally { + sender = null; } - } catch (Exception ex) { - log.warn("Failed to close sender", ex); } } @@ -445,7 +447,7 @@ private boolean tryWriteLogicalType(String name, Schema schema, Object value) { public void flush(Map map) { if (httpTransport) { try { - log.info("Flushing data to QuestDB"); + log.debug("Flushing data to QuestDB"); sender.flush(); } catch (LineSenderException | HttpClientException e) { onSenderException(e); diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index bbcaddc..3593b84 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -307,7 +307,9 @@ public void testRetrying_recoversFromInfrastructureIssues(boolean useHttp) throw QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n" + "\"John\",\"Doe\",49\r\n", "select firstname,lastname,age from " + topicName + " where age = 49", - httpPort); + 20, + httpPort + ); } @ParameterizedTest