Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Related: #4746
  • Loading branch information
bwaidelich committed Oct 24, 2024
1 parent f7aaa60 commit 2172f09
Show file tree
Hide file tree
Showing 39 changed files with 1,907 additions and 228 deletions.
229 changes: 98 additions & 131 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,16 @@
use Neos\ContentRepository\Core\DimensionSpace\InterDimensionalVariationGraph;
use Neos\ContentRepository\Core\EventStore\DecoratedEvent;
use Neos\ContentRepository\Core\EventStore\EventInterface;
use Neos\ContentRepository\Core\EventStore\EventNormalizer;
use Neos\ContentRepository\Core\EventStore\EventPersister;
use Neos\ContentRepository\Core\EventStore\Events;
use Neos\ContentRepository\Core\EventStore\EventsToPublish;
use Neos\ContentRepository\Core\Factory\ContentRepositoryFactory;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\Projection\CatchUp;
use Neos\ContentRepository\Core\Projection\CatchUpOptions;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphReadModelInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionsAndCatchUpHooks;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStatuses;
use Neos\ContentRepository\Core\Projection\WithMarkStaleInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStates;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryStatus;
use Neos\ContentRepository\Core\SharedModel\Exception\WorkspaceDoesNotExist;
use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStream;
Expand All @@ -46,30 +38,21 @@
use Neos\ContentRepository\Core\SharedModel\Workspace\Workspace;
use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName;
use Neos\ContentRepository\Core\SharedModel\Workspace\Workspaces;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Model\Event\EventMetadata;
use Neos\EventStore\Model\EventEnvelope;
use Neos\EventStore\Model\EventStream\VirtualStreamName;
use Psr\Clock\ClockInterface;

/**
* Main Entry Point to the system. Encapsulates the full event-sourced Content Repository.
*
* Use this to:
* - set up the necessary database tables and contents via {@see ContentRepository::setUp()}
* - send commands to the system (to mutate state) via {@see ContentRepository::handle()}
* - access projection state (to read state) via {@see ContentRepository::projectionState()}
* - catch up projections via {@see ContentRepository::catchUpProjection()}
* - send commands to the system (to mutate state) via {@see self::handle()}
* - access the content graph read model
* - access 3rd party read models via {@see self::projectionState()}
*
* @api
*/
final class ContentRepository
final readonly class ContentRepository
{
/**
* @var array<class-string<ProjectionStateInterface>, ProjectionStateInterface>
*/
private array $projectionStateCache;

private CommandHandlingDependencies $commandHandlingDependencies;

/**
Expand All @@ -78,16 +61,14 @@ final class ContentRepository
public function __construct(
public readonly ContentRepositoryId $id,
private readonly CommandBus $commandBus,
private readonly EventStoreInterface $eventStore,
private readonly ProjectionsAndCatchUpHooks $projectionsAndCatchUpHooks,
private readonly EventNormalizer $eventNormalizer,
private readonly EventPersister $eventPersister,
private readonly NodeTypeManager $nodeTypeManager,
private readonly InterDimensionalVariationGraph $variationGraph,
private readonly ContentDimensionSourceInterface $contentDimensionSource,
private readonly UserIdProviderInterface $userIdProvider,
private readonly ClockInterface $clock,
private readonly ContentGraphReadModelInterface $contentGraphReadModel
private readonly ContentGraphReadModelInterface $contentGraphReadModel,
private readonly ProjectionStates $projectionStates,
) {
$this->commandHandlingDependencies = new CommandHandlingDependencies($this, $this->contentGraphReadModel);
}
Expand Down Expand Up @@ -143,114 +124,100 @@ public function handle(CommandInterface $command): void
*/
public function projectionState(string $projectionStateClassName): ProjectionStateInterface
{
if (!isset($this->projectionStateCache)) {
foreach ($this->projectionsAndCatchUpHooks->projections as $projection) {
if ($projection instanceof ContentGraphProjectionInterface) {
continue;
}
$projectionState = $projection->getState();
$this->projectionStateCache[$projectionState::class] = $projectionState;
}
}
if (isset($this->projectionStateCache[$projectionStateClassName])) {
/** @var T $projectionState */
$projectionState = $this->projectionStateCache[$projectionStateClassName];
return $projectionState;
}
if (in_array(ContentGraphReadModelInterface::class, class_implements($projectionStateClassName), true)) {
throw new \InvalidArgumentException(sprintf('Accessing the internal content repository projection state via %s(%s) is not allowed. Please use the API on the content repository instead.', __FUNCTION__, $projectionStateClassName), 1729338679);
}

throw new \InvalidArgumentException(sprintf('A projection state of type "%s" is not registered in this content repository instance.', $projectionStateClassName), 1662033650);
}

/**
* @param class-string<ProjectionInterface<ProjectionStateInterface>> $projectionClassName
*/
public function catchUpProjection(string $projectionClassName, CatchUpOptions $options): void
{
$projection = $this->projectionsAndCatchUpHooks->projections->get($projectionClassName);

$catchUpHookFactory = $this->projectionsAndCatchUpHooks->getCatchUpHookFactoryForProjection($projection);
$catchUpHook = $catchUpHookFactory?->build($this);

// TODO allow custom stream name per projection
$streamName = VirtualStreamName::all();
$eventStream = $this->eventStore->load($streamName);
if ($options->maximumSequenceNumber !== null) {
$eventStream = $eventStream->withMaximumSequenceNumber($options->maximumSequenceNumber);
}

$eventApplier = function (EventEnvelope $eventEnvelope) use ($projection, $catchUpHook, $options) {
$event = $this->eventNormalizer->denormalize($eventEnvelope->event);
if ($options->progressCallback !== null) {
($options->progressCallback)($event, $eventEnvelope);
}
if (!$projection->canHandle($event)) {
return;
}
$catchUpHook?->onBeforeEvent($event, $eventEnvelope);
$projection->apply($event, $eventEnvelope);
if ($projection instanceof WithMarkStaleInterface) {
$projection->markStale();
}
$catchUpHook?->onAfterEvent($event, $eventEnvelope);
};

$catchUp = CatchUp::create($eventApplier, $projection->getCheckpointStorage());

if ($catchUpHook !== null) {
$catchUpHook->onBeforeCatchUp();
$catchUp = $catchUp->withOnBeforeBatchCompleted(fn() => $catchUpHook->onBeforeBatchCompleted());
try {
return $this->projectionStates->get($projectionStateClassName);
} catch (\InvalidArgumentException $e) {
throw new \InvalidArgumentException(sprintf('A projection state of type "%s" is not registered in this content repository instance: %s', $projectionStateClassName, $e->getMessage()), 1662033650, $e);
}
$catchUp->run($eventStream);
$catchUpHook?->onAfterCatchUp();
}

public function catchupProjections(): void
{
foreach ($this->projectionsAndCatchUpHooks->projections as $projection) {
// FIXME optimise by only loading required events once and not per projection
// see https://github.com/neos/neos-development-collection/pull/4988/
$this->catchUpProjection($projection::class, CatchUpOptions::create());
}
}

public function setUp(): void
{
$this->eventStore->setup();
foreach ($this->projectionsAndCatchUpHooks->projections as $projection) {
$projection->setUp();
}
}

public function status(): ContentRepositoryStatus
{
$projectionStatuses = ProjectionStatuses::createEmpty();
foreach ($this->projectionsAndCatchUpHooks->projections as $projectionClassName => $projection) {
$projectionStatuses = $projectionStatuses->with($projectionClassName, $projection->status());
}
return new ContentRepositoryStatus(
$this->eventStore->status(),
$projectionStatuses,
);
}

public function resetProjectionStates(): void
{
foreach ($this->projectionsAndCatchUpHooks->projections as $projection) {
$projection->reset();
}
}

/**
* @param class-string<ProjectionInterface<ProjectionStateInterface>> $projectionClassName
*/
public function resetProjectionState(string $projectionClassName): void
{
$projection = $this->projectionsAndCatchUpHooks->projections->get($projectionClassName);
$projection->reset();
}
// /**
// * @param class-string<ProjectionInterface<ProjectionStateInterface>> $projectionClassName
// */
// public function catchUpProjection(string $projectionClassName, CatchUpOptions $options): void
// {
// $projection = $this->projectionsAndCatchUpHooks->projections->get($projectionClassName);
//
// $catchUpHookFactory = $this->projectionsAndCatchUpHooks->getCatchUpHookFactoryForProjection($projection);
// $catchUpHook = $catchUpHookFactory?->build($this);
//
// // TODO allow custom stream name per projection
// $streamName = VirtualStreamName::all();
// $eventStream = $this->eventStore->load($streamName);
// if ($options->maximumSequenceNumber !== null) {
// $eventStream = $eventStream->withMaximumSequenceNumber($options->maximumSequenceNumber);
// }
//
// $eventApplier = function (EventEnvelope $eventEnvelope) use ($projection, $catchUpHook, $options) {
// $event = $this->eventNormalizer->denormalize($eventEnvelope->event);
// if ($options->progressCallback !== null) {
// ($options->progressCallback)($event, $eventEnvelope);
// }
// if (!$projection->canHandle($event)) {
// return;
// }
// $catchUpHook?->onBeforeEvent($event, $eventEnvelope);
// $projection->apply($event, $eventEnvelope);
// if ($projection instanceof WithMarkStaleInterface) {
// $projection->markStale();
// }
// $catchUpHook?->onAfterEvent($event, $eventEnvelope);
// };
//
// $catchUp = CatchUp::create($eventApplier, $projection->getCheckpointStorage());
//
// if ($catchUpHook !== null) {
// $catchUpHook->onBeforeCatchUp();
// $catchUp = $catchUp->withOnBeforeBatchCompleted(fn() => $catchUpHook->onBeforeBatchCompleted());
// }
// $catchUp->run($eventStream);
// $catchUpHook?->onAfterCatchUp();
// }

// public function catchupProjections(): void
// {
// foreach ($this->projectionsAndCatchUpHooks->projections as $projection) {
// // FIXME optimise by only loading required events once and not per projection
// // see https://github.com/neos/neos-development-collection/pull/4988/
// $this->catchUpProjection($projection::class, CatchUpOptions::create());
// }
// }

// public function setUp(): void
// {
// $this->eventStore->setup();
// foreach ($this->projectionsAndCatchUpHooks->projections as $projection) {
// $projection->setUp();
// }
// }

// public function status(): ContentRepositoryStatus
// {
// $projectionStatuses = ProjectionStatuses::createEmpty();
// foreach ($this->projectionsAndCatchUpHooks->projections as $projectionClassName => $projection) {
// $projectionStatuses = $projectionStatuses->with($projectionClassName, $projection->status());
// }
// return new ContentRepositoryStatus(
// $this->eventStore->status(),
// $projectionStatuses,
// );
// }

// public function resetProjectionStates(): void
// {
// foreach ($this->projectionsAndCatchUpHooks->projections as $projection) {
// $projection->reset();
// }
// }

// /**
// * @param class-string<ProjectionInterface<ProjectionStateInterface>> $projectionClassName
// */
// public function resetProjectionState(string $projectionClassName): void
// {
// $projection = $this->projectionsAndCatchUpHooks->projections->get($projectionClassName);
// $projection->reset();
// }

/**
* @throws WorkspaceDoesNotExist if the workspace does not exist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,5 @@ public function publishEvents(ContentRepository $contentRepository, EventsToPubl
$normalizedEvents,
$eventsToPublish->expectedVersion
);

$contentRepository->catchUpProjections();
}
}
Loading

0 comments on commit 2172f09

Please sign in to comment.