Skip to content

Commit

Permalink
Merge pull request #5315 from mhsdesign/task/schnappsidee-zwo-yield-e…
Browse files Browse the repository at this point in the history
…vents-to-publish-in-workspace-command-handler

TASK: Yield events to publish in workspace command handler
  • Loading branch information
kitsunet authored Oct 24, 2024
2 parents a21f159 + bf9c3b7 commit 6728d5e
Show file tree
Hide file tree
Showing 15 changed files with 493 additions and 464 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ Feature: ForkContentStream Without Dimensions
| propertyValues | {"text": {"value": "original value", "type": "string"}} |
| propertiesToUnset | {} |

Scenario: Try to fork a content stream that is closed:
Scenario: Try to create a workspace with the base workspace referring to a closed content stream
When the command CloseContentStream is executed with payload:
| Key | Value |
| contentStreamId | "cs-identifier" |
When the command ForkContentStream is executed with payload and exceptions are caught:
| Key | Value |
| contentStreamId | "user-cs-identifier" |
| sourceContentStreamId | "cs-identifier" |
When the command CreateWorkspace is executed with payload and exceptions are caught:
| Key | Value |
| workspaceName | "user-test" |
| baseWorkspaceName | "live" |
| newContentStreamId | "user-cs-identifier" |
Then the last command should have thrown an exception of type "ContentStreamIsClosed"
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ Feature: Individual node publication
Scenario: It is possible to publish a single node; and only this one is live.
# create nodes in user WS
Given I am in workspace "user-test"
And I am in workspace "user-test"
And I am in dimension space point {}
And the following CreateNodeAggregateWithNode commands are executed:
| nodeAggregateId | nodeTypeName | parentNodeAggregateId | nodeName | tetheredDescendantNodeAggregateIds |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Neos\ContentRepository\Core\CommandHandlingDependencies;
use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\EventStore\EventsToPublish;
use Neos\ContentRepository\Core\EventStore\EventsToPublishFailed;

/**
* Implementation Detail of {@see ContentRepository::handle}, which does the command dispatching to the different
Expand All @@ -26,7 +27,10 @@ public function __construct(CommandHandlerInterface ...$handlers)
$this->handlers = $handlers;
}

public function handle(CommandInterface $command, CommandHandlingDependencies $commandHandlingDependencies): EventsToPublish
/**
* @return EventsToPublish|\Generator<int, EventsToPublish>
*/
public function handle(CommandInterface $command, CommandHandlingDependencies $commandHandlingDependencies): EventsToPublish|\Generator
{
// TODO fail if multiple handlers can handle the same command
foreach ($this->handlers as $handler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Neos\ContentRepository\Core\CommandHandler;

use Neos\ContentRepository\Core\CommandHandlingDependencies;
use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\EventStore\EventsToPublish;

/**
Expand All @@ -19,5 +18,9 @@
interface CommandHandlerInterface
{
public function canHandle(CommandInterface $command): bool;
public function handle(CommandInterface $command, CommandHandlingDependencies $commandHandlingDependencies): EventsToPublish;

/**
* @return EventsToPublish|\Generator<int, EventsToPublish>
*/
public function handle(CommandInterface $command, CommandHandlingDependencies $commandHandlingDependencies): EventsToPublish|\Generator;
}
96 changes: 64 additions & 32 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName;
use Neos\ContentRepository\Core\SharedModel\Workspace\Workspaces;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Exception\ConcurrencyException;
use Neos\EventStore\Model\Event\EventMetadata;
use Neos\EventStore\Model\EventEnvelope;
use Neos\EventStore\Model\EventStream\VirtualStreamName;
Expand Down Expand Up @@ -101,38 +102,36 @@ public function handle(CommandInterface $command): void
{
// the commands only calculate which events they want to have published, but do not do the
// publishing themselves
$eventsToPublish = $this->commandBus->handle($command, $this->commandHandlingDependencies);

// TODO meaningful exception message
$initiatingUserId = $this->userIdProvider->getUserId();
$initiatingTimestamp = $this->clock->now()->format(\DateTimeInterface::ATOM);

// Add "initiatingUserId" and "initiatingTimestamp" metadata to all events.
// This is done in order to keep information about the _original_ metadata when an
// event is re-applied during publishing/rebasing
// "initiatingUserId": The identifier of the user that originally triggered this event. This will never
// be overridden if it is set once.
// "initiatingTimestamp": The timestamp of the original event. The "recordedAt" timestamp will always be
// re-created and reflects the time an event was actually persisted in a stream,
// the "initiatingTimestamp" will be kept and is never overridden again.
// TODO: cleanup
$eventsToPublish = new EventsToPublish(
$eventsToPublish->streamName,
Events::fromArray(
$eventsToPublish->events->map(function (EventInterface|DecoratedEvent $event) use (
$initiatingUserId,
$initiatingTimestamp
) {
$metadata = $event instanceof DecoratedEvent ? $event->eventMetadata?->value ?? [] : [];
$metadata['initiatingUserId'] ??= $initiatingUserId;
$metadata['initiatingTimestamp'] ??= $initiatingTimestamp;
return DecoratedEvent::create($event, metadata: EventMetadata::fromArray($metadata));
})
),
$eventsToPublish->expectedVersion,
);

$this->eventPersister->publishEvents($this, $eventsToPublish);
$eventsToPublishOrGenerator = $this->commandBus->handle($command, $this->commandHandlingDependencies);

if ($eventsToPublishOrGenerator instanceof EventsToPublish) {
$eventsToPublish = $this->enrichEventsToPublishWithMetadata($eventsToPublishOrGenerator);
$this->eventPersister->publishEvents($this, $eventsToPublish);
} else {
foreach ($eventsToPublishOrGenerator as $eventsToPublish) {
assert($eventsToPublish instanceof EventsToPublish); // just for the ide
$eventsToPublish = $this->enrichEventsToPublishWithMetadata($eventsToPublish);
try {
$this->eventPersister->publishEvents($this, $eventsToPublish);
} catch (ConcurrencyException $e) {
// we pass the exception into the generator, so it could be try-caught and reacted upon:
//
// try {
// yield EventsToPublish();
// } catch (ConcurrencyException $e) {
// yield $restoreState();
// throw $e;
// }

$errorStrategy = $eventsToPublishOrGenerator->throw($e);

if ($errorStrategy instanceof EventsToPublish) {
$eventsToPublish = $this->enrichEventsToPublishWithMetadata($errorStrategy);
$this->eventPersister->publishEvents($this, $this->enrichEventsToPublishWithMetadata($eventsToPublish));
}
}
}
}
}


Expand Down Expand Up @@ -301,4 +300,37 @@ public function getContentDimensionSource(): ContentDimensionSourceInterface
{
return $this->contentDimensionSource;
}

/**
* Add "initiatingUserId" and "initiatingTimestamp" metadata to all events.
* This is done in order to keep information about the _original_ metadata when an
* event is re-applied during publishing/rebasing
* "initiatingUserId": The identifier of the user that originally triggered this event. This will never
* be overridden if it is set once.
* "initiatingTimestamp": The timestamp of the original event. The "recordedAt" timestamp will always be
* re-created and reflects the time an event was actually persisted in a stream,
* the "initiatingTimestamp" will be kept and is never overridden again.
*/
private function enrichEventsToPublishWithMetadata(EventsToPublish $eventsToPublish): EventsToPublish
{
$initiatingUserId = $this->userIdProvider->getUserId();
$initiatingTimestamp = $this->clock->now()->format(\DateTimeInterface::ATOM);

return new EventsToPublish(
$eventsToPublish->streamName,
Events::fromArray(
$eventsToPublish->events->map(function (EventInterface|DecoratedEvent $event) use (
$initiatingUserId,
$initiatingTimestamp
) {
$metadata = $event instanceof DecoratedEvent ? $event->eventMetadata?->value ?? [] : [];
$metadata['initiatingUserId'] ??= $initiatingUserId;
$metadata['initiatingTimestamp'] ??= $initiatingTimestamp;

return DecoratedEvent::create($event, metadata: EventMetadata::fromArray($metadata));
})
),
$eventsToPublish->expectedVersion,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public function __construct(
}

/**
* @param EventsToPublish $eventsToPublish
* @throws ConcurrencyException in case the expectedVersion does not match
*/
public function publishEvents(ContentRepository $contentRepository, EventsToPublish $eventsToPublish): void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId;

/**
* @internal implementation detail. You must not use this command directly.
* Direct use may lead to hard to revert senseless state in your content repository.
* Please use the higher level workspace commands instead.
* @internal only exposed for testing purposes. You must not use this command directly.
* Direct use will leave your content stream in a closed state.
*/
final readonly class CloseContentStream implements CommandInterface
{
Expand Down
Loading

0 comments on commit 6728d5e

Please sign in to comment.