Skip to content

Commit

Permalink
always re-create client upon an error
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrinot committed Apr 5, 2024
1 parent e3792ce commit 0f146cd
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
14 changes: 8 additions & 6 deletions connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,13 @@ public void put(Collection<SinkRecord> collection) {

private void onSenderException(Exception e) {
if (httpTransport) {
closeSenderSilently();
throw new ConnectException("Failed to send data to QuestDB", e);
}

batchesSinceLastError = 0;
if (--remainingRetries > 0) {
closeSenderSilently();
sender = null;
log.debug("Sender exception, retrying in {} ms", config.getRetryBackoffMs());
context.timeout(config.getRetryBackoffMs());
throw new RetriableException(e);
Expand All @@ -180,12 +180,14 @@ private void onSenderException(Exception e) {
}

private void closeSenderSilently() {
try {
if (sender != null) {
if (sender != null) {
try {
sender.close();
} catch (Exception ex) {
log.warn("Failed to close sender", ex);
} finally {
sender = null;
}
} catch (Exception ex) {
log.warn("Failed to close sender", ex);
}
}

Expand Down Expand Up @@ -445,7 +447,7 @@ private boolean tryWriteLogicalType(String name, Schema schema, Object value) {
public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
if (httpTransport) {
try {
log.info("Flushing data to QuestDB");
log.debug("Flushing data to QuestDB");
sender.flush();
} catch (LineSenderException | HttpClientException e) {
onSenderException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,9 @@ public void testRetrying_recoversFromInfrastructureIssues(boolean useHttp) throw
QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n"
+ "\"John\",\"Doe\",49\r\n",
"select firstname,lastname,age from " + topicName + " where age = 49",
httpPort);
20,
httpPort
);
}

@ParameterizedTest
Expand Down

0 comments on commit 0f146cd

Please sign in to comment.