Skip to content

Commit

Permalink
FEATURE: Introduce publishableEvents count on content stream
Browse files Browse the repository at this point in the history
  • Loading branch information
mhsdesign committed Oct 31, 2024
1 parent 41c1781 commit 7597fe0
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ private function getBasicWorkspaceQuery(): QueryBuilder
$queryBuilder = $this->dbal->createQueryBuilder();

return $queryBuilder
->select('ws.name, ws.baseWorkspaceName, ws.currentContentStreamId, cs.sourceContentStreamVersion = scs.version as upToDateWithBase')
->select('ws.name, ws.baseWorkspaceName, ws.currentContentStreamId, cs.sourceContentStreamVersion = scs.version as upToDateWithBase, cs.publishableEvents as publishableEventsOnStream')
->from($this->tableNames->workspace(), 'ws')
->join('ws', $this->tableNames->contentStream(), 'cs', 'cs.id = ws.currentcontentstreamid')
->leftJoin('cs', $this->tableNames->contentStream(), 'scs', 'scs.id = cs.sourceContentStreamId');
Expand Down Expand Up @@ -188,11 +188,12 @@ private static function workspaceFromDatabaseRow(array $row): Workspace
$baseWorkspaceName,
ContentStreamId::fromString($row['currentContentStreamId']),
$status,
$baseWorkspaceName === null ? 0 : $row['publishableEventsOnStream'],
);
}

/**
* @param array<string, mixed> $row todo fetch source content stream version and use for publishing as expected version
* @param array<string, mixed> $row
*/
private static function contentStreamFromDatabaseRow(array $row): ContentStream
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void
default => $event instanceof EmbedsContentStreamId || throw new \InvalidArgumentException(sprintf('Unsupported event %s', get_debug_type($event))),
};
if ($event instanceof EmbedsContentStreamId && ContentStreamEventStreamName::isContentStreamStreamName($eventEnvelope->streamName)) {
$this->updateContentStreamVersion($event->getContentStreamId(), $eventEnvelope->version);
$this->updateContentStreamVersion($event, $eventEnvelope->version);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ private function createContentStreamTable(): Table
DbalSchemaFactory::columnForContentStreamId('sourceContentStreamId')->setNotnull(false),
(new Column('sourceContentStreamVersion', Type::getType(Types::INTEGER)))->setNotnull(false),
(new Column('closed', Type::getType(Types::BOOLEAN)))->setDefault(false)->setNotnull(true),
(new Column('publishableEvents', Type::getType(Types::INTEGER)))->setNotnull(true),
]);

return $contentStreamTable->setPrimaryKey(['id']);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

namespace Neos\ContentGraph\DoctrineDbalAdapter\Domain\Projection\Feature;

use Neos\ContentRepository\Core\EventStore\EventInterface;
use Neos\ContentRepository\Core\Feature\Common\EmbedsContentStreamId;
use Neos\ContentRepository\Core\Feature\Common\PublishableToWorkspaceInterface;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId;
use Neos\EventStore\Model\Event\Version;

Expand All @@ -20,7 +23,8 @@ private function createContentStream(ContentStreamId $contentStreamId, ?ContentS
'id' => $contentStreamId->value,
'version' => 0,
'sourceContentStreamId' => $sourceContentStreamId?->value,
'sourceContentStreamVersion' => $sourceVersion?->value
'sourceContentStreamVersion' => $sourceVersion?->value,
'publishableEvents' => 0
]);
}

Expand Down Expand Up @@ -49,12 +53,24 @@ private function removeContentStream(ContentStreamId $contentStreamId): void
]);
}

private function updateContentStreamVersion(ContentStreamId $contentStreamId, Version $version): void
private function updateContentStreamVersion(EventInterface&EmbedsContentStreamId $event, Version $version): void
{
$this->dbal->update($this->tableNames->contentStream(), [
'version' => $version->value,
], [
'id' => $contentStreamId->value,
]);
// todo make fork content stream `EmbedsContentStreamId` but then just ignore it here because we set the version already
$isPublishableEvent = $event instanceof PublishableToWorkspaceInterface;
if ($isPublishableEvent) {
$this->dbal->executeStatement(
"UPDATE {$this->tableNames->contentStream()} SET version=:version, publishableEvents=publishableEvents+1 WHERE id=:id",
[
'version' => $version->value,
'id' => $event->getContentStreamId()->value,
]
);
} else {
$this->dbal->update($this->tableNames->contentStream(), [
'version' => $version->value,
], [
'id' => $event->getContentStreamId()->value,
]);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,7 @@ Feature: Workspace based content publishing

# the user and live workspace are unchanged
Then I expect exactly 1 event to be published on stream "Workspace:user-test"
Then I expect exactly 3 event to be published on stream "ContentStream:user-cs-identifier"
And event at index 2 is of type "ContentStreamWasReopened" with payload:
| Key | Expected |
| contentStreamId | "user-cs-identifier" |
Then I expect exactly 1 event to be published on stream "ContentStream:user-cs-identifier"

Then I expect node aggregate identifier "nody-mc-nodeface" to lead to node user-cs-identifier;nody-mc-nodeface;{}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

namespace Neos\ContentRepository\Core\Feature\Common;

use Neos\ContentRepository\Core\EventStore\EventInterface;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId;
use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName;

Expand All @@ -26,7 +27,7 @@
*
* @internal used internally for the publishing mechanism of workspaces
*/
interface PublishableToWorkspaceInterface
interface PublishableToWorkspaceInterface extends EventInterface
{
public function withWorkspaceNameAndContentStreamId(WorkspaceName $targetWorkspaceName, ContentStreamId $contentStreamId): self;
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ private function handlePublishWorkspace(
): \Generator {
$workspace = $this->requireWorkspace($command->workspaceName, $commandHandlingDependencies);
$baseWorkspace = $this->requireBaseWorkspace($workspace, $commandHandlingDependencies);
if (!$workspace->hasPublishableChanges()) {
// no-op
return;
}

if (!$commandHandlingDependencies->contentStreamExists($workspace->currentContentStreamId)) {
throw new \RuntimeException('Cannot publish nodes on a workspace with a stateless content stream', 1729711258);
}
Expand All @@ -197,15 +202,6 @@ private function handlePublishWorkspace(
)
);

if ($rebaseableCommands->isEmpty()) {
// we have no changes, we just reopen; partial no-op
yield $this->reopenContentStream(
$workspace->currentContentStreamId,
$commandHandlingDependencies
);
return;
}

try {
yield from $this->publishWorkspace(
$workspace,
Expand Down Expand Up @@ -319,7 +315,6 @@ private function getCopiedEventsOfEventStream(
$event = $this->eventNormalizer->denormalize($eventEnvelope->event);

if ($event instanceof PublishableToWorkspaceInterface) {
/** @var EventInterface $copiedEvent */
$copiedEvent = $event->withWorkspaceNameAndContentStreamId($targetWorkspaceName, $targetContentStreamId);
// We need to add the event metadata here for rebasing in nested workspace situations
// (and for exporting)
Expand Down Expand Up @@ -358,14 +353,7 @@ private function handleRebaseWorkspace(
$commandHandlingDependencies
);

$rebaseableCommands = RebaseableCommands::extractFromEventStream(
$this->eventStore->load(
ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId)
->getEventStreamName()
)
);

if ($rebaseableCommands->isEmpty()) {
if (!$workspace->hasPublishableChanges()) {
// if we have no changes in the workspace we can fork from the base directly
yield from $this->rebaseWorkspaceWithoutChanges(
$workspace,
Expand All @@ -376,6 +364,13 @@ private function handleRebaseWorkspace(
return;
}

$rebaseableCommands = RebaseableCommands::extractFromEventStream(
$this->eventStore->load(
ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId)
->getEventStreamName()
)
);

$commandSimulator = $this->commandSimulatorFactory->createSimulatorForWorkspace($baseWorkspace->workspaceName);

$commandSimulator->run(
Expand Down Expand Up @@ -438,16 +433,17 @@ private function handlePublishIndividualNodesFromWorkspace(
PublishIndividualNodesFromWorkspace $command,
CommandHandlingDependencies $commandHandlingDependencies,
): \Generator {
if ($command->nodesToPublish->isEmpty()) {
$workspace = $this->requireWorkspace($command->workspaceName, $commandHandlingDependencies);
$baseWorkspace = $this->requireBaseWorkspace($workspace, $commandHandlingDependencies);
if ($command->nodesToPublish->isEmpty() || !$workspace->hasPublishableChanges()) {
// noop
return;
}

$workspace = $this->requireWorkspace($command->workspaceName, $commandHandlingDependencies);
// todo check that fetching workspace throws if there is no content stream id for it
if (!$commandHandlingDependencies->contentStreamExists($workspace->currentContentStreamId)) {
throw new \RuntimeException('Cannot publish nodes on a workspace with a stateless content stream', 1710410114);
}
$baseWorkspace = $this->requireBaseWorkspace($workspace, $commandHandlingDependencies);
$this->requireContentStreamToNotBeClosed($baseWorkspace->currentContentStreamId, $commandHandlingDependencies);
$baseContentStreamVersion = $commandHandlingDependencies->getContentStreamVersion($baseWorkspace->currentContentStreamId);

Expand Down Expand Up @@ -572,16 +568,17 @@ private function handleDiscardIndividualNodesFromWorkspace(
DiscardIndividualNodesFromWorkspace $command,
CommandHandlingDependencies $commandHandlingDependencies,
): \Generator {
if ($command->nodesToDiscard->isEmpty()) {
$workspace = $this->requireWorkspace($command->workspaceName, $commandHandlingDependencies);
$baseWorkspace = $this->requireBaseWorkspace($workspace, $commandHandlingDependencies);

if ($command->nodesToDiscard->isEmpty() || !$workspace->hasPublishableChanges()) {
// noop
return;
}

$workspace = $this->requireWorkspace($command->workspaceName, $commandHandlingDependencies);
if (!$commandHandlingDependencies->contentStreamExists($workspace->currentContentStreamId)) {
throw new \RuntimeException('Cannot discard nodes on a workspace with a stateless content stream', 1710408112);
}
$baseWorkspace = $this->requireBaseWorkspace($workspace, $commandHandlingDependencies);

yield $this->closeContentStream(
$workspace->currentContentStreamId,
Expand Down Expand Up @@ -673,7 +670,7 @@ private function handleDiscardWorkspace(
$workspace = $this->requireWorkspace($command->workspaceName, $commandHandlingDependencies);
$baseWorkspace = $this->requireBaseWorkspace($workspace, $commandHandlingDependencies);

if (!$this->hasEventsInContentStreamExceptForking(ContentStreamEventStreamName::fromContentStreamId($workspace->currentContentStreamId))) {
if (!$workspace->hasPublishableChanges()) {
return;
}

Expand Down Expand Up @@ -885,33 +882,8 @@ private function requireNonCircularRelationBetweenWorkspaces(Workspace $workspac
*/
private function requireEmptyWorkspace(Workspace $workspace): void
{
$workspaceContentStreamName = ContentStreamEventStreamName::fromContentStreamId(
$workspace->currentContentStreamId
);
if ($this->hasEventsInContentStreamExceptForking($workspaceContentStreamName)) {
if ($workspace->hasPublishableChanges()) {
throw new WorkspaceIsNotEmptyException('The user workspace needs to be empty before switching the base workspace.', 1681455989);
}
}

/**
* @return bool
*/
private function hasEventsInContentStreamExceptForking(
ContentStreamEventStreamName $workspaceContentStreamName,
): bool {
// todo introduce workspace has changes instead
$workspaceContentStream = $this->eventStore->load($workspaceContentStreamName->getEventStreamName());

$fullQualifiedEventClassName = ContentStreamWasForked::class;
$shortEventClassName = substr($fullQualifiedEventClassName, strrpos($fullQualifiedEventClassName, '\\') + 1);

foreach ($workspaceContentStream as $eventEnvelope) {
if ($eventEnvelope->event->type->value === EventType::fromString($shortEventClassName)->value) {
continue;
}
return true;
}

return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@ private function __construct(
public ?WorkspaceName $baseWorkspaceName,
public ContentStreamId $currentContentStreamId,
public WorkspaceStatus $status,
private int $countOfPublishableChanges
) {
if ($this->countOfPublishableChanges < 0) {
throw new \InvalidArgumentException('The number of changes must be greater than 0', 1730371545);
}
if ($this->isRootWorkspace() && $this->countOfPublishableChanges !== 0) {
throw new \InvalidArgumentException('Root workspaces cannot have changes', 1730371566);
}
}

/**
Expand All @@ -43,8 +50,19 @@ public static function create(
?WorkspaceName $baseWorkspaceName,
ContentStreamId $currentContentStreamId,
WorkspaceStatus $status,
int $countOfPublishableChanges
): self {
return new self($workspaceName, $baseWorkspaceName, $currentContentStreamId, $status);
return new self($workspaceName, $baseWorkspaceName, $currentContentStreamId, $status, $countOfPublishableChanges);
}

public function hasPublishableChanges(): bool
{
return $this->countOfPublishableChanges !== 0;
}

public function countPublishableChanges(): int
{
return $this->countOfPublishableChanges;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
use Neos\ContentRepository\NodeMigration\Filter\FiltersFactory;
use Neos\ContentRepository\NodeMigration\Filter\InvalidMigrationFilterSpecified;
use Neos\ContentRepository\NodeMigration\Transformation\TransformationsFactory;
use Neos\Neos\PendingChangesProjection\ChangeFinder;

/**
* Node Migrations are manually written adjustments to the Node tree;
Expand Down Expand Up @@ -68,7 +67,7 @@ public function executeMigration(ExecuteMigration $command): void

$targetWorkspaceWasCreated = false;
if ($targetWorkspace = $this->contentRepository->findWorkspaceByName($command->targetWorkspaceName)) {
if (!$this->workspaceIsEmpty($targetWorkspace)) {
if ($targetWorkspace->hasPublishableChanges()) {
throw new MigrationException(sprintf('Target workspace "%s" already exists an is not empty. Please clear the workspace before.', $targetWorkspace->workspaceName->value));
}

Expand Down Expand Up @@ -196,12 +195,4 @@ protected function executeSubMigration(
}
}
}

private function workspaceIsEmpty(Workspace $workspace): bool
{
// todo introduce Workspace::hasPendingChanges
return $this->contentRepository
->projectionState(ChangeFinder::class)
->countByContentStreamId($workspace->currentContentStreamId) === 0;
}
}
17 changes: 6 additions & 11 deletions Neos.Neos/Classes/Command/WorkspaceCommandController.php
Original file line number Diff line number Diff line change
Expand Up @@ -396,20 +396,15 @@ public function deleteCommand(string $workspace, bool $force = false, string $co
$this->quit(3);
}


try {
$nodesCount = $this->workspacePublishingService->countPendingWorkspaceChanges($contentRepositoryId, $workspaceName);
} catch (\Exception $exception) {
$this->outputLine('Could not fetch unpublished nodes for workspace %s, nothing was deleted. %s', [$workspaceName->value, $exception->getMessage()]);
$this->quit(4);
}

if ($nodesCount > 0) {
if ($crWorkspace->hasPublishableChanges()) {
if ($force === false) {
$this->outputLine(
'Did not delete workspace "%s" because it contains %s unpublished node(s).'
'Did not delete workspace "%s" because it contains %d publishable changes.'
. ' Use --force to delete it nevertheless.',
[$workspaceName->value, $nodesCount]
[
$crWorkspace->countPublishableChanges(),
$workspaceName->value
]
);
$this->quit(5);
}
Expand Down
Loading

0 comments on commit 7597fe0

Please sign in to comment.