diff --git a/src/Console/Command/SubscriptionBootCommand.php b/src/Console/Command/SubscriptionBootCommand.php index 6f53cbe9e..7c0ad215c 100644 --- a/src/Console/Command/SubscriptionBootCommand.php +++ b/src/Console/Command/SubscriptionBootCommand.php @@ -6,6 +6,9 @@ use Closure; use Patchlevel\EventSourcing\Console\InputHelper; +use Patchlevel\EventSourcing\Store\Store; +use Patchlevel\EventSourcing\Store\SubscriptionStore; +use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria; use Patchlevel\Worker\DefaultWorker; use Symfony\Component\Console\Attribute\AsCommand; @@ -20,6 +23,13 @@ )] final class SubscriptionBootCommand extends SubscriptionCommand { + public function __construct( + SubscriptionEngine $engine, + private readonly Store $store, + ) { + parent::__construct($engine); + } + public function configure(): void { parent::configure(); @@ -76,6 +86,10 @@ protected function execute(InputInterface $input, OutputInterface $output): int $criteria = $this->subscriptionEngineCriteria($input); $criteria = $this->resolveCriteriaIntoCriteriaWithOnlyIds($criteria); + if ($this->store instanceof SubscriptionStore) { + $this->store->setupSubscription(); + } + if ($setup) { $this->engine->setup($criteria); } @@ -85,15 +99,19 @@ protected function execute(InputInterface $input, OutputInterface $output): int $finished = false; $worker = DefaultWorker::create( - function (Closure $stop) use ($criteria, $messageLimit, &$finished): void { + function (Closure $stop) use ($criteria, $messageLimit, &$finished, $sleep): void { $this->engine->boot($criteria, $messageLimit); - if (!$this->isBootingFinished($criteria)) { + if ($this->isBootingFinished($criteria)) { + $finished = true; + $stop(); + } + + if (!$this->store instanceof SubscriptionStore) { return; } - $finished = true; - $stop(); + $this->store->wait($sleep); }, [ 'runLimit' => $runLimit, @@ -103,7 +121,8 @@ function (Closure $stop) use ($criteria, $messageLimit, &$finished): void { $logger, ); - $worker->run($sleep); + $supportSubscription = $this->store instanceof SubscriptionStore && $this->store->supportSubscription(); + $worker->run($supportSubscription ? 0 : $sleep); return $finished ? 0 : 1; } diff --git a/src/Console/Command/SubscriptionRunCommand.php b/src/Console/Command/SubscriptionRunCommand.php index 40d1b8fcd..87548d907 100644 --- a/src/Console/Command/SubscriptionRunCommand.php +++ b/src/Console/Command/SubscriptionRunCommand.php @@ -5,6 +5,9 @@ namespace Patchlevel\EventSourcing\Console\Command; use Patchlevel\EventSourcing\Console\InputHelper; +use Patchlevel\EventSourcing\Store\Store; +use Patchlevel\EventSourcing\Store\SubscriptionStore; +use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine; use Patchlevel\Worker\DefaultWorker; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Input\InputInterface; @@ -18,6 +21,13 @@ )] final class SubscriptionRunCommand extends SubscriptionCommand { + public function __construct( + SubscriptionEngine $engine, + private readonly Store $store, + ) { + parent::__construct($engine); + } + protected function configure(): void { parent::configure(); @@ -75,11 +85,21 @@ protected function execute(InputInterface $input, OutputInterface $output): int $criteria = $this->subscriptionEngineCriteria($input); $criteria = $this->resolveCriteriaIntoCriteriaWithOnlyIds($criteria); + if ($this->store instanceof SubscriptionStore) { + $this->store->setupSubscription(); + } + $logger = new ConsoleLogger($output); $worker = DefaultWorker::create( - function () use ($criteria, $messageLimit): void { + function () use ($criteria, $messageLimit, $sleep): void { $this->engine->run($criteria, $messageLimit); + + if (!$this->store instanceof SubscriptionStore) { + return; + } + + $this->store->wait($sleep); }, [ 'runLimit' => $runLimit, @@ -94,7 +114,8 @@ function () use ($criteria, $messageLimit): void { $this->engine->boot($criteria); } - $worker->run($sleep); + $supportSubscription = $this->store instanceof SubscriptionStore && $this->store->supportSubscription(); + $worker->run($supportSubscription ? 0 : $sleep); return 0; } diff --git a/src/Console/Command/WatchCommand.php b/src/Console/Command/WatchCommand.php index 2ad253814..f08d21a1e 100644 --- a/src/Console/Command/WatchCommand.php +++ b/src/Console/Command/WatchCommand.php @@ -10,6 +10,7 @@ use Patchlevel\EventSourcing\Serializer\EventSerializer; use Patchlevel\EventSourcing\Store\Criteria; use Patchlevel\EventSourcing\Store\Store; +use Patchlevel\EventSourcing\Store\SubscriptionStore; use Patchlevel\Worker\DefaultWorker; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Command\Command; @@ -65,8 +66,12 @@ protected function execute(InputInterface $input, OutputInterface $output): int $index = $this->currentIndex(); + if ($this->store instanceof SubscriptionStore) { + $this->store->setupSubscription(); + } + $worker = DefaultWorker::create( - function () use ($console, &$index, $aggregate, $aggregateId): void { + function () use ($console, &$index, $aggregate, $aggregateId, $sleep): void { $stream = $this->store->load( new Criteria( $aggregate, @@ -81,11 +86,17 @@ function () use ($console, &$index, $aggregate, $aggregateId): void { } $stream->close(); + + if (!$this->store instanceof SubscriptionStore) { + return; + } + + $this->store->wait($sleep); }, - [], ); - $worker->run($sleep); + $supportSubscription = $this->store instanceof SubscriptionStore && $this->store->supportSubscription(); + $worker->run($supportSubscription ? 0 : $sleep); return 0; } diff --git a/src/Store/DoctrineDbalStore.php b/src/Store/DoctrineDbalStore.php index c22c80f14..7c9c909a9 100644 --- a/src/Store/DoctrineDbalStore.php +++ b/src/Store/DoctrineDbalStore.php @@ -7,6 +7,7 @@ use Closure; use Doctrine\DBAL\Connection; use Doctrine\DBAL\Exception\UniqueConstraintViolationException; +use Doctrine\DBAL\Platforms\PostgreSQLPlatform; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Schema\Schema; use Doctrine\DBAL\Types\Type; @@ -17,11 +18,14 @@ use Patchlevel\EventSourcing\EventBus\Serializer\HeadersSerializer; use Patchlevel\EventSourcing\Schema\DoctrineSchemaConfigurator; use Patchlevel\EventSourcing\Serializer\EventSerializer; +use PDO; use function array_fill; use function array_filter; use function array_values; +use function class_exists; use function count; +use function explode; use function floor; use function implode; use function in_array; @@ -29,7 +33,7 @@ use function is_string; use function sprintf; -final class DoctrineDbalStore implements Store, ArchivableStore, DoctrineSchemaConfigurator +final class DoctrineDbalStore implements Store, ArchivableStore, SubscriptionStore, DoctrineSchemaConfigurator { /** * PostgreSQL has a limit of 65535 parameters in a single query. @@ -327,4 +331,59 @@ private function getCustomHeaders(Message $message): array ), ); } + + public function supportSubscription(): bool + { + return $this->connection->getDatabasePlatform() instanceof PostgreSQLPlatform && class_exists(PDO::class); + } + + public function wait(int $timeoutMilliseconds): void + { + if (!$this->supportSubscription()) { + return; + } + + $this->connection->executeStatement(sprintf('LISTEN "%s"', $this->storeTableName)); + + /** @var PDO $nativeConnection */ + $nativeConnection = $this->connection->getNativeConnection(); + + $nativeConnection->pgsqlGetNotify(PDO::FETCH_ASSOC, $timeoutMilliseconds); + } + + public function setupSubscription(): void + { + if (!$this->supportSubscription()) { + return; + } + + $functionName = $this->createTriggerFunctionName(); + + $this->connection->executeStatement(sprintf( + <<<'SQL' + CREATE OR REPLACE FUNCTION %1$s() RETURNS TRIGGER AS $$ + BEGIN + PERFORM pg_notify('%2$s'); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + SQL, + $functionName, + $this->storeTableName, + )); + + $this->connection->executeStatement(sprintf('DROP TRIGGER IF EXISTS notify_trigger ON %s;', $this->storeTableName)); + $this->connection->executeStatement(sprintf('CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON %1$s FOR EACH ROW EXECUTE PROCEDURE %2$s();', $this->storeTableName, $functionName)); + } + + private function createTriggerFunctionName(): string + { + $tableConfig = explode('.', $this->storeTableName); + + if (count($tableConfig) === 1) { + return sprintf('notify_%1$s', $tableConfig[0]); + } + + return sprintf('%1$s.notify_%2$s', $tableConfig[0], $tableConfig[1]); + } } diff --git a/src/Store/SubscriptionStore.php b/src/Store/SubscriptionStore.php new file mode 100644 index 000000000..b99ab0116 --- /dev/null +++ b/src/Store/SubscriptionStore.php @@ -0,0 +1,14 @@ +