Skip to content

Commit

Permalink
Merge pull request #525 from patchlevel/pubsub
Browse files Browse the repository at this point in the history
use for postgres pub/sub feature
  • Loading branch information
DavidBadura authored Mar 21, 2024
2 parents 3b6277f + 4b71275 commit a7ebc6e
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 12 deletions.
11 changes: 10 additions & 1 deletion baseline.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<files psalm-version="5.22.2@d768d914152dbbf3486c36398802f74e80cfde48">
<files psalm-version="5.23.1@8471a896ccea3526b26d082f4461eeea467f10a4">
<file src="src/Aggregate/AggregateRootBehaviour.php">
<UnsafeInstantiation>
<code><![CDATA[new static()]]></code>
Expand All @@ -10,6 +10,15 @@
<code><![CDATA[$this->frozenDateTime->modify(sprintf('+%s seconds', $seconds))]]></code>
</PossiblyFalsePropertyAssignmentValue>
</file>
<file src="src/Console/Command/SubscriptionBootCommand.php">
<RedundantCondition>
<code><![CDATA[1]]></code>
</RedundantCondition>
<TypeDoesNotContainType>
<code><![CDATA[$finished]]></code>
<code><![CDATA[$finished]]></code>
</TypeDoesNotContainType>
</file>
<file src="src/Console/DoctrineHelper.php">
<ClassNotFinal>
<code><![CDATA[class DoctrineHelper]]></code>
Expand Down
31 changes: 26 additions & 5 deletions src/Console/Command/SubscriptionBootCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

use Closure;
use Patchlevel\EventSourcing\Console\InputHelper;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\SubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
use Patchlevel\Worker\DefaultWorker;
use Symfony\Component\Console\Attribute\AsCommand;
Expand All @@ -20,6 +23,13 @@
)]
final class SubscriptionBootCommand extends SubscriptionCommand
{
public function __construct(
SubscriptionEngine $engine,
private readonly Store $store,
) {
parent::__construct($engine);
}

public function configure(): void
{
parent::configure();
Expand Down Expand Up @@ -76,6 +86,10 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$criteria = $this->subscriptionEngineCriteria($input);
$criteria = $this->resolveCriteriaIntoCriteriaWithOnlyIds($criteria);

if ($this->store instanceof SubscriptionStore) {
$this->store->setupSubscription();
}

if ($setup) {
$this->engine->setup($criteria);
}
Expand All @@ -85,15 +99,21 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$finished = false;

$worker = DefaultWorker::create(
function (Closure $stop) use ($criteria, $messageLimit, &$finished): void {
function (Closure $stop) use ($criteria, $messageLimit, &$finished, $sleep): void {
$this->engine->boot($criteria, $messageLimit);

if (!$this->isBootingFinished($criteria)) {
if ($this->isBootingFinished($criteria)) {
$finished = true;
$stop();

return;
}

if (!$this->store instanceof SubscriptionStore) {
return;
}

$finished = true;
$stop();
$this->store->wait($sleep);
},
[
'runLimit' => $runLimit,
Expand All @@ -103,7 +123,8 @@ function (Closure $stop) use ($criteria, $messageLimit, &$finished): void {
$logger,
);

$worker->run($sleep);
$supportSubscription = $this->store instanceof SubscriptionStore && $this->store->supportSubscription();
$worker->run($supportSubscription ? 0 : $sleep);

return $finished ? 0 : 1;
}
Expand Down
25 changes: 23 additions & 2 deletions src/Console/Command/SubscriptionRunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
namespace Patchlevel\EventSourcing\Console\Command;

use Patchlevel\EventSourcing\Console\InputHelper;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\SubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\Worker\DefaultWorker;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputInterface;
Expand All @@ -18,6 +21,13 @@
)]
final class SubscriptionRunCommand extends SubscriptionCommand
{
public function __construct(
SubscriptionEngine $engine,
private readonly Store $store,
) {
parent::__construct($engine);
}

protected function configure(): void
{
parent::configure();
Expand Down Expand Up @@ -75,11 +85,21 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$criteria = $this->subscriptionEngineCriteria($input);
$criteria = $this->resolveCriteriaIntoCriteriaWithOnlyIds($criteria);

if ($this->store instanceof SubscriptionStore) {
$this->store->setupSubscription();
}

$logger = new ConsoleLogger($output);

$worker = DefaultWorker::create(
function () use ($criteria, $messageLimit): void {
function () use ($criteria, $messageLimit, $sleep): void {
$this->engine->run($criteria, $messageLimit);

if (!$this->store instanceof SubscriptionStore) {
return;
}

$this->store->wait($sleep);
},
[
'runLimit' => $runLimit,
Expand All @@ -94,7 +114,8 @@ function () use ($criteria, $messageLimit): void {
$this->engine->boot($criteria);
}

$worker->run($sleep);
$supportSubscription = $this->store instanceof SubscriptionStore && $this->store->supportSubscription();
$worker->run($supportSubscription ? 0 : $sleep);

return 0;
}
Expand Down
17 changes: 14 additions & 3 deletions src/Console/Command/WatchCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Store\Criteria;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\SubscriptionStore;
use Patchlevel\Worker\DefaultWorker;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
Expand Down Expand Up @@ -65,8 +66,12 @@ protected function execute(InputInterface $input, OutputInterface $output): int

$index = $this->currentIndex();

if ($this->store instanceof SubscriptionStore) {
$this->store->setupSubscription();
}

$worker = DefaultWorker::create(
function () use ($console, &$index, $aggregate, $aggregateId): void {
function () use ($console, &$index, $aggregate, $aggregateId, $sleep): void {
$stream = $this->store->load(
new Criteria(
$aggregate,
Expand All @@ -81,11 +86,17 @@ function () use ($console, &$index, $aggregate, $aggregateId): void {
}

$stream->close();

if (!$this->store instanceof SubscriptionStore) {
return;
}

$this->store->wait($sleep);
},
[],
);

$worker->run($sleep);
$supportSubscription = $this->store instanceof SubscriptionStore && $this->store->supportSubscription();
$worker->run($supportSubscription ? 0 : $sleep);

return 0;
}
Expand Down
61 changes: 60 additions & 1 deletion src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Closure;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Types\Type;
Expand All @@ -17,19 +18,22 @@
use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaConfigurator;
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use PDO;

use function array_fill;
use function array_filter;
use function array_values;
use function class_exists;
use function count;
use function explode;
use function floor;
use function implode;
use function in_array;
use function is_int;
use function is_string;
use function sprintf;

final class DoctrineDbalStore implements Store, ArchivableStore, DoctrineSchemaConfigurator
final class DoctrineDbalStore implements Store, ArchivableStore, SubscriptionStore, DoctrineSchemaConfigurator
{
/**
* PostgreSQL has a limit of 65535 parameters in a single query.
Expand Down Expand Up @@ -327,4 +331,59 @@ private function getCustomHeaders(Message $message): array
),
);
}

public function supportSubscription(): bool
{
return $this->connection->getDatabasePlatform() instanceof PostgreSQLPlatform && class_exists(PDO::class);
}

public function wait(int $timeoutMilliseconds): void
{
if (!$this->supportSubscription()) {
return;
}

$this->connection->executeStatement(sprintf('LISTEN "%s"', $this->storeTableName));

/** @var PDO $nativeConnection */
$nativeConnection = $this->connection->getNativeConnection();

$nativeConnection->pgsqlGetNotify(PDO::FETCH_ASSOC, $timeoutMilliseconds);
}

public function setupSubscription(): void
{
if (!$this->supportSubscription()) {
return;
}

$functionName = $this->createTriggerFunctionName();

$this->connection->executeStatement(sprintf(
<<<'SQL'
CREATE OR REPLACE FUNCTION %1$s() RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('%2$s');
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
SQL,
$functionName,
$this->storeTableName,
));

$this->connection->executeStatement(sprintf('DROP TRIGGER IF EXISTS notify_trigger ON %s;', $this->storeTableName));
$this->connection->executeStatement(sprintf('CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON %1$s FOR EACH ROW EXECUTE PROCEDURE %2$s();', $this->storeTableName, $functionName));
}

private function createTriggerFunctionName(): string
{
$tableConfig = explode('.', $this->storeTableName);

if (count($tableConfig) === 1) {
return sprintf('notify_%1$s', $tableConfig[0]);
}

return sprintf('%1$s.notify_%2$s', $tableConfig[0], $tableConfig[1]);
}
}
14 changes: 14 additions & 0 deletions src/Store/SubscriptionStore.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store;

interface SubscriptionStore
{
public function supportSubscription(): bool;

public function setupSubscription(): void;

public function wait(int $timeoutMilliseconds): void;
}

0 comments on commit a7ebc6e

Please sign in to comment.