Skip to content

Commit

Permalink
remove watch server & use event store for watching
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Feb 23, 2024
1 parent 771a7ce commit 1525902
Show file tree
Hide file tree
Showing 14 changed files with 76 additions and 505 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ A lightweight but also all-inclusive event sourcing library with a focus on deve
* Versioned and managed lifecycle of [projections](https://patchlevel.github.io/event-sourcing-docs/latest/projection/)
* Smooth [upcasting](https://patchlevel.github.io/event-sourcing-docs/latest/upcasting/) of old events
* Simple setup with [scheme management](https://patchlevel.github.io/event-sourcing-docs/latest/store/) and [doctrine migration](https://patchlevel.github.io/event-sourcing-docs/latest/migration/)
* Dev [tools](https://patchlevel.github.io/event-sourcing-docs/latest/watch_server/) like realtime event watcher
* Built in [cli commands](https://patchlevel.github.io/event-sourcing-docs/latest/cli/) with [symfony](https://symfony.com/)
* and much more...

Expand Down
2 changes: 0 additions & 2 deletions docs/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,4 @@ nav:
- Split Stream: split_stream.md
- Time / Clock: clock.md
- Testing: testing.md
- Other / Tools:
- CLI: cli.md
- Watch Server: watch_server.md
8 changes: 8 additions & 0 deletions docs/pages/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ Interacting with the outbox store is also possible via the cli.

You can find out more about outbox [here](outbox.md).

## Inspector commands

The inspector is a tool to inspect the event streams.

* ShowCommand: `event-sourcing:show`
* ShowAggregateCommand: `event-sourcing:show-aggregate`
* WatchCommand: `event-sourcing:watch`

## CLI example

A cli php file can look like this:
Expand Down
1 change: 0 additions & 1 deletion docs/pages/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ A lightweight but also all-inclusive event sourcing library with a focus on deve
* Versioned and managed lifecycle of [projections](projection.md)
* Smooth [upcasting](upcasting.md) of old events
* Simple setup with [scheme management](store.md) and [doctrine migration](migration.md)
* Dev [tools](watch_server.md) like realtime event watcher
* Built in [cli commands](cli.md) with [symfony](https://symfony.com/)
* and much more...

Expand Down
75 changes: 0 additions & 75 deletions docs/pages/watch_server.md

This file was deleted.

79 changes: 68 additions & 11 deletions src/Console/Command/WatchCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,45 +4,102 @@

namespace Patchlevel\EventSourcing\Console\Command;

use Patchlevel\EventSourcing\Console\InputHelper;
use Patchlevel\EventSourcing\Console\OutputStyle;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\WatchServer\WatchServer;
use Patchlevel\EventSourcing\Store\Criteria;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\Worker\DefaultWorker;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

use function sprintf;

#[AsCommand(
'event-sourcing:watch',
'live stream of all aggregate events',
)]
final class WatchCommand extends Command
{
public function __construct(

Check failure on line 25 in src/Console/Command/WatchCommand.php

View workflow job for this annotation

GitHub Actions / Backward Compatibility Check (locked, 8.2, ubuntu-latest)

The parameter $server of Patchlevel\EventSourcing\Console\Command\WatchCommand#__construct() changed from Patchlevel\EventSourcing\WatchServer\WatchServer to a non-contravariant Patchlevel\EventSourcing\Store\Store
private readonly WatchServer $server,
private readonly Store $store,
private readonly EventSerializer $serializer,
) {
parent::__construct();
}

protected function configure(): void
{
$this
->addOption(
'sleep',
null,
InputOption::VALUE_REQUIRED,
'How much time should elapse before the next job is executed in milliseconds',
1000,
)
->addOption(
'aggregate',
null,
InputOption::VALUE_REQUIRED,
'filter aggregate name',
)
->addOption(
'aggregate-id',
null,
InputOption::VALUE_REQUIRED,
'filter aggregate id',
);
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$console = new OutputStyle($input, $output);

$this->server->start();
$sleep = InputHelper::positiveIntOrZero($input->getOption('sleep'));
$aggregate = InputHelper::nullableString($input->getOption('aggregate'));
$aggregateId = InputHelper::nullableString($input->getOption('aggregate-id'));

$console->success(sprintf('Server listening on %s', $this->server->host()));
$console->comment('Quit the server with CONTROL-C.');
$index = $this->currentIndex();

$this->server->listen(
function (Message $message) use ($console): void {
$console->message($this->serializer, $message);
$worker = DefaultWorker::create(
function () use ($console, &$index, $aggregate, $aggregateId): void {
$stream = $this->store->load(
new Criteria(
$aggregate,
$aggregateId,
$index,
),
);

foreach ($stream as $message) {
$console->message($this->serializer, $message);
}

$index = $stream->index();

$stream->close();
},
[],
);

$worker->run($sleep);

return 0;
}

private function currentIndex(): int
{
$stream = $this->store->load(
limit: 1,
backwards: true,
);

$index = $stream->index() ?? 0;

$stream->close();

return $index;
}
}
11 changes: 0 additions & 11 deletions src/WatchServer/SendingFailed.php

This file was deleted.

130 changes: 0 additions & 130 deletions src/WatchServer/SocketWatchServer.php

This file was deleted.

Loading

0 comments on commit 1525902

Please sign in to comment.