Skip to content

Commit

Permalink
poc pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Mar 13, 2024
1 parent 1636023 commit 68e757b
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 4 deletions.
17 changes: 14 additions & 3 deletions src/Console/Command/WatchCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Store\Criteria;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\SubscriptionStore;
use Patchlevel\Worker\DefaultWorker;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
Expand Down Expand Up @@ -65,8 +66,12 @@ protected function execute(InputInterface $input, OutputInterface $output): int

$index = $this->currentIndex();

if ($this->store instanceof SubscriptionStore) {
$this->store->setupSubscription();
}

$worker = DefaultWorker::create(
function () use ($console, &$index, $aggregate, $aggregateId): void {
function () use ($console, &$index, $aggregate, $aggregateId, $sleep): void {
$stream = $this->store->load(
new Criteria(
$aggregate,
Expand All @@ -81,11 +86,17 @@ function () use ($console, &$index, $aggregate, $aggregateId): void {
}

$stream->close();

if (!$this->store instanceof SubscriptionStore) {
return;
}

$this->store->wait($sleep);
},
[],
);

$worker->run($sleep);
$supportSubscription = $this->store instanceof SubscriptionStore && $this->store->supportSubscription();
$worker->run($supportSubscription ? 0 : $sleep);

return 0;
}
Expand Down
61 changes: 60 additions & 1 deletion src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Closure;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Types\Type;
Expand All @@ -17,19 +18,22 @@
use Patchlevel\EventSourcing\EventBus\Serializer\HeadersSerializer;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaConfigurator;
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use PDO;

use function array_fill;
use function array_filter;
use function array_values;
use function class_exists;
use function count;
use function explode;
use function floor;
use function implode;
use function in_array;
use function is_int;
use function is_string;
use function sprintf;

final class DoctrineDbalStore implements Store, ArchivableStore, DoctrineSchemaConfigurator
final class DoctrineDbalStore implements Store, ArchivableStore, SubscriptionStore, DoctrineSchemaConfigurator
{
/**
* PostgreSQL has a limit of 65535 parameters in a single query.
Expand Down Expand Up @@ -327,4 +331,59 @@ private function getCustomHeaders(Message $message): array
),
);
}

public function supportSubscription(): bool
{
return $this->connection->getDatabasePlatform() instanceof PostgreSQLPlatform && class_exists(PDO::class);
}

public function wait(int $timeoutMilliseconds): void
{
if (!$this->supportSubscription()) {
return;
}

$this->connection->executeStatement(sprintf('LISTEN "%s"', $this->storeTableName));

/** @var PDO $nativeConnection */
$nativeConnection = $this->connection->getNativeConnection();

$nativeConnection->pgsqlGetNotify(PDO::FETCH_ASSOC, $timeoutMilliseconds);
}

public function setupSubscription(): void
{
if (!$this->supportSubscription()) {
return;
}

$functionName = $this->createTriggerFunctionName();

$this->connection->executeStatement(sprintf(
<<<'SQL'
CREATE OR REPLACE FUNCTION %1$s() RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('%2$s');
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
SQL,
$functionName,
$this->storeTableName,
));

$this->connection->executeStatement(sprintf('DROP TRIGGER IF EXISTS notify_trigger ON %s;', $this->storeTableName));
$this->connection->executeStatement(sprintf('CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON %1$s FOR EACH ROW EXECUTE PROCEDURE %2$s();', $this->storeTableName, $functionName));
}

private function createTriggerFunctionName(): string
{
$tableConfig = explode('.', $this->storeTableName);

if (count($tableConfig) === 1) {
return sprintf('notify_%1$s', $tableConfig[0]);
}

return sprintf('%1$s.notify_%2$s', $tableConfig[0], $tableConfig[1]);
}
}
14 changes: 14 additions & 0 deletions src/Store/SubscriptionStore.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store;

interface SubscriptionStore
{
public function supportSubscription(): bool;

public function setupSubscription(): void;

public function wait(int $timeoutMilliseconds): void;
}

0 comments on commit 68e757b

Please sign in to comment.