diff --git a/Makefile b/Makefile
index a836f0029..79259706e 100644
--- a/Makefile
+++ b/Makefile
@@ -18,11 +18,11 @@ cs: vendor
.PHONY: phpstan
phpstan: vendor ## run phpstan static code analyser
- vendor/bin/phpstan analyse
+ php -d memory_limit=312M vendor/bin/phpstan analyse
.PHONY: phpstan-baseline
phpstan-baseline: vendor ## run phpstan static code analyser
- vendor/bin/phpstan analyse --generate-baseline
+ php -d memory_limit=312M vendor/bin/phpstan analyse --generate-baseline
.PHONY: psalm
psalm: vendor ## run psalm static code analyser
diff --git a/baseline.xml b/baseline.xml
index c5358ae73..cdf4ed2fe 100644
--- a/baseline.xml
+++ b/baseline.xml
@@ -369,6 +369,13 @@
+
+
+
+
+
+
+
diff --git a/src/Subscription/Engine/DefaultSubscriptionEngine.php b/src/Subscription/Engine/DefaultSubscriptionEngine.php
index 8e352dee3..4613bac9d 100644
--- a/src/Subscription/Engine/DefaultSubscriptionEngine.php
+++ b/src/Subscription/Engine/DefaultSubscriptionEngine.php
@@ -62,7 +62,7 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $sk
groups: $criteria->groups,
status: [Status::New],
),
- function (array $subscriptions) use ($skipBooting): Result {
+ function (SubscriptionCollection $subscriptions) use ($skipBooting): Result {
if (count($subscriptions) === 0) {
$this->logger?->info('Subscription Engine: No subscriptions to setup, finish setup.');
@@ -171,14 +171,14 @@ public function boot(
groups: $criteria->groups,
status: [Status::Booting],
),
- function ($subscriptions) use ($limit): ProcessedResult {
+ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult {
if (count($subscriptions) === 0) {
$this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.');
return new ProcessedResult(0, true);
}
- $startIndex = $this->lowestSubscriptionPosition($subscriptions);
+ $startIndex = $subscriptions->lowestPosition();
$this->logger?->debug(
sprintf(
@@ -198,6 +198,8 @@ function ($subscriptions) use ($limit): ProcessedResult {
);
foreach ($stream as $message) {
+ $messageCounter++;
+
$index = $stream->index();
if ($index === null) {
@@ -205,10 +207,6 @@ function ($subscriptions) use ($limit): ProcessedResult {
}
foreach ($subscriptions as $subscription) {
- if (!$subscription->isBooting()) {
- continue;
- }
-
if ($subscription->position() >= $index) {
$this->logger?->debug(
sprintf(
@@ -229,9 +227,17 @@ function ($subscriptions) use ($limit): ProcessedResult {
}
$errors[] = $error;
- }
- $messageCounter++;
+ $subscriptions->remove($subscription);
+
+ if (count($subscriptions) === 0) {
+ $this->logger?->info(
+ 'Subscription Engine: No subscriptions in booting status, finish booting.',
+ );
+
+ break 2;
+ }
+ }
$this->logger?->debug(
sprintf(
@@ -265,6 +271,8 @@ function ($subscriptions) use ($limit): ProcessedResult {
if ($error) {
$errors[] = $error;
+
+ $subscriptions->remove($subscription);
}
$this->subscriptionManager->update($subscription);
@@ -275,10 +283,6 @@ function ($subscriptions) use ($limit): ProcessedResult {
$this->logger?->debug('Subscription Engine: End of stream for booting has been reached.');
foreach ($subscriptions as $subscription) {
- if (!$subscription->isBooting()) {
- continue;
- }
-
if ($subscription->runMode() === RunMode::Once) {
$subscription->finished();
$this->subscriptionManager->update($subscription);
@@ -340,14 +344,14 @@ public function run(
groups: $criteria->groups,
status: [Status::Active],
),
- function (array $subscriptions) use ($limit): ProcessedResult {
+ function (SubscriptionCollection $subscriptions) use ($limit): ProcessedResult {
if (count($subscriptions) === 0) {
$this->logger?->info('Subscription Engine: No subscriptions to process, finish processing.');
return new ProcessedResult(0, true);
}
- $startIndex = $this->lowestSubscriptionPosition($subscriptions);
+ $startIndex = $subscriptions->lowestPosition();
$this->logger?->debug(
sprintf(
@@ -366,6 +370,8 @@ function (array $subscriptions) use ($limit): ProcessedResult {
$stream = $this->messageStore->load($criteria);
foreach ($stream as $message) {
+ $messageCounter++;
+
$index = $stream->index();
if ($index === null) {
@@ -373,10 +379,6 @@ function (array $subscriptions) use ($limit): ProcessedResult {
}
foreach ($subscriptions as $subscription) {
- if (!$subscription->isActive()) {
- continue;
- }
-
if ($subscription->position() >= $index) {
$this->logger?->debug(
sprintf(
@@ -397,9 +399,17 @@ function (array $subscriptions) use ($limit): ProcessedResult {
}
$errors[] = $error;
- }
- $messageCounter++;
+ $subscriptions->remove($subscription);
+
+ if (count($subscriptions) === 0) {
+ $this->logger?->info(
+ 'Subscription Engine: No subscriptions in booting status, finish booting.',
+ );
+
+ break 2;
+ }
+ }
$this->logger?->debug(sprintf(
'Subscription Engine: Current event stream position: %s',
@@ -427,6 +437,8 @@ function (array $subscriptions) use ($limit): ProcessedResult {
if ($error) {
$errors[] = $error;
+
+ $subscriptions->remove($subscription);
}
$this->subscriptionManager->update($subscription);
@@ -435,10 +447,6 @@ function (array $subscriptions) use ($limit): ProcessedResult {
}
foreach ($subscriptions as $subscription) {
- if (!$subscription->isActive()) {
- continue;
- }
-
if ($subscription->runMode() !== RunMode::Once) {
continue;
}
@@ -481,7 +489,7 @@ public function teardown(SubscriptionEngineCriteria|null $criteria = null): Resu
groups: $criteria->groups,
status: [Status::Detached],
),
- function (array $subscriptions): Result {
+ function (SubscriptionCollection $subscriptions): Result {
/** @var list $errors */
$errors = [];
@@ -570,7 +578,7 @@ public function remove(SubscriptionEngineCriteria|null $criteria = null): Result
ids: $criteria->ids,
groups: $criteria->groups,
),
- function (array $subscriptions): Result {
+ function (SubscriptionCollection $subscriptions): Result {
/** @var list $errors */
$errors = [];
@@ -662,7 +670,7 @@ public function reactivate(SubscriptionEngineCriteria|null $criteria = null): Re
Status::Finished,
],
),
- function (array $subscriptions): Result {
+ function (SubscriptionCollection $subscriptions): Result {
foreach ($subscriptions as $subscription) {
$subscriber = $this->subscriber($subscription->id());
@@ -725,7 +733,7 @@ public function pause(SubscriptionEngineCriteria|null $criteria = null): Result
Status::Error,
],
),
- function (array $subscriptions): Result {
+ function (SubscriptionCollection $subscriptions): Result {
/** @var Subscription $subscription */
foreach ($subscriptions as $subscription) {
$subscriber = $this->subscriber($subscription->id());
@@ -868,7 +876,7 @@ private function markDetachedSubscriptions(SubscriptionEngineCriteria $criteria)
groups: $criteria->groups,
status: [Status::Active, Status::Paused, Status::Finished],
),
- function (array $subscriptions): void {
+ function (SubscriptionCollection $subscriptions): void {
foreach ($subscriptions as $subscription) {
$subscriber = $this->subscriber($subscription->id());
@@ -898,7 +906,7 @@ private function retrySubscriptions(SubscriptionEngineCriteria $criteria): void
groups: $criteria->groups,
status: [Status::Error],
),
- function (array $subscriptions): void {
+ function (SubscriptionCollection $subscriptions): void {
/** @var Subscription $subscription */
foreach ($subscriptions as $subscription) {
$error = $subscription->subscriptionError();
@@ -941,7 +949,7 @@ private function discoverNewSubscriptions(): void
{
$this->subscriptionManager->findForUpdate(
new SubscriptionCriteria(),
- function (array $subscriptions): void {
+ function (SubscriptionCollection $subscriptions): void {
$latestIndex = null;
foreach ($this->subscriberRepository->all() as $subscriber) {
@@ -986,26 +994,6 @@ private function latestIndex(): int
return $stream->index() ?: 0;
}
- /** @param list $subscriptions */
- private function lowestSubscriptionPosition(array $subscriptions): int
- {
- $min = null;
-
- foreach ($subscriptions as $subscription) {
- if ($min !== null && $subscription->position() >= $min) {
- continue;
- }
-
- $min = $subscription->position();
- }
-
- if ($min === null) {
- return 0;
- }
-
- return $min;
- }
-
private function handleError(Subscription $subscription, Throwable $throwable): void
{
$subscription->error($throwable);
diff --git a/src/Subscription/Engine/SubscriptionCollection.php b/src/Subscription/Engine/SubscriptionCollection.php
new file mode 100644
index 000000000..f1ed60b55
--- /dev/null
+++ b/src/Subscription/Engine/SubscriptionCollection.php
@@ -0,0 +1,67 @@
+
+ */
+final class SubscriptionCollection implements IteratorAggregate, Countable
+{
+ /** @param list $subscriptions */
+ public function __construct(
+ private array $subscriptions = [],
+ ) {
+ }
+
+ /** @return Traversable */
+ public function getIterator(): Traversable
+ {
+ yield from $this->subscriptions;
+ }
+
+ public function remove(Subscription $subscription): void
+ {
+ $this->subscriptions = array_values(
+ array_filter(
+ $this->subscriptions,
+ static fn (Subscription $s) => $s !== $subscription,
+ ),
+ );
+ }
+
+ public function count(): int
+ {
+ return count($this->subscriptions);
+ }
+
+ public function lowestPosition(): int
+ {
+ $min = null;
+
+ foreach ($this->subscriptions as $subscription) {
+ if ($min !== null && $subscription->position() >= $min) {
+ continue;
+ }
+
+ $min = $subscription->position();
+ }
+
+ if ($min === null) {
+ return 0;
+ }
+
+ return $min;
+ }
+}
diff --git a/src/Subscription/Engine/SubscriptionManager.php b/src/Subscription/Engine/SubscriptionManager.php
index 209a578cf..96a36f9b9 100644
--- a/src/Subscription/Engine/SubscriptionManager.php
+++ b/src/Subscription/Engine/SubscriptionManager.php
@@ -32,7 +32,7 @@ public function __construct(
}
/**
- * @param Closure(list):T $closure
+ * @param Closure(SubscriptionCollection):T $closure
*
* @return T
*
@@ -42,7 +42,11 @@ public function findForUpdate(SubscriptionCriteria $criteria, Closure $closure):
{
if (!$this->subscriptionStore instanceof LockableSubscriptionStore) {
try {
- return $closure($this->subscriptionStore->find($criteria));
+ return $closure(
+ new SubscriptionCollection(
+ $this->subscriptionStore->find($criteria),
+ ),
+ );
} finally {
$this->flush();
}
@@ -52,7 +56,11 @@ public function findForUpdate(SubscriptionCriteria $criteria, Closure $closure):
/** @return T */
function () use ($closure, $criteria): mixed {
try {
- return $closure($this->subscriptionStore->find($criteria));
+ return $closure(
+ new SubscriptionCollection(
+ $this->subscriptionStore->find($criteria),
+ ),
+ );
} finally {
$this->flush();
}
diff --git a/tests/Unit/Subscription/Engine/SubscriptionManagerTest.php b/tests/Unit/Subscription/Engine/SubscriptionManagerTest.php
index 24c868afe..d779ded82 100644
--- a/tests/Unit/Subscription/Engine/SubscriptionManagerTest.php
+++ b/tests/Unit/Subscription/Engine/SubscriptionManagerTest.php
@@ -13,6 +13,8 @@
use Prophecy\Argument;
use Prophecy\PhpUnit\ProphecyTrait;
+use function iterator_to_array;
+
/** @covers \Patchlevel\EventSourcing\Subscription\Engine\SubscriptionManager */
final class SubscriptionManagerTest extends TestCase
{
@@ -107,7 +109,7 @@ public function testFind(): void
$manager = new SubscriptionManager($store->reveal());
$result = $manager->find($criteria);
- self::assertSame([$subscription], $result);
+ self::assertSame([$subscription], iterator_to_array($result));
}
public function testFindForUpdateWithoutLock(): void
@@ -126,7 +128,7 @@ public function testFindForUpdateWithoutLock(): void
return $subscriptions;
});
- self::assertSame([$subscription], $result);
+ self::assertSame([$subscription], iterator_to_array($result));
}
public function testFindForUpdateWithLock(): void
@@ -151,6 +153,6 @@ public function testFindForUpdateWithLock(): void
return $subscriptions;
});
- self::assertSame([$subscription], $result);
+ self::assertSame([$subscription], iterator_to_array($result));
}
}