Skip to content

Commit

Permalink
Fix ProjectionReplayService and introduce CatchUpOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
bwaidelich committed Aug 11, 2023
1 parent 38385ac commit 19af2f6
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 26 deletions.
3 changes: 2 additions & 1 deletion Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
use Neos\ContentRepository\Core\Factory\ContentRepositoryFactory;
use Neos\ContentRepository\Core\Factory\ContentRepositoryId;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\Projection\CatchUpOptions;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphInterface;
use Neos\ContentRepository\Core\Projection\ContentStream\ContentStreamFinder;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
Expand Down Expand Up @@ -155,7 +156,7 @@ public function projectionState(string $projectionStateClassName): ProjectionSta
/**
* @param class-string<ProjectionInterface<ProjectionStateInterface>> $projectionClassName
*/
public function catchUpProjection(string $projectionClassName): void
public function catchUpProjection(string $projectionClassName, CatchUpOptions $options): void
{
$projection = $this->projectionsAndCatchUpHooks->projections->get($projectionClassName);

Expand Down
55 changes: 55 additions & 0 deletions Neos.ContentRepository.Core/Classes/Projection/CatchUpOptions.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<?php

declare(strict_types=1);

namespace Neos\ContentRepository\Core\Projection;

use Neos\ContentRepository\Core\ContentRepository;
use Neos\EventStore\Model\Event\SequenceNumber;

/**
* Options for {@see ContentRepository::catchUpProjection()}
*
* @api *NOTE:** The signature of the {@see create()} and {@see with()} methods might be extended in the future, so they should only ever be used with named arguments
*/
final class CatchUpOptions
{
/**
* @param SequenceNumber|null $maximumSequenceNumber If specified the catch-up will stop at the specified {@see SequenceNumber}
*/
private function __construct(
public readonly ?SequenceNumber $maximumSequenceNumber,
) {
}

/**
* Creates an instance for the specified options
*
* Note: The signature of this method might be extended in the future, so it should always be used with named arguments
* @see https://www.php.net/manual/en/functions.arguments.php#functions.named-arguments
*/
public static function create(
SequenceNumber|int|null $maximumSequenceNumber = null,
): self
{
if (is_int($maximumSequenceNumber)) {
$maximumSequenceNumber = SequenceNumber::fromInteger($maximumSequenceNumber);
}
return new self($maximumSequenceNumber);
}


/**
* Returns a new instance with the specified additional options
*
* Note: The signature of this method might be extended in the future, so it should always be used with named arguments
* @see https://www.php.net/manual/en/functions.arguments.php#functions.named-arguments
*/
public function with(
SequenceNumber|int|null $maximumSequenceNumber = null,
): self {
return self::create(
$maximumSequenceNumber ?? $this->maximumSequenceNumber,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
use Neos\ContentRepository\Core\Feature\NodeModification\Dto\PropertyValuesToWrite;
use Neos\ContentRepository\Core\Infrastructure\DbalClientInterface;
use Neos\ContentRepository\Core\NodeType\NodeTypeName;
use Neos\ContentRepository\Core\Projection\CatchUpOptions;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\Filter\FindSubtreeFilter;
use Neos\ContentRepository\Core\Projection\ContentGraph\Node;
Expand Down Expand Up @@ -554,7 +555,7 @@ public function iPruneRemovedContentStreamsFromTheEventStream()
public function iReplayTheProjection(string $projectionName)
{
$this->contentRepository->resetProjectionState($projectionName);
$this->contentRepository->catchUpProjection($projectionName);
$this->contentRepository->catchUpProjection($projectionName, CatchUpOptions::create());
}

abstract protected function getContentRepositoryService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Exception as DbalException;
use Doctrine\DBAL\Exception\ConnectionException;
use Neos\ContentRepository\Core\Projection\CatchUpOptions;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId;
use Neos\ContentRepository\LegacyNodeMigration\LegacyMigrationService;
use Neos\ContentRepository\LegacyNodeMigration\LegacyMigrationServiceFactory;
Expand Down Expand Up @@ -130,7 +131,7 @@ public function migrateLegacyDataCommand(bool $verbose = false, string $config =
// we also need to reset the projections; in order to ensure the system runs deterministically. We
// do this by replaying the just-truncated event stream.
$projectionService = $this->contentRepositoryRegistry->buildService($contentRepositoryId, $this->projectionReplayServiceFactory);
$projectionService->replayAllProjections();
$projectionService->replayAllProjections(CatchUpOptions::create());
$this->outputLine('Truncated events');

$liveContentStreamId = ContentStreamId::create();
Expand All @@ -156,7 +157,7 @@ public function migrateLegacyDataCommand(bool $verbose = false, string $config =
$this->outputLine();

$this->outputLine('Replaying projections');
$projectionService->replayAllProjections();
$projectionService->replayAllProjections(CatchUpOptions::create());
$this->outputLine('<success>Done</success>');
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace Neos\ContentRepositoryRegistry\Command;

use Neos\ContentRepository\Core\Factory\ContentRepositoryId;
use Neos\ContentRepository\Core\Projection\CatchUpOptions;
use Neos\ContentRepositoryRegistry\ContentRepositoryRegistry;
use Neos\ContentRepositoryRegistry\Service\ProjectionReplayServiceFactory;
use Neos\EventStore\Model\Event\SequenceNumber;
Expand Down Expand Up @@ -54,7 +55,11 @@ public function replayCommand(string $projection, string $contentRepository = 'd
$this->outputLine('Replaying events for projection "%s" of Content Repository "%s" ...', [$projection, $contentRepositoryId->value]);
// TODO start progress bar
}
$projectionService->replayProjection($projection, $until !== 0 ? SequenceNumber::fromInteger($until) : null);
$options = CatchUpOptions::create();
if ($until !== null) {
$options = $options->with(maximumSequenceNumber: SequenceNumber::fromInteger($until));
}
$projectionService->replayProjection($projection, $options);
if (!$quiet) {
// TODO finish progress bar
$this->outputLine('<success>Done.</success>');
Expand All @@ -75,7 +80,7 @@ public function replayAllCommand(string $contentRepository = 'default', bool $qu
$this->outputLine('Replaying events for all projections of Content Repository "%s" ...', [$contentRepositoryId->value]);
// TODO start progress bar
}
$projectionService->replayAllProjections();
$projectionService->replayAllProjections(CatchUpOptions::create());
if (!$quiet) {
// TODO finish progress bar
$this->outputLine('<success>Done.</success>');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

namespace Neos\ContentRepositoryRegistry\Command;

use Neos\ContentRepository\Core\Projection\CatchUpOptions;
use Neos\ContentRepositoryRegistry\ContentRepositoryRegistry;
use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTrigger;
use Neos\ContentRepository\Core\Factory\ContentRepositoryId;
Expand All @@ -25,6 +26,6 @@ public function catchupCommand(string $contentRepositoryIdentifier, string $proj
{

$contentRepository = $this->contentRepositoryRegistry->get(ContentRepositoryId::fromString($contentRepositoryIdentifier));
$contentRepository->catchUpProjection($projectionClassName);
$contentRepository->catchUpProjection($projectionClassName, CatchUpOptions::create());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

namespace Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger;

use Neos\ContentRepository\Core\Projection\CatchUpOptions;
use Neos\ContentRepositoryRegistry\ContentRepositoryRegistry;
use Neos\Flow\Annotations as Flow;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface;
Expand Down Expand Up @@ -66,7 +67,7 @@ public function triggerCatchUp(Projections $projections): void
$contentRepository = $this->contentRepositoryRegistry->get($this->contentRepositoryId);
foreach ($projections as $projection) {
$projectionClassName = get_class($projection);
$contentRepository->catchUpProjection($projectionClassName);
$contentRepository->catchUpProjection($projectionClassName, CatchUpOptions::create());
}
} else {
$this->inner->triggerCatchUp($projections);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@

use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\Factory\ContentRepositoryServiceInterface;
use Neos\ContentRepository\Core\Projection\CatchUpOptions;
use Neos\ContentRepository\Core\Projection\Projections;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\Model\EventStream\VirtualStreamName;

/**
* Content Repository service to perform Projection replays
Expand All @@ -21,31 +19,21 @@ final class ProjectionReplayService implements ContentRepositoryServiceInterface
public function __construct(
private readonly Projections $projections,
private readonly ContentRepository $contentRepository,
private readonly EventStoreInterface $eventStore,
) {
}

public function replayProjection(string $projectionAliasOrClassName, ?SequenceNumber $maximumSequenceNumber = null): void
public function replayProjection(string $projectionAliasOrClassName, CatchUpOptions $options): void
{
$projectionClassName = $this->resolveProjectionClassName($projectionAliasOrClassName);
$this->contentRepository->resetProjectionState($projectionClassName);

// the logic below is from ContentRepository::catchUpProjection,
// but adjusted with the maximumSequenceNumber restriction
$projection = $this->projections->get($projectionClassName);
$streamName = VirtualStreamName::all();
$eventStream = $this->eventStore->load($streamName);
if ($maximumSequenceNumber !== null) {
$eventStream = $eventStream->withMaximumSequenceNumber($maximumSequenceNumber);
}
$projection->catchUp($eventStream, $this->contentRepository);
$this->contentRepository->catchUpProjection($projectionClassName, $options);
}

public function replayAllProjections(): void
public function replayAllProjections(CatchUpOptions $options): void
{
foreach ($this->projectionClassNamesAndAliases() as $classNamesAndAlias) {
$this->contentRepository->resetProjectionState($classNamesAndAlias['className']);
$this->contentRepository->catchUpProjection($classNamesAndAlias['className']);
$this->contentRepository->catchUpProjection($classNamesAndAlias['className'], $options);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public function build(ContentRepositoryServiceFactoryDependencies $serviceFactor
return new ProjectionReplayService(
$serviceFactoryDependencies->projections,
$serviceFactoryDependencies->contentRepository,
$serviceFactoryDependencies->eventStore
);
}
}

0 comments on commit 19af2f6

Please sign in to comment.