Skip to content

Commit

Permalink
FEATURE: Setup marks failed projections to be booted again
Browse files Browse the repository at this point in the history
  • Loading branch information
mhsdesign committed Nov 24, 2024
1 parent 73e1097 commit 6726d73
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
use Neos\ContentRepository\Core\Subscription\Engine\Error;
use Neos\ContentRepository\Core\Subscription\Engine\Errors;
use Neos\ContentRepository\Core\Subscription\Engine\ProcessedResult;
use Neos\ContentRepository\Core\Subscription\Engine\Result;
use Neos\ContentRepository\Core\Subscription\SubscriptionAndProjectionStatus;
use Neos\ContentRepository\Core\Subscription\SubscriptionError;
use Neos\ContentRepository\Core\Subscription\SubscriptionId;
Expand Down Expand Up @@ -110,10 +109,7 @@ public function fixFailedProjection()
$this->subscriptionStatus('Vendor.Package:FakeProjection')
);

// setup does not change anything
$result = $this->subscriptionService->subscriptionEngine->setup();
self::assertEquals(Result::success(), $result);
// nor catchup active
// catchup active does not change anything
$result = $this->subscriptionService->subscriptionEngine->catchUpActive();
self::assertEquals(ProcessedResult::success(0), $result);
// boot neither
Expand Down Expand Up @@ -143,10 +139,12 @@ public function fixFailedProjection()
public function projectionIsRolledBackAfterError()
{
$this->subscriptionService->setupEventStore();
$this->fakeProjection->expects(self::once())->method('setUp');
$this->fakeProjection->expects(self::exactly(2))->method('setUp');
$this->fakeProjection->expects(self::once())->method('apply');
$this->subscriptionService->subscriptionEngine->setup();
$this->subscriptionService->subscriptionEngine->boot();
$result = $this->subscriptionService->subscriptionEngine->setup();
self::assertNull($result->errors);
$result = $this->subscriptionService->subscriptionEngine->boot();
self::assertNull($result->errors);

// commit an event
$this->commitExampleContentStreamEvent();
Expand Down Expand Up @@ -180,16 +178,33 @@ public function projectionIsRolledBackAfterError()
$this->secondFakeProjection->getState()->findAppliedSequenceNumbers()
);

//
// fix projection and catchup
//

$this->secondFakeProjection->killSaboteur();

// todo find way to retry projection? catchup force?
$result = $this->subscriptionService->subscriptionEngine->setup();
self::assertNull($result->errors);

// subscriptionError is reset
$this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::none());

// catchup after fix
$result = $this->subscriptionService->subscriptionEngine->boot();
self::assertNull($result->errors);

self::assertEquals(
[SequenceNumber::fromInteger(1)],
$this->secondFakeProjection->getState()->findAppliedSequenceNumbers()
);
}

/** @test */
public function projectionIsRolledBackAfterErrorButKeepsSuccessFullEvents()
{
$this->subscriptionService->setupEventStore();
$this->fakeProjection->expects(self::once())->method('setUp');
$this->fakeProjection->expects(self::exactly(2))->method('setUp');
$this->fakeProjection->expects(self::exactly(2))->method('apply');
$this->subscriptionService->subscriptionEngine->setup();
$this->subscriptionService->subscriptionEngine->boot();
Expand Down Expand Up @@ -233,6 +248,31 @@ public function projectionIsRolledBackAfterErrorButKeepsSuccessFullEvents()
[SequenceNumber::fromInteger(1)],
$this->secondFakeProjection->getState()->findAppliedSequenceNumbers()
);

//
// fix projection and catchup
//

$this->secondFakeProjection->killSaboteur();

$result = $this->subscriptionService->subscriptionEngine->setup();
self::assertNull($result->errors);

// subscriptionError is reset, but the position is preserved
$this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::BOOTING, SequenceNumber::fromInteger(1));
self::assertEquals(
[SequenceNumber::fromInteger(1)],
$this->secondFakeProjection->getState()->findAppliedSequenceNumbers()
);

// catchup after fix
$result = $this->subscriptionService->subscriptionEngine->boot();
self::assertNull($result->errors);

self::assertEquals(
[SequenceNumber::fromInteger(1), SequenceNumber::fromInteger(2)],
$this->secondFakeProjection->getState()->findAppliedSequenceNumbers()
);
}

/** @test */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null): Result
SubscriptionStatus::BOOTING,
SubscriptionStatus::ACTIVE,
SubscriptionStatus::DETACHED,
SubscriptionStatus::ERROR,
])));
if ($subscriptions->isEmpty()) {
$this->logger?->info('Subscription Engine: No subscriptions found.'); // todo not happy? Because there must be at least the content graph?!!
Expand Down Expand Up @@ -196,6 +197,15 @@ private function setupSubscription(Subscription $subscription): ?Error
$this->logger?->debug(sprintf('Subscription Engine: Active subscriber "%s" for "%s" has been re-setup.', $subscriber::class, $subscription->id->value));
return null;
}
if ($subscription->status === SubscriptionStatus::ERROR) {
$this->logger?->debug(sprintf('Subscription Engine: Failed subscriber "%s" for "%s" has been re-setup, set to %s. Previous error: %s.', $subscriber::class, $subscription->id->value, SubscriptionStatus::BOOTING->name, $subscription->error?->errorMessage));
$subscription->set(
status: SubscriptionStatus::BOOTING
);
$subscription->unsetError();
$this->subscriptionManager->update($subscription);
return null;
}
$this->logger?->debug(sprintf('Subscription Engine: Subscriber "%s" for "%s" has been setup, set to %s from previous %s.', $subscriber::class, $subscription->id->value, SubscriptionStatus::BOOTING->name, $subscription->status->name));
$subscription->set(
status: SubscriptionStatus::BOOTING
Expand All @@ -216,7 +226,11 @@ private function resetSubscription(Subscription $subscription): ?Error
$this->logger?->error(sprintf('Subscription Engine: Subscriber "%s" for "%s" has an error in the resetState method: %s', $subscriber::class, $subscription->id->value, $e->getMessage()));
return Error::fromSubscriptionIdAndException($subscription->id, $e);
}
$subscription->reset();
$subscription->set(
status: SubscriptionStatus::BOOTING,
position: SequenceNumber::none()
);
$subscription->unsetError();
$this->subscriptionManager->update($subscription);
$this->logger?->debug(sprintf('Subscription Engine: For Subscriber "%s" for "%s" the resetState method has been executed.', $subscriber::class, $subscription->id->value));
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,8 @@ public function set(
$this->position = $position ?? $this->position;
}

public function reset(): void
public function unsetError(): void
{
$this->status = SubscriptionStatus::BOOTING;
$this->position = SequenceNumber::none();
$this->error = null;
}

Expand Down

0 comments on commit 6726d73

Please sign in to comment.