Skip to content

Commit

Permalink
Merge pull request #317 from neos/catchup-speedup
Browse files Browse the repository at this point in the history
Speed up catchUp() and replay()
  • Loading branch information
robertlemke authored Jul 7, 2023
2 parents b447cbb + d972abb commit 53fe558
Showing 1 changed file with 21 additions and 15 deletions.
36 changes: 21 additions & 15 deletions Classes/EventListener/EventListenerInvoker.php
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ public function catchUp(): void
$streamName = $this->eventListener instanceof StreamAwareEventListenerInterface ? $this->eventListener::listensToStream() : StreamName::all();
$eventStream = $this->eventStore->load($streamName, $highestAppliedSequenceNumber + 1);
$appliedEventsCounter = 0;
$sequenceNumber = 0;

foreach ($eventStream as $eventEnvelope) {
$sequenceNumber = $eventEnvelope->getRawEvent()->getSequenceNumber();
Expand All @@ -181,23 +182,29 @@ public function catchUp(): void
break;
}
try {
$this->applyEvent($eventEnvelope);
$eventWasApplied = $this->applyEvent($eventEnvelope);
} catch (EventCouldNotBeAppliedException $exception) {
$appliedEventsStorage->releaseHighestAppliedSequenceNumber();
throw $exception;
}
$appliedEventsCounter ++;
$appliedEventsStorage->saveHighestAppliedSequenceNumber($eventEnvelope->getRawEvent()->getSequenceNumber());
if ($this->transactionBatchSize === 1 || $appliedEventsCounter % $this->transactionBatchSize === 0) {
$appliedEventsStorage->releaseHighestAppliedSequenceNumber();
$highestAppliedSequenceNumber = $appliedEventsStorage->reserveHighestAppliedEventSequenceNumber();
} else {
$highestAppliedSequenceNumber = $sequenceNumber;
if ($eventWasApplied) {
$appliedEventsCounter ++;
$appliedEventsStorage->saveHighestAppliedSequenceNumber($sequenceNumber);
if ($this->transactionBatchSize === 1 || $appliedEventsCounter % $this->transactionBatchSize === 0) {
$appliedEventsStorage->releaseHighestAppliedSequenceNumber();
$highestAppliedSequenceNumber = $appliedEventsStorage->reserveHighestAppliedEventSequenceNumber();
} else {
$highestAppliedSequenceNumber = $sequenceNumber;
}
}
foreach ($this->progressCallbacks as $callback) {
$callback($eventEnvelope);
}
}

if ($sequenceNumber > 0) {
$appliedEventsStorage->saveHighestAppliedSequenceNumber($sequenceNumber);
}
$appliedEventsStorage->releaseHighestAppliedSequenceNumber();

if ($this->eventListener instanceof AfterCatchUpInterface) {
Expand All @@ -207,19 +214,17 @@ public function catchUp(): void

/**
* @param EventEnvelope $eventEnvelope
* @return bool If the event was applied, false if no matching listener method existed
* @throws EventCouldNotBeAppliedException
*/
private function applyEvent(EventEnvelope $eventEnvelope): void
private function applyEvent(EventEnvelope $eventEnvelope): bool
{
$event = $eventEnvelope->getDomainEvent();
$rawEvent = $eventEnvelope->getRawEvent();
try {
$listenerMethodName = 'when' . (new \ReflectionClass($event))->getShortName();
} catch (\ReflectionException $exception) {
throw new \RuntimeException(sprintf('Could not extract listener method name for listener %s and event %s', \get_class($this->eventListener), \get_class($event)), 1541003718, $exception);
}
$eventClassName = get_class($event);
$listenerMethodName = 'when' . substr($eventClassName, strrpos($eventClassName, '\\') + 1);
if (!method_exists($this->eventListener, $listenerMethodName)) {
return;
return false;
}
if ($this->eventListener instanceof BeforeInvokeInterface) {
$this->eventListener->beforeInvoke($eventEnvelope);
Expand All @@ -232,6 +237,7 @@ private function applyEvent(EventEnvelope $eventEnvelope): void
if ($this->eventListener instanceof AfterInvokeInterface) {
$this->eventListener->afterInvoke($eventEnvelope);
}
return true;
}

/**
Expand Down

0 comments on commit 53fe558

Please sign in to comment.