From 0080773a2c56d6213300d5127832f1e59686b7d0 Mon Sep 17 00:00:00 2001 From: David Badura Date: Fri, 8 Mar 2024 10:25:33 +0100 Subject: [PATCH] poc pubsub --- src/Console/Command/WatchCommand.php | 17 ++++++-- src/Store/DoctrineDbalStore.php | 61 +++++++++++++++++++++++++++- src/Store/SubscriptionStore.php | 14 +++++++ 3 files changed, 88 insertions(+), 4 deletions(-) create mode 100644 src/Store/SubscriptionStore.php diff --git a/src/Console/Command/WatchCommand.php b/src/Console/Command/WatchCommand.php index 2ad253814..f08d21a1e 100644 --- a/src/Console/Command/WatchCommand.php +++ b/src/Console/Command/WatchCommand.php @@ -10,6 +10,7 @@ use Patchlevel\EventSourcing\Serializer\EventSerializer; use Patchlevel\EventSourcing\Store\Criteria; use Patchlevel\EventSourcing\Store\Store; +use Patchlevel\EventSourcing\Store\SubscriptionStore; use Patchlevel\Worker\DefaultWorker; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Command\Command; @@ -65,8 +66,12 @@ protected function execute(InputInterface $input, OutputInterface $output): int $index = $this->currentIndex(); + if ($this->store instanceof SubscriptionStore) { + $this->store->setupSubscription(); + } + $worker = DefaultWorker::create( - function () use ($console, &$index, $aggregate, $aggregateId): void { + function () use ($console, &$index, $aggregate, $aggregateId, $sleep): void { $stream = $this->store->load( new Criteria( $aggregate, @@ -81,11 +86,17 @@ function () use ($console, &$index, $aggregate, $aggregateId): void { } $stream->close(); + + if (!$this->store instanceof SubscriptionStore) { + return; + } + + $this->store->wait($sleep); }, - [], ); - $worker->run($sleep); + $supportSubscription = $this->store instanceof SubscriptionStore && $this->store->supportSubscription(); + $worker->run($supportSubscription ? 0 : $sleep); return 0; } diff --git a/src/Store/DoctrineDbalStore.php b/src/Store/DoctrineDbalStore.php index c22c80f14..7c9c909a9 100644 --- a/src/Store/DoctrineDbalStore.php +++ b/src/Store/DoctrineDbalStore.php @@ -7,6 +7,7 @@ use Closure; use Doctrine\DBAL\Connection; use Doctrine\DBAL\Exception\UniqueConstraintViolationException; +use Doctrine\DBAL\Platforms\PostgreSQLPlatform; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Schema\Schema; use Doctrine\DBAL\Types\Type; @@ -17,11 +18,14 @@ use Patchlevel\EventSourcing\EventBus\Serializer\HeadersSerializer; use Patchlevel\EventSourcing\Schema\DoctrineSchemaConfigurator; use Patchlevel\EventSourcing\Serializer\EventSerializer; +use PDO; use function array_fill; use function array_filter; use function array_values; +use function class_exists; use function count; +use function explode; use function floor; use function implode; use function in_array; @@ -29,7 +33,7 @@ use function is_string; use function sprintf; -final class DoctrineDbalStore implements Store, ArchivableStore, DoctrineSchemaConfigurator +final class DoctrineDbalStore implements Store, ArchivableStore, SubscriptionStore, DoctrineSchemaConfigurator { /** * PostgreSQL has a limit of 65535 parameters in a single query. @@ -327,4 +331,59 @@ private function getCustomHeaders(Message $message): array ), ); } + + public function supportSubscription(): bool + { + return $this->connection->getDatabasePlatform() instanceof PostgreSQLPlatform && class_exists(PDO::class); + } + + public function wait(int $timeoutMilliseconds): void + { + if (!$this->supportSubscription()) { + return; + } + + $this->connection->executeStatement(sprintf('LISTEN "%s"', $this->storeTableName)); + + /** @var PDO $nativeConnection */ + $nativeConnection = $this->connection->getNativeConnection(); + + $nativeConnection->pgsqlGetNotify(PDO::FETCH_ASSOC, $timeoutMilliseconds); + } + + public function setupSubscription(): void + { + if (!$this->supportSubscription()) { + return; + } + + $functionName = $this->createTriggerFunctionName(); + + $this->connection->executeStatement(sprintf( + <<<'SQL' + CREATE OR REPLACE FUNCTION %1$s() RETURNS TRIGGER AS $$ + BEGIN + PERFORM pg_notify('%2$s'); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + SQL, + $functionName, + $this->storeTableName, + )); + + $this->connection->executeStatement(sprintf('DROP TRIGGER IF EXISTS notify_trigger ON %s;', $this->storeTableName)); + $this->connection->executeStatement(sprintf('CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON %1$s FOR EACH ROW EXECUTE PROCEDURE %2$s();', $this->storeTableName, $functionName)); + } + + private function createTriggerFunctionName(): string + { + $tableConfig = explode('.', $this->storeTableName); + + if (count($tableConfig) === 1) { + return sprintf('notify_%1$s', $tableConfig[0]); + } + + return sprintf('%1$s.notify_%2$s', $tableConfig[0], $tableConfig[1]); + } } diff --git a/src/Store/SubscriptionStore.php b/src/Store/SubscriptionStore.php new file mode 100644 index 000000000..b99ab0116 --- /dev/null +++ b/src/Store/SubscriptionStore.php @@ -0,0 +1,14 @@ +