Skip to content

Commit

Permalink
poc pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Mar 8, 2024
1 parent 5c5ec5a commit a5f3b96
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 2 deletions.
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
],
"require": {
"php": "~8.1.0 || ~8.2.0 || ~8.3.0",
"ext-pdo": "*",
"doctrine/dbal": "^3.8.1|^4.0.0",
"doctrine/migrations": "^3.3.2",
"patchlevel/hydrator": "^1.2.0",
Expand All @@ -31,7 +32,7 @@
"psr/simple-cache": "^2.0.0|^3.0.0",
"ramsey/uuid": "^4.7",
"symfony/console": "^5.4.32|^6.4.1|^7.0.1",
"symfony/finder": "^5.4.27|^6.4.0|^7.0.0"
"symfony/finder": "^5.4.27|^6.4.0|^7.0.0",
},
"require-dev": {
"ext-pdo_sqlite": "~8.1.0 || ~8.2.0 || ~8.3.0",
Expand Down
4 changes: 3 additions & 1 deletion src/Console/Command/WatchCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$index = $this->currentIndex();

$worker = DefaultWorker::create(
function () use ($console, &$index, $aggregate, $aggregateId): void {
function () use ($console, &$index, $aggregate, $aggregateId, $sleep): void {
$stream = $this->store->load(
new Criteria(
$aggregate,
Expand All @@ -81,6 +81,8 @@ function () use ($console, &$index, $aggregate, $aggregateId): void {
}

$stream->close();

$this->store->listen($sleep);
},
[],
);
Expand Down
58 changes: 58 additions & 0 deletions src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Closure;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Types\Type;
Expand Down Expand Up @@ -327,4 +328,61 @@ private function getCustomHeaders(Message $message): array
),
);
}

public function listen(int $microseconds): void
{
if (!$this->connection->getDatabasePlatform() instanceof PostgreSQLPlatform) {
return;
}

$this->connection->executeStatement(sprintf('LISTEN "%s"', $this->storeTableName));

/** @var \PDO $nativeConnection */
$nativeConnection = $this->connection->getNativeConnection();

$notification = $nativeConnection->pgsqlGetNotify(\PDO::FETCH_ASSOC, $microseconds);

return;

if (
// no notifications, or for another table or queue
(false === $notification || $notification['message'] !== $this->storeTableName)
) {
usleep($microseconds);
}
}

private function getTriggerSql(): array
{
$functionName = $this->createTriggerFunctionName();

return [
// create trigger function
sprintf(<<<'SQL'
CREATE OR REPLACE FUNCTION %1$s() RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('%2$s');
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
SQL,
$functionName,
$this->storeTableName
),
// register trigger
sprintf('DROP TRIGGER IF EXISTS notify_trigger ON %s;', $this->storeTableName),
sprintf('CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON %1$s FOR EACH ROW EXECUTE PROCEDURE %2$s();', $this->storeTableName, $functionName),
];
}

private function createTriggerFunctionName(): string
{
$tableConfig = explode('.', $this->storeTableName);

if (1 === \count($tableConfig)) {
return sprintf('notify_%1$s', $tableConfig[0]);
}

return sprintf('%1$s.notify_%2$s', $tableConfig[0], $tableConfig[1]);
}
}
12 changes: 12 additions & 0 deletions src/Store/SubscriptionStore.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store;

interface SubscriptionStore
{
public function setupSubscription(): void;

public function wait(): void;
}

0 comments on commit a5f3b96

Please sign in to comment.