diff --git a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java
index 25c0a4bab7..66f7090160 100644
--- a/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java
+++ b/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java
@@ -63,6 +63,7 @@ class StreamingState extends State {
     private Map uncommittedOffsets;
     private Closeable cursorResetSubscription;
     private IdleStreamWatcher idleStreamWatcher;
+    private boolean commitTimeoutReached = false;
 
     /**
      * Time that is used for commit timeout check. Commit timeout check is working only in case when there is something
@@ -129,6 +130,7 @@ private void checkCommitTimeout() {
         final long millisFromLastCommit = currentMillis - lastCommitMillis;
         if (millisFromLastCommit >= getParameters().commitTimeoutMillis) {
             final String debugMessage = "Commit timeout reached";
+            this.commitTimeoutReached = true;
             sendMetadata(debugMessage);
             shutdownGracefully(debugMessage);
         } else {
@@ -339,6 +341,46 @@ private void flushData(final EventTypePartition pk, final List da
         }
     }
 
+    /**
+     * Warns about partitions whose zk-committed offset is ahead of the commit offset kept by this stream.
+     * Runs only after the commit timeout was reached and asks zk only about partitions that are not committed.
+     * A NakadiRuntimeException raised while reading zk offsets is logged with its cause and not rethrown.
+     */
+    public void logExtendedCommitInformation() {
+        if (!commitTimeoutReached) {
+            return;
+        }
+        final List<EventTypePartition> toCheck = offsets.entrySet().stream()
+                .filter(e -> !e.getValue().isCommitted())
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toList());
+        try {
+            final Map<EventTypePartition, SubscriptionCursorWithoutToken> realCommitted = getZk().getOffsets(toCheck);
+            final List<EventTypePartition> bustedPartitions = offsets.entrySet().stream()
+                    .filter(v -> realCommitted.containsKey(v.getKey()))
+                    .filter(v -> {
+                        final SubscriptionCursorWithoutToken remembered =
+                                getContext().getCursorConverter().convertToNoToken(v.getValue().getCommitOffset());
+                        final SubscriptionCursorWithoutToken real = realCommitted.get(v.getKey());
+                        return real.getOffset().compareTo(remembered.getOffset()) > 0;
+                    })
+                    .map(Map.Entry::getKey)
+                    .collect(Collectors.toList());
+            if (!bustedPartitions.isEmpty()) {
+                final String bustedData = bustedPartitions.stream().map(etp -> {
+                    final PartitionData pd = offsets.get(etp);
+                    return "(ETP: " + etp +
+                            ", StreamCommitted: " + pd.getCommitOffset() +
+                            ", StreamSent: " + pd.getSentOffset() +
+                            ", ZkCommitted: " + realCommitted.get(etp) + ")";
+                }).collect(Collectors.joining(", "));
+                getLog().warn("Stale offsets during streaming commit timeout: {}", bustedData);
+            }
+        } catch (NakadiRuntimeException ex) {
+            getLog().warn("Failed to get nakadi cursors for logging purposes.", ex);
+        }
+    }
+
     @Override
     public void onExit() {
         uncommittedOffsets = offsets.entrySet().stream()
@@ -346,7 +388,7 @@ public void onExit() {
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getSentOffset()));
 
         getContext().getSubscription().getEventTypes().stream().forEach(et -> publishKpi(et));
-
+        logExtendedCommitInformation();
         if (null != topologyChangeSubscription) {
             try {
                 topologyChangeSubscription.close();