diff --git a/composer.json b/composer.json index 55192fee7..cda929654 100644 --- a/composer.json +++ b/composer.json @@ -20,6 +20,7 @@ ], "require": { "php": "~8.1.0 || ~8.2.0 || ~8.3.0", + "ext-pdo": "*", "doctrine/dbal": "^3.8.1|^4.0.0", "doctrine/migrations": "^3.3.2", "patchlevel/hydrator": "^1.2.0", diff --git a/src/Console/Command/WatchCommand.php b/src/Console/Command/WatchCommand.php index 2ad253814..3e0ca5f65 100644 --- a/src/Console/Command/WatchCommand.php +++ b/src/Console/Command/WatchCommand.php @@ -66,7 +66,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int $index = $this->currentIndex(); $worker = DefaultWorker::create( - function () use ($console, &$index, $aggregate, $aggregateId): void { + function () use ($console, &$index, $aggregate, $aggregateId, $sleep): void { $stream = $this->store->load( new Criteria( $aggregate, @@ -81,6 +81,8 @@ function () use ($console, &$index, $aggregate, $aggregateId): void { } $stream->close(); + + $this->store->listen($sleep); }, [], ); diff --git a/src/Store/DoctrineDbalStore.php b/src/Store/DoctrineDbalStore.php index 6e49134b7..024cb61be 100644 --- a/src/Store/DoctrineDbalStore.php +++ b/src/Store/DoctrineDbalStore.php @@ -7,6 +7,7 @@ use Closure; use Doctrine\DBAL\Connection; use Doctrine\DBAL\Exception\UniqueConstraintViolationException; +use Doctrine\DBAL\Platforms\PostgreSQLPlatform; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Schema\Schema; use Doctrine\DBAL\Types\Type; @@ -327,4 +328,61 @@ private function getCustomHeaders(Message $message): array ), ); } + + public function listen(int $microseconds): void + { + if (!$this->connection->getDatabasePlatform() instanceof PostgreSQLPlatform) { + return; + } + + $this->connection->executeStatement(sprintf('LISTEN "%s"', $this->storeTableName)); + + /** @var \PDO $nativeConnection */ + $nativeConnection = $this->connection->getNativeConnection(); + + $notification = $nativeConnection->pgsqlGetNotify(\PDO::FETCH_ASSOC, $microseconds); + + return; + + if ( + // no notifications, or for another table or queue + (false === $notification || $notification['message'] !== $this->storeTableName) + ) { + usleep($microseconds); + } + } + + private function getTriggerSql(): array + { + $functionName = $this->createTriggerFunctionName(); + + return [ + // create trigger function + sprintf(<<<'SQL' + CREATE OR REPLACE FUNCTION %1$s() RETURNS TRIGGER AS $$ + BEGIN + PERFORM pg_notify('%2$s'); + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + SQL, + $functionName, + $this->storeTableName + ), + // register trigger + sprintf('DROP TRIGGER IF EXISTS notify_trigger ON %s;', $this->storeTableName), + sprintf('CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON %1$s FOR EACH ROW EXECUTE PROCEDURE %2$s();', $this->storeTableName, $functionName), + ]; + } + + private function createTriggerFunctionName(): string + { + $tableConfig = explode('.', $this->storeTableName); + + if (1 === \count($tableConfig)) { + return sprintf('notify_%1$s', $tableConfig[0]); + } + + return sprintf('%1$s.notify_%2$s', $tableConfig[0], $tableConfig[1]); + } } diff --git a/src/Store/SubscriptionStore.php b/src/Store/SubscriptionStore.php new file mode 100644 index 000000000..a888c7b3d --- /dev/null +++ b/src/Store/SubscriptionStore.php @@ -0,0 +1,12 @@ +