Skip to content

Commit

Permalink
poc pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Mar 12, 2024
1 parent bbe507a commit 0b17c0a
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 4 deletions.
16 changes: 13 additions & 3 deletions src/Console/Command/WatchCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Store\Criteria;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\SubscriptionStore;
use Patchlevel\Worker\DefaultWorker;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
Expand Down Expand Up @@ -65,8 +66,14 @@ protected function execute(InputInterface $input, OutputInterface $output): int

$index = $this->currentIndex();

$supportSubscription = $this->store instanceof SubscriptionStore && $this->store->supportSubscription();

if ($supportSubscription) {
$this->store->setupSubscription();

Check failure on line 72 in src/Console/Command/WatchCommand.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

Call to an undefined method Patchlevel\EventSourcing\Store\Store::setupSubscription().
}

$worker = DefaultWorker::create(
function () use ($console, &$index, $aggregate, $aggregateId): void {
function () use ($console, &$index, $aggregate, $aggregateId, $supportSubscription, $sleep): void {
$stream = $this->store->load(
new Criteria(
$aggregate,
Expand All @@ -81,11 +88,14 @@ function () use ($console, &$index, $aggregate, $aggregateId): void {
}

$stream->close();

if ($supportSubscription) {
$this->store->wait($sleep);

Check failure on line 93 in src/Console/Command/WatchCommand.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

Call to an undefined method Patchlevel\EventSourcing\Store\Store::wait().

Check failure on line 93 in src/Console/Command/WatchCommand.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

UndefinedInterfaceMethod

src/Console/Command/WatchCommand.php:93:35: UndefinedInterfaceMethod: Method Patchlevel\EventSourcing\Store\Store::wait does not exist (see https://psalm.dev/181)
}
},
[],
);

$worker->run($sleep);
$worker->run($supportSubscription ? 0 : $sleep);

return 0;
}
Expand Down
58 changes: 57 additions & 1 deletion src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Closure;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Types\Type;
Expand All @@ -18,6 +19,7 @@
use Patchlevel\EventSourcing\Schema\DoctrineSchemaConfigurator;
use Patchlevel\EventSourcing\Serializer\EventSerializer;

use PDO;
use function array_fill;
use function array_filter;
use function array_values;
Expand All @@ -29,7 +31,7 @@
use function is_string;
use function sprintf;

final class DoctrineDbalStore implements Store, ArchivableStore, DoctrineSchemaConfigurator
final class DoctrineDbalStore implements Store, ArchivableStore, SubscriptionStore, DoctrineSchemaConfigurator
{
/**
* PostgreSQL has a limit of 65535 parameters in a single query.
Expand Down Expand Up @@ -327,4 +329,58 @@ private function getCustomHeaders(Message $message): array
),
);
}

public function supportSubscription(): bool
{
return $this->connection->getDatabasePlatform() instanceof PostgreSQLPlatform && class_exists(PDO::class);
}

public function wait(int $timeoutMilliseconds): void
{
if (!$this->supportSubscription()) {
return;
}

$this->connection->executeStatement(sprintf('LISTEN "%s"', $this->storeTableName));

/** @var PDO $nativeConnection */
$nativeConnection = $this->connection->getNativeConnection();

$nativeConnection->pgsqlGetNotify(PDO::FETCH_ASSOC, $timeoutMilliseconds);
}

public function setupSubscription(): void
{
if (!$this->supportSubscription()) {
return;
}

$functionName = $this->createTriggerFunctionName();

$this->connection->executeStatement(sprintf(<<<'SQL'
CREATE OR REPLACE FUNCTION %1$s() RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('%2$s');
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
SQL,
$functionName,
$this->storeTableName
));

$this->connection->executeStatement(sprintf('DROP TRIGGER IF EXISTS notify_trigger ON %s;', $this->storeTableName));
$this->connection->executeStatement(sprintf('CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON %1$s FOR EACH ROW EXECUTE PROCEDURE %2$s();', $this->storeTableName, $functionName));
}

private function createTriggerFunctionName(): string
{
$tableConfig = explode('.', $this->storeTableName);

if (1 === \count($tableConfig)) {
return sprintf('notify_%1$s', $tableConfig[0]);
}

return sprintf('%1$s.notify_%2$s', $tableConfig[0], $tableConfig[1]);
}
}
14 changes: 14 additions & 0 deletions src/Store/SubscriptionStore.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store;

interface SubscriptionStore
{
public function supportSubscription(): bool;

public function setupSubscription(): void;

public function wait(int $timeoutMilliseconds): void;
}

0 comments on commit 0b17c0a

Please sign in to comment.