Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #867 from zalando/ARUHA-1634
Browse files Browse the repository at this point in the history
ARUHA-1634 Add detailed log message in case if hake uncommitted timeout happen
  • Loading branch information
antban authored May 3, 2018
2 parents 7b8a227 + cb99200 commit 1446829
Showing 1 changed file with 40 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class StreamingState extends State {
private Map<EventTypePartition, NakadiCursor> uncommittedOffsets;
private Closeable cursorResetSubscription;
private IdleStreamWatcher idleStreamWatcher;
private boolean commitTimeoutReached = false;

/**
* Time that is used for commit timeout check. Commit timeout check is working only in case when there is something
Expand Down Expand Up @@ -129,6 +130,7 @@ private void checkCommitTimeout() {
final long millisFromLastCommit = currentMillis - lastCommitMillis;
if (millisFromLastCommit >= getParameters().commitTimeoutMillis) {
final String debugMessage = "Commit timeout reached";
this.commitTimeoutReached = true;
sendMetadata(debugMessage);
shutdownGracefully(debugMessage);
} else {
Expand Down Expand Up @@ -339,14 +341,51 @@ private void flushData(final EventTypePartition pk, final List<ConsumedEvent> da
}
}

public void logExtendedCommitInformation() {
// We need to log situation when commit timeout was reached, and check that current committed offset is the
// same as it is in zk.
if (!commitTimeoutReached) {
return;
}
final List<EventTypePartition> toCheck = offsets.entrySet().stream()
.filter(e -> !e.getValue().isCommitted())
.map(Map.Entry::getKey)
.collect(Collectors.toList());
try {
final Map<EventTypePartition, SubscriptionCursorWithoutToken> realCommitted = getZk().getOffsets(toCheck);
final List<EventTypePartition> bustedPartitions = offsets.entrySet().stream()
.filter(v -> realCommitted.containsKey(v.getKey()))
.filter(v -> {
final SubscriptionCursorWithoutToken remembered =
getContext().getCursorConverter().convertToNoToken(v.getValue().getCommitOffset());
final SubscriptionCursorWithoutToken real = realCommitted.get(v.getKey());
return real.getOffset().compareTo(remembered.getOffset())> 0;
})
.map(Map.Entry::getKey)
.collect(Collectors.toList());
if (!bustedPartitions.isEmpty()) {
final String bustedData = bustedPartitions.stream().map(etp -> {
final PartitionData pd = offsets.get(etp);
return "(ETP: " + etp +
", StreamCommitted: " + pd.getCommitOffset() +
", StreamSent: " + pd.getSentOffset() +
", ZkCommitted: " + realCommitted.get(etp) + ")";
}).collect(Collectors.joining(", "));
getLog().warn("Stale offsets during streaming commit timeout: {}", bustedData);
}
} catch (NakadiRuntimeException ex) {
getLog().warn("Failed to get nakadi cursors for logging purposes.");
}
}

@Override
public void onExit() {
uncommittedOffsets = offsets.entrySet().stream()
.filter(e -> !e.getValue().isCommitted())
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getSentOffset()));

getContext().getSubscription().getEventTypes().stream().forEach(et -> publishKpi(et));

logExtendedCommitInformation();
if (null != topologyChangeSubscription) {
try {
topologyChangeSubscription.close();
Expand Down

0 comments on commit 1446829

Please sign in to comment.