Skip to content

Commit

Permalink
Merge pull request #466 from patchlevel/use-index-for-position
Browse files Browse the repository at this point in the history
use index for position in projectionist
  • Loading branch information
DavidBadura authored Jan 15, 2024
2 parents e1da663 + 4fb9b2d commit b0e4be2
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 33 deletions.
5 changes: 5 additions & 0 deletions baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@
<code><![CDATA[$data['payload']]]></code>
</MixedArgument>
</file>
<file src="src/Store/ArrayStream.php">
<InvalidPropertyAssignmentValue>
<code>$index</code>
</InvalidPropertyAssignmentValue>
</file>
<file src="src/Store/DoctrineHelper.php">
<MixedReturnTypeCoercion>
<code>$normalizedCustomHeaders</code>
Expand Down
5 changes: 5 additions & 0 deletions phpstan-baseline.neon
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ parameters:
count: 1
path: src/Snapshot/DefaultSnapshotStore.php

-
message: "#^Property Patchlevel\\\\EventSourcing\\\\Store\\\\ArrayStream\\:\\:\\$index \\(int\\<1, max\\>\\|null\\) does not accept int\\<0, max\\>\\.$#"
count: 1
path: src/Store/ArrayStream.php

-
message: "#^Ternary operator condition is always true\\.$#"
count: 1
Expand Down
4 changes: 2 additions & 2 deletions src/Projection/Projection/Projection.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public function projectionError(): ProjectionError|null
return $this->error;
}

public function incrementPosition(): void
public function changePosition(int $position): void
{
$this->position++;
$this->position = $position;
}

public function isNew(): bool
Expand Down
59 changes: 41 additions & 18 deletions src/Projection/Projectionist/DefaultProjectionist.php
Original file line number Diff line number Diff line change
Expand Up @@ -116,35 +116,53 @@ public function boot(
return;
}

$currentPosition = $projections->getLowestProjectionPosition();
$startIndex = $projections->getLowestProjectionPosition();

$this->logger?->debug(
sprintf(
'Projectionist: Event stream is processed for booting from position %s.',
$currentPosition,
$startIndex,
),
);

$stream = null;

try {
$criteria = new Criteria(fromIndex: $currentPosition);
$criteria = new Criteria(fromIndex: $startIndex);
$stream = $this->streamableMessageStore->load($criteria);

$messageCounter = 0;

foreach ($stream as $message) {
$index = $stream->index();

if ($index === null) {
throw new UnexpectedError('Stream index is null, this should not happen.');
}

foreach ($projections->filterByProjectionStatus(ProjectionStatus::Booting) as $projection) {
$this->handleMessage($message, $projection, $throwByError);
if ($projection->position() >= $index) {
$this->logger?->debug(
sprintf(
'Projectionist: Projection "%s" is farther than the current position (%d > %d), continue booting.',
$projection->id()->toString(),
$projection->position(),
$index,
),
);

continue;
}

$this->handleMessage($index, $message, $projection, $throwByError);
}

$currentPosition++;
$messageCounter++;

$this->logger?->debug(
sprintf(
'Projectionist: Current event stream position for booting: %s',
$currentPosition,
$index,
),
);

Expand Down Expand Up @@ -198,45 +216,50 @@ public function run(
return;
}

$currentPosition = $projections->getLowestProjectionPosition();
$startIndex = $projections->getLowestProjectionPosition();

$this->logger?->debug(
sprintf(
'Projectionist: Event stream is processed from position %d.',
$currentPosition,
$startIndex,
),
);

$stream = null;

try {
$criteria = new Criteria(fromIndex: $currentPosition);
$criteria = new Criteria(fromIndex: $startIndex);
$stream = $this->streamableMessageStore->load($criteria);

$messageCounter = 0;

foreach ($stream as $message) {
$index = $stream->index();

if ($index === null) {
throw new UnexpectedError('Stream index is null, this should not happen.');
}

foreach ($projections->filterByProjectionStatus(ProjectionStatus::Active) as $projection) {
if ($projection->position() > $currentPosition) {
if ($projection->position() >= $index) {
$this->logger?->debug(
sprintf(
'Projectionist: Projection "%s" is farther than the current position (%d > %d), continue processing.',
$projection->id()->toString(),
$projection->position(),
$currentPosition,
$index,
),
);

continue;
}

$this->handleMessage($message, $projection, $throwByError);
$this->handleMessage($index, $message, $projection, $throwByError);
}

$currentPosition++;
$messageCounter++;

$this->logger?->debug(sprintf('Projectionist: Current event stream position: %s', $currentPosition));
$this->logger?->debug(sprintf('Projectionist: Current event stream position: %s', $index));

if ($limit !== null && $messageCounter >= $limit) {
$this->logger?->info(
Expand All @@ -256,7 +279,7 @@ public function run(
$this->logger?->info(
sprintf(
'Projectionist: End of stream on position "%d" has been reached, finish processing.',
$currentPosition,
$stream->index() ?: 'unknown',
),
);
}
Expand Down Expand Up @@ -430,7 +453,7 @@ public function projections(): ProjectionCollection
return $projections;
}

private function handleMessage(Message $message, Projection $projection, bool $throwByError): void
private function handleMessage(int $index, Message $message, Projection $projection, bool $throwByError): void
{
$projector = $this->projector($projection->id());

Expand All @@ -441,7 +464,7 @@ private function handleMessage(Message $message, Projection $projection, bool $t
$subscribeMethod = $this->projectorResolver->resolveSubscribeMethod($projector, $message);

if (!$subscribeMethod) {
$projection->incrementPosition();
$projection->changePosition($index);
$this->projectionStore->save($projection);

$this->logger?->debug(
Expand Down Expand Up @@ -484,7 +507,7 @@ private function handleMessage(Message $message, Projection $projection, bool $t
return;
}

$projection->incrementPosition();
$projection->changePosition($index);
$projection->resetRetry();
$this->projectionStore->save($projection);

Expand Down
11 changes: 11 additions & 0 deletions src/Projection/Projectionist/UnexpectedError.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Projection\Projectionist;

use RuntimeException;

final class UnexpectedError extends RuntimeException
{
}
30 changes: 22 additions & 8 deletions src/Store/ArrayStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ final class ArrayStream implements Stream, IteratorAggregate
/** @var positive-int|0|null */
private int|null $position;

/** @param list<Message> $messages The index is based on position. An offset is not supported. */
/** @var positive-int|null */
private int|null $index;

/** @param array<positive-int|0, Message> $messages The index is the key. An offset is not supported. */
public function __construct(array $messages = [])
{
$this->iterator = $messages === [] ? new ArrayIterator() : $this->createGenerator($messages);
$this->position = null;
$this->index = null;
}

public function close(): void
Expand Down Expand Up @@ -54,13 +58,11 @@ public function position(): int|null
*/
public function index(): int|null
{
$position = $this->position();

if ($position === null) {
return null;
if (!$this->index) {
$this->iterator->key();
}

return $position + 1;
return $this->index;
}

public function current(): Message|null
Expand All @@ -69,19 +71,31 @@ public function current(): Message|null
}

/**
* @param list<Message> $messages
* @param array<positive-int|0, Message> $messages
*
* @return Generator<Message>
*/
private function createGenerator(array $messages): Generator
{
foreach ($messages as $message) {
$hasIndex = true;

foreach ($messages as $index => $message) {
if ($this->position === null) {
$this->position = 0;
} else {
$this->position++;
}

if ($index === 0) {
$hasIndex = false;
}

if ($hasIndex) {
$this->index = $index;
} else {
$this->index = $this->position + 1;
}

yield $message;
}
}
Expand Down
6 changes: 3 additions & 3 deletions tests/Unit/Projection/Projection/ProjectionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,14 @@ public function testOutdated(): void
self::assertTrue($projection->isOutdated());
}

public function testIncrementPosition(): void
public function testChangePosition(): void
{
$projection = new Projection(
new ProjectionId('test', 1),
);

$projection->incrementPosition();
$projection->changePosition(10);

self::assertEquals(1, $projection->position());
self::assertEquals(10, $projection->position());
}
}
Loading

0 comments on commit b0e4be2

Please sign in to comment.