
Commit

fixed tests
Sourav Maji committed Dec 12, 2024
1 parent 82e037a commit a4382cb
Showing 1 changed file with 55 additions and 59 deletions.
@@ -1405,70 +1405,66 @@ private void processIngestionException() {
       /**
        * Special handling for current version when encountering {@link MemoryLimitExhaustedException}.
        */
-      if (isCurrentVersion.getAsBoolean()) {
-        if (ExceptionUtils.recursiveClassEquals(partitionException, MemoryLimitExhaustedException.class)) {
-          LOGGER.warn(
-              "Encountered MemoryLimitExhaustedException, and ingestion task will try to reopen the database and"
-                  + " resume the consumption after killing ingestion tasks for non current versions");
-          /**
-           * Pause topic consumption to avoid more damage.
-           * We can't unsubscribe it since in some scenario, all the partitions can be unsubscribed, and the ingestion task
-           * will end. Even later on, there are avaiable memory space, we can't resume the ingestion task.
-           */
-          pauseConsumption(pubSubTopicPartition.getPubSubTopic().getName(), pubSubTopicPartition.getPartitionNumber());
-          LOGGER.info(
-              "Memory limit reached. Pausing consumption of topic-partition: {}",
-              Utils.getReplicaId(
-                  pubSubTopicPartition.getPubSubTopic().getName(),
-                  pubSubTopicPartition.getPartitionNumber()));
-          runnableForKillIngestionTasksForNonCurrentVersions.run();
-          if (storageEngine.hasMemorySpaceLeft()) {
-            unSubscribePartition(pubSubTopicPartition, false);
-            /**
-             * DaVinci ingestion hits memory limit and we would like to retry it in the following way:
-             * 1. Kill the ingestion tasks for non-current versions.
-             * 2. Reopen the database since the current database in a bad state, where it can't write or sync even
-             *    there are rooms (bug in SSTFileManager implementation in RocksDB). Reopen will drop the not-yet-synced
-             *    memtable unfortunately.
-             * 3. Resubscribe the affected partition.
-             */
-            LOGGER.info(
-                "Ingestion for topic-partition: {} can resume since more space has been reclaimed.",
-                Utils.getReplicaId(kafkaVersionTopic, exceptionPartition));
-            storageEngine.reopenStoragePartition(exceptionPartition);
-            // DaVinci is always a follower.
-            subscribePartition(pubSubTopicPartition, false);
-          }
-        } else {
-          if (resetErrorReplicaEnabled && !isDaVinciClient) {
-            zkHelixAdmin.get()
-                .setPartitionsToError(
-                    serverConfig.getClusterName(),
-                    hostName,
-                    kafkaVersionTopic,
-                    Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, exceptionPartition)));
-            LOGGER.error(
-                "Marking current version replica status to ERROR for replica: {}",
-                Utils.getReplicaId(kafkaVersionTopic, exceptionPartition),
-                partitionException);
-          }
-        }
-      } else {
-        if (!partitionConsumptionState.isCompletionReported()) {
-          reportError(partitionException.getMessage(), exceptionPartition, partitionException);
-        } else {
-          LOGGER.error(
-              "Ignoring exception for replica: {} since it is already online. The replica will continue serving reads, but the data may be stale as it is not actively ingesting data. Please engage the Venice DEV team immediately.",
-              Utils.getReplicaId(kafkaVersionTopic, exceptionPartition),
-              partitionException);
-        }
-        // Unsubscribe the partition to avoid more damages.
-        if (partitionConsumptionStateMap.containsKey(exceptionPartition)) {
-          // This is not an unsubscribe action from Helix
-          unSubscribePartition(new PubSubTopicPartitionImpl(versionTopic, exceptionPartition), false);
-        }
-      }
+      if (isCurrentVersion.getAsBoolean()
+          && ExceptionUtils.recursiveClassEquals(partitionException, MemoryLimitExhaustedException.class)) {
+        LOGGER.warn(
+            "Encountered MemoryLimitExhaustedException, and ingestion task will try to reopen the database and"
+                + " resume the consumption after killing ingestion tasks for non current versions");
+        /**
+         * Pause topic consumption to avoid more damage.
+         * We can't unsubscribe it since in some scenario, all the partitions can be unsubscribed, and the ingestion task
+         * will end. Even later on, there are avaiable memory space, we can't resume the ingestion task.
+         * DaVinci ingestion hits memory limit and we would like to retry it in the following way:
+         * 1. Kill the ingestion tasks for non-current versions.
+         * 2. Reopen the database since the current database in a bad state, where it can't write or sync even
+         *    there are rooms (bug in SSTFileManager implementation in RocksDB). Reopen will drop the not-yet-synced
+         *    memtable unfortunately.
+         * 3. Resubscribe the affected partition.
+         */
+        pauseConsumption(
+            pubSubTopicPartition.getPubSubTopic().getName(),
+            pubSubTopicPartition.getPartitionNumber());
+        LOGGER.info(
+            "Memory limit reached. Pausing consumption of topic-partition: {}",
+            Utils.getReplicaId(
+                pubSubTopicPartition.getPubSubTopic().getName(),
+                pubSubTopicPartition.getPartitionNumber()));
+        runnableForKillIngestionTasksForNonCurrentVersions.run();
+        if (storageEngine.hasMemorySpaceLeft()) {
+          unSubscribePartition(pubSubTopicPartition, false);
+          LOGGER.info(
+              "Ingestion for topic-partition: {} can resume since more space has been reclaimed.",
+              Utils.getReplicaId(kafkaVersionTopic, exceptionPartition));
+          storageEngine.reopenStoragePartition(exceptionPartition);
+          // DaVinci is always a follower.
+          subscribePartition(pubSubTopicPartition, false);
+        }
+      } else if (isCurrentVersion.getAsBoolean() && resetErrorReplicaEnabled && !isDaVinciClient) {
+        // marking its replica status ERROR which will later be reset by the controller
+        zkHelixAdmin.get()
+            .setPartitionsToError(
+                serverConfig.getClusterName(),
+                hostName,
+                kafkaVersionTopic,
+                Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, exceptionPartition)));
+        LOGGER.error(
+            "Marking current version replica status to ERROR for replica: {}",
+            Utils.getReplicaId(kafkaVersionTopic, exceptionPartition),
+            partitionException);
+        // No need to reset again, clearing out the exception.
+        partitionIngestionExceptionList.set(exceptionPartition, null);
+      } else if (!partitionConsumptionState.isCompletionReported()) {
+        reportError(partitionException.getMessage(), exceptionPartition, partitionException);
+      } else {
+        LOGGER.error(
+            "Ignoring exception for replica: {} since it is already online. The replica will continue serving reads, but the data may be stale as it is not actively ingesting data. Please engage the Venice DEV team immediately.",
+            Utils.getReplicaId(kafkaVersionTopic, exceptionPartition),
+            partitionException);
+      }
+      // Unsubscribe the partition to avoid more damages.
+      if (partitionConsumptionStateMap.containsKey(exceptionPartition)) {
+        // This is not an unsubscribe action from Helix
+        unSubscribePartition(new PubSubTopicPartitionImpl(versionTopic, exceptionPartition), false);
+      }
       }
});
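The net effect of the hunk above is to flatten the old nested if/else into a single if / else-if chain, clear the stored exception after a current-version server replica is marked ERROR for the controller to reset, and leave the defensive unsubscribe after the whole chain so it applies regardless of which branch ran. The following is a minimal sketch of that decision order; every type and name in it is a simplified stand-in for illustration, not the actual Venice API.

// Hypothetical sketch of the reworked decision chain in processIngestionException().
// All types, method names, and the String return value are illustrative stand-ins.
public class IngestionExceptionFlowSketch {
  interface Replica {
    boolean isCurrentVersion();

    boolean isMemoryLimitExhausted();

    boolean hasMemorySpaceLeft();

    boolean isCompletionReported();
  }

  // Mirrors the order of the new if / else-if chain in the diff above.
  static String handle(Replica replica, boolean resetErrorReplicaEnabled, boolean isDaVinciClient) {
    String action;
    if (replica.isCurrentVersion() && replica.isMemoryLimitExhausted()) {
      // Current version hit the memory limit: pause consumption, kill ingestion for non-current
      // versions, and if space was reclaimed, reopen the store partition and resubscribe.
      action = replica.hasMemorySpaceLeft()
          ? "pause; kill non-current versions; reopen partition; resubscribe"
          : "pause; kill non-current versions";
    } else if (replica.isCurrentVersion() && resetErrorReplicaEnabled && !isDaVinciClient) {
      // Other exceptions on a current-version server replica: mark it ERROR in Helix so the
      // controller can reset it later, and clear the stored exception so it is not re-processed.
      action = "mark replica ERROR in Helix; clear stored exception";
    } else if (!replica.isCompletionReported()) {
      // The replica never reported completion: surface the ingestion error.
      action = "report error";
    } else {
      // The replica is already online: log loudly but keep serving (possibly stale) reads.
      action = "log and keep serving reads";
    }
    // In the new layout the defensive unsubscribe runs after the chain, whichever branch was taken.
    return action + "; unsubscribe partition if still subscribed";
  }

  public static void main(String[] args) {
    Replica replica = new Replica() {
      public boolean isCurrentVersion() {
        return true;
      }

      public boolean isMemoryLimitExhausted() {
        return true;
      }

      public boolean hasMemorySpaceLeft() {
        return true;
      }

      public boolean isCompletionReported() {
        return false;
      }
    };
    System.out.println(handle(replica, true, false));
  }
}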
