Skip to content

Commit

Permalink
dont lock by discoverNewSubscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Dec 10, 2024
1 parent 47e202f commit b268fcf
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 33 deletions.
63 changes: 31 additions & 32 deletions src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -939,44 +939,43 @@ function (array $subscriptions): void {

private function discoverNewSubscriptions(): void
{
$this->subscriptionManager->findForUpdate(
new SubscriptionCriteria(),
function (array $subscriptions): void {
$latestIndex = null;
$subscriptions = $this->subscriptionManager->find(new SubscriptionCriteria());

foreach ($this->subscriberRepository->all() as $subscriber) {
foreach ($subscriptions as $subscription) {
if ($subscription->id() === $subscriber->id()) {
continue 2;
}
}
$latestIndex = null;

$subscription = new Subscription(
$subscriber->id(),
$subscriber->group(),
$subscriber->runMode(),
);
foreach ($this->subscriberRepository->all() as $subscriber) {
foreach ($subscriptions as $subscription) {
if ($subscription->id() === $subscriber->id()) {
continue 2;
}
}

if ($subscriber->setupMethod() === null && $subscriber->runMode() === RunMode::FromNow) {
if ($latestIndex === null) {
$latestIndex = $this->latestIndex();
}
$subscription = new Subscription(
$subscriber->id(),
$subscriber->group(),
$subscriber->runMode(),
);

$subscription->changePosition($latestIndex);
$subscription->active();
}
if ($subscriber->setupMethod() === null && $subscriber->runMode() === RunMode::FromNow) {
if ($latestIndex === null) {
$latestIndex = $this->latestIndex();
}

$subscription->changePosition($latestIndex);
$subscription->active();
}

$this->subscriptionManager->add($subscription);
$this->subscriptionManager->add($subscription);

$this->logger?->info(
sprintf(
'Subscription Engine: New Subscriber "%s" was found and added to the subscription store.',
$subscriber->id(),
),
);
}
},
);
$this->logger?->info(
sprintf(
'Subscription Engine: New Subscriber "%s" was found and added to the subscription store.',
$subscriber->id(),
),
);
}

$this->subscriptionManager->flush();
}

private function latestIndex(): int
Expand Down
36 changes: 35 additions & 1 deletion tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3164,6 +3164,41 @@ class {
$engine->{$method}();
}

public function testDontLockGetSubscriptions(): void
{
$subscriber = new #[Subscriber('id1', RunMode::FromNow)]
class {
};

$subscriptionStore = $this->prophesize(LockableSubscriptionStore::class);
$subscriptionStore->inLock(Argument::type(Closure::class))->will(
/** @param array{Closure} $args */
static fn (array $args): mixed => $args[0](),
)->shouldNotBeCalled();
$subscriptionStore->find(Argument::any())->willReturn([])->shouldBeCalled();

$subscriptionStore->find(
new SubscriptionCriteria(),
)->willReturn([
new Subscription('id1'),
])->shouldBeCalled();

$subscriptionStore->remove(Argument::type(Subscription::class));
$subscriptionStore->add(Argument::type(Subscription::class));

$streamableStore = $this->prophesize(Store::class);
$streamableStore->load($this->criteria())->willReturn(new ArrayStream([]));

$engine = new DefaultSubscriptionEngine(
$streamableStore->reveal(),
$subscriptionStore->reveal(),
new MetadataSubscriberAccessorRepository([$subscriber]),
logger: new NullLogger(),
);

$engine->subscriptions();
}

public function testFromNowWithoutSetupDirectActive(): void
{
$subscriptionId = 'test';
Expand Down Expand Up @@ -3208,7 +3243,6 @@ public static function methodProvider(): Generator
yield 'teardown' => ['teardown'];
yield 'remove' => ['remove'];
yield 'reactivate' => ['reactivate'];
yield 'subscriptions' => ['subscriptions'];
}

private function criteria(int $fromIndex = 0): Criteria
Expand Down

0 comments on commit b268fcf

Please sign in to comment.