From 4ba3a3a23550f4c3e26e6f86b29331880bb9b22a Mon Sep 17 00:00:00 2001 From: David Badura Date: Fri, 23 Feb 2024 15:41:36 +0100 Subject: [PATCH] remove watch server & use event store for watching --- README.md | 1 - docs/mkdocs.yml | 2 - docs/pages/cli.md | 8 ++ docs/pages/index.md | 1 - docs/pages/watch_server.md | 75 ---------- phpstan-baseline.neon | 5 - src/Console/Command/WatchCommand.php | 79 +++++++++-- src/WatchServer/SendingFailed.php | 11 -- src/WatchServer/SocketWatchServer.php | 130 ------------------ src/WatchServer/SocketWatchServerClient.php | 128 ----------------- src/WatchServer/WatchEventBus.php | 27 ---- src/WatchServer/WatchServer.php | 18 --- src/WatchServer/WatchServerClient.php | 12 -- .../Unit/Console/Command/WatchCommandTest.php | 47 ------- tests/Unit/WatchServer/WatchEventBusTest.php | 42 ------ 15 files changed, 76 insertions(+), 510 deletions(-) delete mode 100644 docs/pages/watch_server.md delete mode 100644 src/WatchServer/SendingFailed.php delete mode 100644 src/WatchServer/SocketWatchServer.php delete mode 100644 src/WatchServer/SocketWatchServerClient.php delete mode 100644 src/WatchServer/WatchEventBus.php delete mode 100644 src/WatchServer/WatchServer.php delete mode 100644 src/WatchServer/WatchServerClient.php delete mode 100644 tests/Unit/Console/Command/WatchCommandTest.php delete mode 100644 tests/Unit/WatchServer/WatchEventBusTest.php diff --git a/README.md b/README.md index afab71fdb..e45d5a3dc 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,6 @@ A lightweight but also all-inclusive event sourcing library with a focus on deve * Versioned and managed lifecycle of [projections](https://patchlevel.github.io/event-sourcing-docs/latest/projection/) * Smooth [upcasting](https://patchlevel.github.io/event-sourcing-docs/latest/upcasting/) of old events * Simple setup with [scheme management](https://patchlevel.github.io/event-sourcing-docs/latest/store/) and [doctrine migration](https://patchlevel.github.io/event-sourcing-docs/latest/migration/) -* Dev [tools](https://patchlevel.github.io/event-sourcing-docs/latest/watch_server/) like realtime event watcher * Built in [cli commands](https://patchlevel.github.io/event-sourcing-docs/latest/cli/) with [symfony](https://symfony.com/) * and much more... diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 2633c76b1..20e39c784 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -96,6 +96,4 @@ nav: - Split Stream: split_stream.md - Time / Clock: clock.md - Testing: testing.md - - Other / Tools: - CLI: cli.md - - Watch Server: watch_server.md diff --git a/docs/pages/cli.md b/docs/pages/cli.md index 2de5f0d5d..6fe666ec2 100644 --- a/docs/pages/cli.md +++ b/docs/pages/cli.md @@ -55,6 +55,14 @@ Interacting with the outbox store is also possible via the cli. You can find out more about outbox [here](outbox.md). +## Inspector commands + +The inspector is a tool to inspect the event streams. + +* ShowCommand: `event-sourcing:show` +* ShowAggregateCommand: `event-sourcing:show-aggregate` +* WatchCommand: `event-sourcing:watch` + ## CLI example A cli php file can look like this: diff --git a/docs/pages/index.md b/docs/pages/index.md index f377acc5a..a0c68346b 100644 --- a/docs/pages/index.md +++ b/docs/pages/index.md @@ -13,7 +13,6 @@ A lightweight but also all-inclusive event sourcing library with a focus on deve * Versioned and managed lifecycle of [projections](projection.md) * Smooth [upcasting](upcasting.md) of old events * Simple setup with [scheme management](store.md) and [doctrine migration](migration.md) -* Dev [tools](watch_server.md) like realtime event watcher * Built in [cli commands](cli.md) with [symfony](https://symfony.com/) * and much more... diff --git a/docs/pages/watch_server.md b/docs/pages/watch_server.md deleted file mode 100644 index 1472dfba0..000000000 --- a/docs/pages/watch_server.md +++ /dev/null @@ -1,75 +0,0 @@ -# Watch - -We have implemented a watch server that can be used for development. -Every event that is saved is sent to the watch server using a watch listener. -You can subscribe to it and display the information anywhere, e.g. in the CLI. - -## Watch client - -The watch client and the listener are used to send all events that are saved to a specific host. - -```php -use Patchlevel\EventSourcing\WatchServer\SocketWatchServerClient; -use Patchlevel\EventSourcing\WatchServer\WatchListener; - -$watchServerClient = new SocketWatchServerClient('127.0.0.1:5000', $eventSerializer); -$watchListener = new WatchListener($watchServerClient); -``` - -!!! warning - - This should only be used for dev purposes and should not be registered in production. - -## Watch server - -The watch server is used to receive all events that are sent. -You can subscribe to the watch server and process or display each event as you wish. -As soon as you execute `start`, the server will be started until you terminate the php process. - -```php -use Patchlevel\EventSourcing\EventBus\Message; -use Patchlevel\EventSourcing\WatchServer\SocketWatchServer; - -$watchServer = new SocketWatchServer('127.0.0.1:5000', $eventSerializer); -$watchServer->listen( - function (Message $message) { - var_dump($message); - } -); -$watchServer->start(); -``` - -!!! warning - - The host must match the one defined in the watch server client. - -## Watch server CLI - -Here is an example of how to use it with a symfony cli. - -```php -use Patchlevel\EventSourcing\Console\Command; -use Patchlevel\EventSourcing\WatchServer\SocketWatchServer; -use Symfony\Component\Console\Application; - -$cli = new Application('Event-Sourcing CLI'); -$cli->setCatchExceptions(true); - -$watchServer = new SocketWatchServer('127.0.0.1:5000', $eventSerializer); -$command = new WatchCommand($watchServer); - -$cli->addCommands([ - /* more commands */ - new Command\WatchCommand($watchServer), -]); - -$cli->run(); -``` - -!!! warning - - To use the watch server CLI command, you have to configure the [CLI](cli.md) beforehand. - -!!! note - - The command can be terminated with `ctrl+c` or `control+c`. diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index 8d4101dc7..7745313a6 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -39,8 +39,3 @@ parameters: message: "#^Ternary operator condition is always true\\.$#" count: 1 path: src/Store/DoctrineDbalStoreStream.php - - - - message: "#^While loop condition is always true\\.$#" - count: 1 - path: src/WatchServer/SocketWatchServer.php diff --git a/src/Console/Command/WatchCommand.php b/src/Console/Command/WatchCommand.php index fbd1510f8..3f323af5e 100644 --- a/src/Console/Command/WatchCommand.php +++ b/src/Console/Command/WatchCommand.php @@ -4,17 +4,18 @@ namespace Patchlevel\EventSourcing\Console\Command; +use Patchlevel\EventSourcing\Console\InputHelper; use Patchlevel\EventSourcing\Console\OutputStyle; -use Patchlevel\EventSourcing\EventBus\Message; use Patchlevel\EventSourcing\Serializer\EventSerializer; -use Patchlevel\EventSourcing\WatchServer\WatchServer; +use Patchlevel\EventSourcing\Store\Criteria; +use Patchlevel\EventSourcing\Store\Store; +use Patchlevel\Worker\DefaultWorker; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; -use function sprintf; - #[AsCommand( 'event-sourcing:watch', 'live stream of all aggregate events', @@ -22,27 +23,83 @@ final class WatchCommand extends Command { public function __construct( - private readonly WatchServer $server, + private readonly Store $store, private readonly EventSerializer $serializer, ) { parent::__construct(); } + protected function configure(): void + { + $this + ->addOption( + 'sleep', + null, + InputOption::VALUE_REQUIRED, + 'How much time should elapse before the next job is executed in milliseconds', + 1000, + ) + ->addOption( + 'aggregate', + null, + InputOption::VALUE_REQUIRED, + 'filter aggregate name', + ) + ->addOption( + 'aggregate-id', + null, + InputOption::VALUE_REQUIRED, + 'filter aggregate id', + ); + } + protected function execute(InputInterface $input, OutputInterface $output): int { $console = new OutputStyle($input, $output); - $this->server->start(); + $sleep = InputHelper::positiveIntOrZero($input->getOption('sleep')); + $aggregate = InputHelper::nullableString($input->getOption('aggregate')); + $aggregateId = InputHelper::nullableString($input->getOption('aggregate-id')); - $console->success(sprintf('Server listening on %s', $this->server->host())); - $console->comment('Quit the server with CONTROL-C.'); + $index = $this->currentIndex(); - $this->server->listen( - function (Message $message) use ($console): void { - $console->message($this->serializer, $message); + $worker = DefaultWorker::create( + function () use ($console, &$index, $aggregate, $aggregateId): void { + $stream = $this->store->load( + new Criteria( + $aggregate, + $aggregateId, + $index, + ), + ); + + foreach ($stream as $message) { + $console->message($this->serializer, $message); + } + + $index = $stream->index(); + + $stream->close(); }, + [], ); + $worker->run($sleep); + return 0; } + + private function currentIndex(): int + { + $stream = $this->store->load( + limit: 1, + backwards: true, + ); + + $index = $stream->index() ?? 0; + + $stream->close(); + + return $index; + } } diff --git a/src/WatchServer/SendingFailed.php b/src/WatchServer/SendingFailed.php deleted file mode 100644 index 4465e23fd..000000000 --- a/src/WatchServer/SendingFailed.php +++ /dev/null @@ -1,11 +0,0 @@ -host = $host; - $this->logger = $logger ?? new NullLogger(); - $this->socket = null; - } - - public function start(): void - { - if ($this->socket) { - return; - } - - $socket = stream_socket_server($this->host, $errno, $errstr); - - if ($socket === false) { - throw new RuntimeException(sprintf('Server start failed on "%s": ', $this->host) . $errstr . ' ' . $errno); - } - - $this->socket = $socket; - } - - public function listen(Closure $callback): void - { - $socket = $this->socket(); - - foreach ($this->messages($socket) as $clientId => $clientMessage) { - $this->logger->info('Received a payload from client {clientId}', ['clientId' => $clientId]); - - $message = $this->serializer->deserialize($clientMessage); - - $callback($message, $clientId); - } - } - - public function host(): string - { - return $this->host; - } - - /** @return resource */ - private function socket() - { - $this->start(); - - $socket = $this->socket; - - if (!$socket) { - throw new RuntimeException(); - } - - return $socket; - } - - /** - * @param resource $socket - * - * @return iterable - */ - private function messages($socket): iterable - { - $sockets = [(int)$socket => $socket]; - $write = []; - - while (true) { - $read = $sockets; - stream_select($read, $write, $write, null); - - foreach ($read as $stream) { - if ($socket === $stream) { - $stream = stream_socket_accept($socket); - - if ($stream === false) { - continue; - } - - $sockets[(int)$stream] = $stream; - } elseif (feof($stream)) { - unset($sockets[(int)$stream]); - fclose($stream); - } else { - $content = fgets($stream); - - if ($content === false) { - continue; - } - - yield (int)$stream => $content; - } - } - } - } -} diff --git a/src/WatchServer/SocketWatchServerClient.php b/src/WatchServer/SocketWatchServerClient.php deleted file mode 100644 index 6ea61f55e..000000000 --- a/src/WatchServer/SocketWatchServerClient.php +++ /dev/null @@ -1,128 +0,0 @@ -host = $host; - $this->socket = null; - } - - public function send(Message $message): void - { - $socket = $this->createSocket(); - - if (!$socket) { - throw new SendingFailed('socket connection could not be established'); - } - - $encodedPayload = $this->serializer->serialize($message) . "\n"; - - set_error_handler([self::class, 'nullErrorHandler']); - - try { - if (@stream_socket_sendto($socket, $encodedPayload) !== -1) { - return; - } - - $this->closeSocket(); - $socket = $this->createSocket(); - - if (!$socket) { - throw new SendingFailed('socket connection could not be established'); - } - - if (@stream_socket_sendto($socket, $encodedPayload) !== -1) { - return; - } - } finally { - restore_error_handler(); - } - - throw new SendingFailed('unknown error'); - } - - /** @return resource|null */ - private function createSocket() - { - if ($this->socket) { - return $this->socket; - } - - set_error_handler([self::class, 'nullErrorHandler']); - - try { - $socket = @stream_socket_client( - $this->host, - $errno, - $errstr, - 3, - STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT, - ); - - if (!$socket) { - return null; - } - - $this->socket = $socket; - - return $socket; - } finally { - restore_error_handler(); - } - } - - private function closeSocket(): void - { - $socket = $this->socket; - - if (!$socket) { - return; - } - - if (!@stream_socket_shutdown($socket, STREAM_SHUT_RDWR)) { - throw new SendingFailed('socket shutdown failed'); - } - - fclose($socket); - - $this->socket = null; - } - - /** @internal */ - public function nullErrorHandler(int $errno, string $errstr): bool - { - return false; - } -} diff --git a/src/WatchServer/WatchEventBus.php b/src/WatchServer/WatchEventBus.php deleted file mode 100644 index 82a83d7c8..000000000 --- a/src/WatchServer/WatchEventBus.php +++ /dev/null @@ -1,27 +0,0 @@ -watchServerClient->send($message); - } - } catch (SendingFailed) { - // to nothing - } - } -} diff --git a/src/WatchServer/WatchServer.php b/src/WatchServer/WatchServer.php deleted file mode 100644 index 890c2f896..000000000 --- a/src/WatchServer/WatchServer.php +++ /dev/null @@ -1,18 +0,0 @@ -prophesize(WatchServer::class); - - $watchServer->start()->shouldBeCalledOnce(); - $watchServer->host()->willReturn('tcp://foo.bar'); - $watchServer->listen(Argument::any())->shouldBeCalled(); - - $serializer = $this->prophesize(EventSerializer::class); - - $command = new WatchCommand( - $watchServer->reveal(), - $serializer->reveal(), - ); - - $input = new ArrayInput([]); - $output = new BufferedOutput(); - - $exitCode = $command->run($input, $output); - - self::assertSame(0, $exitCode); - - $content = $output->fetch(); - - self::assertStringContainsString('Server listening on tcp://foo.bar', $content); - } -} diff --git a/tests/Unit/WatchServer/WatchEventBusTest.php b/tests/Unit/WatchServer/WatchEventBusTest.php deleted file mode 100644 index 93f7ae786..000000000 --- a/tests/Unit/WatchServer/WatchEventBusTest.php +++ /dev/null @@ -1,42 +0,0 @@ -prophesize(WatchServerClient::class); - $client->send($message)->shouldBeCalled(); - - $bus = new WatchEventBus($client->reveal()); - $bus->dispatch($message); - } - - public function testIgnoreErrors(): void - { - $message = new Message(new ProfileVisited(ProfileId::fromString('1'))); - - $client = $this->prophesize(WatchServerClient::class); - $client->send($message)->shouldBeCalled()->willThrow(SendingFailed::class); - - $bus = new WatchEventBus($client->reveal()); - $bus->dispatch($message); - } -}