diff --git a/baseline.xml b/baseline.xml
index e107ec3e1..617141b91 100644
--- a/baseline.xml
+++ b/baseline.xml
@@ -20,11 +20,6 @@
getName()]]>
-
-
-
-
-
@@ -44,29 +39,6 @@
]]>
-
-
-
-
-
-
-
-
- errorContext]]>
- errorContext]]>
-
-
-
-
-
-
-
-
-
- projector->$method(...)]]>
-
-
-
@@ -93,21 +65,35 @@
]]>
+
+
+
+
+
+
+
+
+ errorContext]]>
+ errorContext]]>
+
+
+
+
+
+
+
+
+
+ subscriber->$method(...)]]>
+
+
+
-
-
-
-
-
-
-
-
-
@@ -137,6 +123,15 @@
+
+
+
+
+
+
+
+
+
@@ -150,12 +145,6 @@
-
-
-
-
-
-
@@ -163,13 +152,13 @@
-
+
-
+
@@ -247,7 +236,7 @@
-
+
diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml
index 20e39c784..ebe9a619a 100644
--- a/docs/mkdocs.yml
+++ b/docs/mkdocs.yml
@@ -83,8 +83,7 @@ nav:
- Repository: repository.md
- Store: store.md
- Event Bus: event_bus.md
- - Processor: processor.md
- - Projection: projection.md
+ - Subscription: subscription.md
- Advanced:
- Aggregate ID: aggregate_id.md
- Normalizer: normalizer.md
diff --git a/docs/pages/aggregate.md b/docs/pages/aggregate.md
index ec4a74f45..5b947dd3e 100644
--- a/docs/pages/aggregate.md
+++ b/docs/pages/aggregate.md
@@ -321,7 +321,7 @@ final class Profile extends BasicAggregateRoot
## Suppress missing apply methods
Sometimes you have events that do not change the state of the aggregate itself,
-but are still recorded for the future, to listen on it or to create a projection.
+but are still recorded for the future or to subscribe for processor and projection.
So that you are not forced to write an apply method for it,
you can suppress the missing apply exceptions these events with the `SuppressMissingApply` attribute.
@@ -518,8 +518,8 @@ This is not a problem, as the `apply` methods are always executed immediately.
In the next case we throw an exception if the hotel is already overbooked.
Besides that, we record another event `FullyBooked`, if the hotel is fully booked with the last booking.
-With this event we could [notify](./processor.md) external systems
-or fill a [projection](./projection.md) with fully booked hotels.
+With this event we could [notify](./subscription.md) external systems
+or fill a [projection](./subscription.md) with fully booked hotels.
```php
use Patchlevel\EventSourcing\Aggregate\BasicAggregateRoot;
@@ -660,5 +660,5 @@ $aggregateRegistry = (new AttributeEventRegistryFactory())->create($paths);
* [How to create own aggregate id](aggregate_id.md)
* [How to store and load aggregates](repository.md)
* [How to snapshot aggregates](snapshots.md)
-* [How to create Projections](projection.md)
+* [How to create Projections](subscription.md)
* [How to split streams](split_stream.md)
\ No newline at end of file
diff --git a/docs/pages/cli.md b/docs/pages/cli.md
index 6f017744e..265af177d 100644
--- a/docs/pages/cli.md
+++ b/docs/pages/cli.md
@@ -6,7 +6,7 @@ You can:
* Create and delete `databases`
* Create, update and delete `schemas`
-* Manage `projections`
+* Manage `subscriptions`
## Database commands
@@ -27,22 +27,21 @@ The database schema can also be created, updated and dropped.
You can also register doctrine migration commands.
-## Projection commands
+## Subscription commands
-To manage your projectors there are the following cli commands.
+To manage your subscriptions there are the following cli commands.
-* ProjectionBootCommand: `event-sourcing:projection:boot`
-* ProjectionPauseCommand: `event-sourcing:projection:pause`
-* ProjectionReactiveCommand: `event-sourcing:projection:reactive`
-* ProjectionRebuildCommand: `event-sourcing:projection:rebuild`
-* ProjectionRemoveCommand: `event-sourcing:projection:remove`
-* ProjectionRunCommand: `event-sourcing:projection:run`
-* ProjectionStatusCommand: `event-sourcing:projection:status`
-* ProjectionTeardownCommand: `event-sourcing:projection:teardown`
+* SubscriptionBootCommand: `event-sourcing:subscription:boot`
+* SubscriptionPauseCommand: `event-sourcing:subscription:pause`
+* SubscriptionReactiveCommand: `event-sourcing:subscription:reactive`
+* SubscriptionRemoveCommand: `event-sourcing:subscription:remove`
+* SubscriptionRunCommand: `event-sourcing:subscription:run`
+* SubscriptionStatusCommand: `event-sourcing:subscription:status`
+* SubscriptionTeardownCommand: `event-sourcing:subscription:teardown`
!!! note
- You can find out more about projections [here](projection.md).
+ You can find out more about subscriptions [here](subscription.md).
## Inspector commands
@@ -74,14 +73,14 @@ $schemaManager = new DoctrineSchemaManager();
$cli->addCommands(array(
new Command\DatabaseCreateCommand($store, $doctrineHelper),
new Command\DatabaseDropCommand($store, $doctrineHelper),
- new Command\ProjectionBootCommand($projectionist),
- new Command\ProjectionPauseCommand($projectionist),
- new Command\ProjectionRunCommand($projectionist),
- new Command\ProjectionTeardownCommand($projectionist),
- new Command\ProjectionRemoveCommand($projectionist),
- new Command\ProjectionReactivateCommand($projectionist),
- new Command\ProjectionRebuildCommand($projectionist),
- new Command\ProjectionStatusCommand($projectionist),
+ new Command\SubscriptionBootCommand($projectionist),
+ new Command\SubscriptionPauseCommand($projectionist),
+ new Command\SubscriptionRunCommand($projectionist),
+ new Command\SubscriptionTeardownCommand($projectionist),
+ new Command\SubscriptionRemoveCommand($projectionist),
+ new Command\SubscriptionReactivateCommand($projectionist),
+ new Command\SubscriptionRebuildCommand($projectionist),
+ new Command\SubscriptionStatusCommand($projectionist),
new Command\SchemaCreateCommand($store, $schemaManager),
new Command\SchemaDropCommand($store, $schemaManager),
new Command\SchemaUpdateCommand($store, $schemaManager),
diff --git a/docs/pages/event_bus.md b/docs/pages/event_bus.md
index b8508edfb..db6268e96 100644
--- a/docs/pages/event_bus.md
+++ b/docs/pages/event_bus.md
@@ -233,6 +233,5 @@ $eventBus = new Psr14EventBus($psr14EventDispatcher);
* [How to decorate messages](message_decorator.md)
* [How to use outbox pattern](outbox.md)
-* [How to use processor](processor.md)
-* [How to use projections](projection.md)
-* [How to debug messages with the watch server](watch_server.md)
+* [How to use processor](subscription.md)
+* [How to use subscriptions](subscription.md)
diff --git a/docs/pages/getting_started.md b/docs/pages/getting_started.md
index 9b8c53ea1..7c3bf5559 100644
--- a/docs/pages/getting_started.md
+++ b/docs/pages/getting_started.md
@@ -154,22 +154,21 @@ final class Hotel extends BasicAggregateRoot
So that we can see all the hotels on our website and also see how many guests are currently visiting the hotels,
we need a projection for it. To create a projection we need a projector.
-Each projector is then responsible for a specific projection and version.
+Each subscriber is then responsible for a specific projection.
```php
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Attribute\Setup;
use Patchlevel\EventSourcing\Attribute\Teardown;
use Patchlevel\EventSourcing\Attribute\Subscribe;
-use Patchlevel\EventSourcing\Attribute\Projector;
+use Patchlevel\EventSourcing\Attribute\Subscriber;
use Patchlevel\EventSourcing\EventBus\Message;
-use Patchlevel\EventSourcing\Projection\Projection\ProjectionId;
-use Patchlevel\EventSourcing\Projection\Projector\ProjectorUtil;
+use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberUtil;
-#[Projector('hotel')]
+#[Subscriber('hotel')]
final class HotelProjector
{
- use ProjectorUtil;
+ use SubscriberUtil;
public function __construct(
private readonly Connection $db
@@ -231,14 +230,14 @@ final class HotelProjector
private function table(): string
{
- return 'projection_' . $this->projectorId();
+ return 'projection_' . $this->subscriberId();
}
}
```
!!! note
- You can find out more about projections [here](projection.md).
+ You can find out more about subscriptions [here](subscription.md).
## Processor
@@ -270,7 +269,7 @@ final class SendCheckInEmailProcessor
!!! note
- You can find out more about processor [here](processor.md).
+ You can find out more about processor [here](subscription.md).
## Configuration
@@ -279,9 +278,9 @@ After we have defined everything, we still have to plug the whole thing together
```php
use Doctrine\DBAL\DriverManager;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
-use Patchlevel\EventSourcing\Projection\Projection\Store\DoctrineStore;
-use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
-use Patchlevel\EventSourcing\Projection\Projector\MetadataProjectorAccessorRepository;
+use Patchlevel\EventSourcing\Projection\Engine\DefaultSubscriptionEngine;
+use Patchlevel\EventSourcing\Projection\Subscriber\MetadataSubscriberAccessorRepository;
+use Patchlevel\EventSourcing\Projection\Store\DoctrineSubscriptionStore;
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
@@ -307,13 +306,13 @@ $eventStore = new DoctrineDbalStore(
$hotelProjector = new HotelProjector($projectionConnection);
-$projectorRepository = new MetadataProjectorAccessorRepository([
+$projectorRepository = new MetadataSubscriberAccessorRepository([
$hotelProjector,
]);
-$projectionStore = new DoctrineStore($connection);
+$projectionStore = new DoctrineSubscriptionStore($connection);
-$projectionist = new DefaultProjectionist(
+$projectionist = new DefaultSubscriptionEngine(
$eventStore,
$projectionStore,
$projectorRepository,
@@ -407,6 +406,6 @@ $hotels = $hotelProjection->getHotels();
* [How to create an aggregate](aggregate.md)
* [How to create an event](events.md)
* [How to store aggregates](repository.md)
-* [How to process events](processor.md)
-* [How to create a projection](projection.md)
+* [How to process events](subscription.md)
+* [How to create a projection](subscription.md)
* [How to setup the database](store.md)
\ No newline at end of file
diff --git a/docs/pages/index.md b/docs/pages/index.md
index a0c68346b..c945298a1 100644
--- a/docs/pages/index.md
+++ b/docs/pages/index.md
@@ -10,7 +10,7 @@ A lightweight but also all-inclusive event sourcing library with a focus on deve
* Automatic [snapshot](snapshots.md)-system to boost your performance
* [Split](split_stream.md) big aggregates into multiple streams
* Build-in [pipeline](pipeline.md) to export, import and migrate event streams
-* Versioned and managed lifecycle of [projections](projection.md)
+* Versioned and managed lifecycle of [subscriptions](subscription.md) like projections and processors
* Smooth [upcasting](upcasting.md) of old events
* Simple setup with [scheme management](store.md) and [doctrine migration](migration.md)
* Built in [cli commands](cli.md) with [symfony](https://symfony.com/)
diff --git a/docs/pages/processor.md b/docs/pages/processor.md
deleted file mode 100644
index 1dec6c5e0..000000000
--- a/docs/pages/processor.md
+++ /dev/null
@@ -1,38 +0,0 @@
-# Processor
-
-The `processor` is a kind of [event bus](./event_bus.md) listener that can execute actions on certain events.
-A process can be for example used to send an email when a profile has been created:
-
-## Listener
-
-Here is an example with a listener.
-
-```php
-use Patchlevel\EventSourcing\Attribute\Subscribe;
-use Patchlevel\EventSourcing\EventBus\Listener;
-use Patchlevel\EventSourcing\EventBus\Message;
-
-final class SendEmailProcessor
-{
- public function __construct(
- private readonly Mailer $mailer
- ) {
- }
-
- #[Subscribe(ProfileCreated::class)]
- public function __invoke(Message $message): void
- {
- $event = $message->event();
-
- $this->mailer->send(
- $event->email,
- 'Profile created',
- '...'
- );
- }
-}
-```
-
-!!! tip
-
- You can find out more about the event bus [here](event_bus.md).
diff --git a/docs/pages/projection.md b/docs/pages/projection.md
deleted file mode 100644
index a57b3246d..000000000
--- a/docs/pages/projection.md
+++ /dev/null
@@ -1,591 +0,0 @@
-# Projections
-
-With `projections` you can transform your data optimized for reading.
-projections can be adjusted, deleted or rebuilt at any time.
-This is possible because the event store remains untouched
-and everything can always be reproduced from the events.
-
-A projection can be anything.
-Either a file, a relational database, a no-sql database like mongodb or an elasticsearch.
-
-## Projector
-
-To create a projection you need a projector with a unique ID named `projectorId`.
-This projector is responsible for a specific projection.
-To do this, you can use the `Projector` attribute.
-
-```php
-use Doctrine\DBAL\Connection;
-use Patchlevel\EventSourcing\Attribute\Projector;
-use Patchlevel\EventSourcing\Projection\Projector\ProjectorUtil;
-
-#[Projector('profile_1')]
-final class ProfileProjector
-{
- use ProjectorUtil;
-
- public function __construct(
- private readonly Connection $connection
- ) {
- }
-}
-```
-
-!!! tip
-
- Add a version as suffix to the `projectorId`,
- so you can increment it when the projection changes.
- Like `profile_1` to `profile_2`.
-
-!!! warning
-
- MySQL and MariaDB don't support transactions for DDL statements.
- So you must use a different database connection for your projections.
-
-### Subscribe
-
-A projector can subscribe any number of events.
-In order to say which method is responsible for which event, you need the `Subscribe` attribute.
-There you can pass the event class to which the reaction should then take place.
-The method itself must expect a `Message`, which then contains the event.
-The method name itself doesn't matter.
-
-```php
-use Patchlevel\EventSourcing\Attribute\Subscribe;
-use Patchlevel\EventSourcing\Attribute\Projector;
-use Patchlevel\EventSourcing\EventBus\Message;
-use Patchlevel\EventSourcing\Projection\Projector\ProjectorUtil;
-
-#[Projector('profile_1')]
-final class ProfileProjector
-{
- use ProjectorUtil;
-
- // ...
-
- #[Subscribe(ProfileCreated::class)]
- public function handleProfileCreated(Message $message): void
- {
- $profileCreated = $message->event();
-
- $this->connection->executeStatement(
- "INSERT INTO {$this->table()} (id, name) VALUES(?, ?);",
- [
- 'id' => $profileCreated->profileId->toString(),
- 'name' => $profileCreated->name
- ]
- );
- }
-
- private function table(): string
- {
- return 'projection_' . $this->projectionId();
- }
-}
-```
-
-!!! warning
-
- You have to be careful with actions because in default it will be executed from the start of the event stream.
- Even if you change the ProjectionId, it will run again from the start.
-
-!!! note
-
- You can subscribe to multiple events on the same method or you can use "*" to subscribe to all events.
- More about this can be found [here](./event_bus.md#listener).
-
-!!! tip
-
- If you are using psalm then you can install the event sourcing [plugin](https://github.com/patchlevel/event-sourcing-psalm-plugin)
- to make the event method return the correct type.
-
-### Setup and Teardown
-
-Projectors can have one `setup` and `teardown` method that is executed when the projection is created or deleted.
-For this there are the attributes `Setup` and `Teardown`. The method name itself doesn't matter.
-In some cases it may be that no schema has to be created for the projection,
-as the target does it automatically, so you can skip this.
-
-```php
-use Patchlevel\EventSourcing\Attribute\Setup;
-use Patchlevel\EventSourcing\Attribute\Teardown;
-use Patchlevel\EventSourcing\Attribute\Projector;
-use Patchlevel\EventSourcing\Projection\Projector\ProjectorUtil;
-
-#[Projector('profile_1')]
-final class ProfileProjector
-{
- use ProjectorUtil;
-
- // ...
-
- #[Setup]
- public function create(): void
- {
- $this->connection->executeStatement(
- "CREATE TABLE IF NOT EXISTS {$this->table()} (id VARCHAR PRIMARY KEY, name VARCHAR NOT NULL);"
- );
- }
-
- #[Teardown]
- public function drop(): void
- {
- $this->connection->executeStatement("DROP TABLE IF EXISTS {$this->table()};");
- }
-
- private function table(): string
- {
- return 'projection_' . $this->projectionId();
- }
-}
-```
-
-!!! warning
-
- If you change the `projectorID`, you must also change the table/collection name.
- Otherwise the table/collection will conflict with the old projection.
-
-!!! note
-
- Most databases have a limit on the length of the table/collection name.
- The limit is usually 64 characters.
-
-!!! tip
-
- You can also use the `ProjectorUtil` to build the table/collection name.
-
-### Read Model
-
-You can also implement your read model here.
-You can offer methods that then read the data and put it into a specific format.
-
-```php
-use Patchlevel\EventSourcing\Attribute\Projector;
-use Patchlevel\EventSourcing\Projection\Projector\ProjectorUtil;
-
-#[Projector('profile_1')]
-final class ProfileProjector
-{
- use ProjectorUtil;
-
- // ...
-
- /**
- * @return list
- */
- public function getProfiles(): array
- {
- return $this->connection->fetchAllAssociative("SELECT id, name FROM {$this->table()};");
- }
-
- private function table(): string
- {
- return 'projection_' . $this->projectionId();
- }
-}
-```
-
-!!! tip
-
- You can also use the `ProjectorUtil` to build the table/collection name.
-
-### Versioning
-
-As soon as the structure of a projection changes, or you need other events from the past,
-the `projectorId` must be change or increment.
-
-Otherwise, the projectionist will not recognize that the projection has changed and will not rebuild it.
-To do this, you can add a version to the `projectorId`:
-
-```php
-use Patchlevel\EventSourcing\Attribute\Projector;
-
-#[Projector('profile_2')]
-final class ProfileProjector
-{
- // ...
-}
-```
-
-!!! warning
-
- If you change the `projectorID`, you must also change the table/collection name.
- Otherwise the table/collection will conflict with the old projection.
-
-### Grouping
-
-You can also group projectors and address these to the projectionist.
-This is useful if you want to run projectors in different processes or on different servers.
-
-```php
-use Patchlevel\EventSourcing\Attribute\Projector;
-
-#[Projector('profile_1', group: 'a')]
-final class ProfileProjector
-{
- // ...
-}
-```
-
-!!! note
-
- The default group is `default` and the projectionist takes all groups if none are given to him.
-
-### Run Mode
-
-The run mode determines how the projector should behave when it is booted.
-There are three different modes:
-
-#### From Beginning
-
-This is the default mode.
-The projector will start from the beginning of the event stream and process all events.
-
-```php
-use Patchlevel\EventSourcing\Attribute\Projector;
-use Patchlevel\EventSourcing\Projection\Projection\RunMode;
-
-#[Projector('welcome_email', runMode: RunMode::FromBeginning)]
-final class WelcomeEmailProjector
-{
- // ...
-}
-```
-
-#### From Now
-
-Certain projectors operate exclusively on post-release events, disregarding historical data.
-This is useful for projectors that are only interested in events that occur after a certain point in time.
-As example, a welcome email projector that only wants to send emails to new users.
-
-```php
-use Patchlevel\EventSourcing\Attribute\Projector;
-use Patchlevel\EventSourcing\Projection\Projection\RunMode;
-
-#[Projector('welcome_email', runMode: RunMode::FromNow)]
-final class WelcomeEmailProjector
-{
- // ...
-}
-```
-
-#### Once
-
-This mode is useful for projectors that only need to run once.
-This is useful for projectors to create reports or to migrate data.
-
-```php
-use Patchlevel\EventSourcing\Attribute\Projector;
-use Patchlevel\EventSourcing\Projection\Projection\RunMode;
-
-#[Projector('migration', runMode: RunMode::Once)]
-final class MigrationProjector
-{
- // ...
-}
-```
-
-## Projectionist
-
-The projectionist manages individual projectors and keeps the projections running.
-Internally, the projectionist does this by tracking where each projector is in the event stream
-and keeping all projections up to date.
-He also takes care that new projectors are booted and old ones are removed again.
-If something breaks, the projectionist marks the individual projections as faulty.
-
-!!! tip
-
- The Projectionist was inspired by the following two blog posts:
-
- * [Projection Building Blocks: What you'll need to build projections](https://barryosull.com/blog/projection-building-blocks-what-you-ll-need-to-build-projections/)
- * [Managing projectors is harder than you think](https://barryosull.com/blog/managing-projectors-is-harder-than-you-think/)
-
-## Projection ID
-
-The projection ID is taken from the associated projector and corresponds to the projector ID.
-Unlike the projector ID, the projection ID can no longer change.
-If the Projector ID is changed, a new projection will be created with this new projector ID.
-So there are two projections, one with the old projector ID and one with the new projector ID.
-
-## Projection Position
-
-Furthermore, the position in the event stream is stored for each projection.
-So that the projectionist knows where the projection stopped and must continue.
-
-## Projection Status
-
-There is a lifecycle for each projection.
-This cycle is tracked by the projectionist.
-
-``` mermaid
-stateDiagram-v2
- direction LR
- [*] --> New
- New --> Booting
- New --> Error
- Booting --> Active
- Booting --> Paused
- Booting --> Finished
- Booting --> Error
- Active --> Paused
- Active --> Finished
- Active --> Outdated
- Active --> Error
- Paused --> New
- Paused --> Booting
- Paused --> Active
- Paused --> Outdated
- Paused --> [*]
- Finished --> Active
- Finished --> Outdated
- Error --> New
- Error --> Booting
- Error --> Active
- Error --> Paused
- Error --> [*]
- Outdated --> Active
- Outdated --> [*]
-```
-
-### New
-
-A projection is created and "new" if a projector exists with an ID that is not yet tracked.
-This can happen when either a new projector has been added, the `projector id` has changed
-or the projection has been manually deleted from the projection store.
-
-### Booting
-
-Booting status is reached when the boot process is invoked.
-In this step, the "setup" method is called on the projection, if available.
-And the projection is brought up to date, depending on the mode.
-When the process is finished, the projection is set to active.
-
-### Active
-
-The active status describes the projections currently being actively managed by the projectionist.
-These projections have a projector, follow the event stream and should be up-to-date.
-
-## Paused
-
-A projection can manually be paused. It will then no longer be updated by the projectionist.
-This can be useful if you want to pause a projection for a certain period of time.
-You can also reactivate the projection if you want so that it continues.
-
-### Finished
-
-A projection is finished if the projector has the mode `RunMode::Once`.
-This means that the projection is only run once and then set to finished if it reaches the end of the event stream.
-You can also reactivate the projection if you want so that it continues.
-
-### Outdated
-
-If an active or finished projection exists in the projection store
-that does not have a projector in the source code with a corresponding projector ID,
-then this projection is marked as outdated.
-This happens when either the projector has been deleted
-or the projector ID of a projector has changed.
-In the last case there should be a new projection with the new projector ID.
-
-An outdated projection does not automatically become active again when the projector exists again.
-This happens, for example, when an old version was deployed again during a rollback.
-
-There are two options to reactivate the projection:
-
-* Reactivate the projection, so that the projection is active again.
-* Remove the projection and rebuild it from scratch.
-
-### Error
-
-If an error occurs in a projector, then the target projection is set to Error.
-This can happen in the create process, in the boot process or in the run process.
-This projection will then no longer boot/run until the projection is reactivate or retried.
-
-The projectionist has a retry strategy to retry projections that have failed.
-It tries to reactivate the projection after a certain time and a certain number of attempts.
-If this does not work, the projection is set to error and must be manually reactivated.
-
-There are two options here:
-
-* Reactivate the projection, so that the projection is in the previous state again.
-* Remove the projection and rebuild it from scratch.
-
-## Setup
-
-In order for the projectionist to be able to do its work, you have to assemble it beforehand.
-
-### Projection Store
-
-The Projectionist uses a projection store to store the status of each projection.
-We provide a Doctrine implementation of this by default.
-
-```php
-use Patchlevel\EventSourcing\Projection\Projection\Store\DoctrineStore;
-
-$projectionStore = new DoctrineStore($connection);
-```
-
-So that the schema for the projection store can also be created,
-we have to tell the `SchemaDirector` our schema configuration.
-Using `ChainSchemaConfigurator` we can add multiple schema configurators.
-In our case they need the `SchemaConfigurator` from the event store and projection store.
-
-```php
-use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator;
-use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
-
-$schemaDirector = new DoctrineSchemaDirector(
- $connection
- new ChainDoctrineSchemaConfigurator([
- $eventStore,
- $projectionStore
- ]),
-);
-```
-
-!!! note
-
- You can find more about schema configurator [here](./store.md)
-
-### Retry Strategy
-
-The projectionist uses a retry strategy to retry projections that have failed.
-Our default strategy can be configured with the following parameters:
-
-* `baseDelay` - The base delay in seconds.
-* `delayFactor` - The factor by which the delay is multiplied after each attempt.
-* `maxAttempts` - The maximum number of attempts.
-
-```php
-use Patchlevel\EventSourcing\Projection\RetryStrategy\ClockBasedRetryStrategy;
-
-$retryStrategy = new ClockBasedRetryStrategy(
- baseDelay: 5,
- delayFactor: 2,
- maxAttempts: 5,
-);
-```
-
-!!! tip
-
- You can reactivate the projection manually or remove it and rebuild it from scratch.
-
-### Projector Accessor
-
-The projector accessor is responsible for providing the projectors to the projectionist.
-We provide a metadata projector accessor repository by default.
-
-```php
-use Patchlevel\EventSourcing\Projection\Projector\MetadataProjectorAccessorRepository;
-
-$projectorAccessorRepository = new MetadataProjectorAccessorRepository([$projector1, $projector2, $projector3]);
-```
-
-### Projectionist
-
-Now we can create the projectionist and plug together the necessary services.
-The event store is needed to load the events, the Projection Store to store the projection state
-and the respective projectors. Optionally, we can also pass a retry strategy.
-
-```php
-use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
-
-$projectionist = new DefaultProjectionist(
- $eventStore,
- $projectionStore,
- $projectorAccessorRepository,
- $retryStrategy,
-);
-```
-
-## Usage
-
-The Projectionist has a few methods needed to use it effectively.
-A `ProjectionistCriteria` can be passed to all of these methods to filter the respective projectors.
-
-```php
-use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistCriteria;
-
-$criteria = new ProjectionistCriteria(
- ids: ['profile_1', 'welcome_email'],
- groups: ['default']
-);
-```
-
-!!! note
-
- An `OR` check is made for the respective criteria and all criteria are checked with an `AND`.
-
-### Boot
-
-So that the projectionist can manage the projections, they must be booted.
-In this step, the structures are created for all new projections.
-The projections then catch up with the current position of the event stream.
-When the projections are finished, they switch to the active state.
-
-```php
-$projectionist->boot($criteria);
-```
-
-### Run
-
-All active projections are continued and updated here.
-
-```php
-$projectionist->run($criteria);
-```
-
-### Teardown
-
-If projections are outdated, they can be cleaned up here.
-The projectionist also tries to remove the structures created for the projection.
-
-```php
-$projectionist->teardown($criteria);
-```
-
-### Remove
-
-You can also directly remove a projection regardless of its status.
-An attempt is made to remove the structures, but the entry will still be removed if it doesn't work.
-
-```php
-$projectionist->remove($criteria);
-```
-
-### Reactivate
-
-If a projection had an error, you can reactivate it.
-As a result, the projection gets the status active again and is then kept up-to-date again by the projectionist.
-
-```php
-$projectionist->reactivate($criteria);
-```
-
-### Pause
-
-Pausing a projection is also possible.
-The projection will then no longer be updated by the projectionist.
-You can reactivate the projection if you want so that it continues.
-
-```php
-$projectionist->pause($criteria);
-```
-
-### Status
-
-To get the current status of all projections, you can get them using the `projections` method.
-
-```php
-$projections = $projectionist->projections($criteria);
-
-foreach ($projections as $projection) {
- echo $projection->status();
-}
-```
-
-## Learn more
-
-* [How to use CLI commands](./cli.md)
-* [How to use Pipeline](./pipeline.md)
-* [How to use Event Bus](./event_bus.md)
-* [How to Test](./testing.md)
\ No newline at end of file
diff --git a/docs/pages/subscription.md b/docs/pages/subscription.md
new file mode 100644
index 000000000..3c76feffb
--- /dev/null
+++ b/docs/pages/subscription.md
@@ -0,0 +1,591 @@
+# Subscriptions
+
+With `subscriptions` you can transform your data optimized for reading.
+Subscriptions can be adjusted, deleted or rebuilt at any time.
+This is possible because the event store remains untouched
+and everything can always be reproduced from the events.
+
+A subscription can be anything.
+Either a file, a relational database, a no-sql database like mongodb or an elasticsearch.
+
+## Subscriber
+
+To create a subscription you need a subscriber with a unique ID named `subscriberId`.
+This subscriber is responsible for a specific subscription.
+To do this, you can use the `Subscriber` attribute.
+
+```php
+use Doctrine\DBAL\Connection;
+use Patchlevel\EventSourcing\Attribute\Subscriber;
+use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberUtil;
+
+#[Subscriber('profile_1')]
+final class ProfileSubscriber
+{
+ use SubscriberUtil;
+
+ public function __construct(
+ private readonly Connection $connection
+ ) {
+ }
+}
+```
+
+!!! tip
+
+ Add a version as suffix to the `subscriberId`,
+ so you can increment it when the subscription changes.
+ Like `profile_1` to `profile_2`.
+
+!!! warning
+
+ MySQL and MariaDB don't support transactions for DDL statements.
+ So you must use a different database connection for your subscriptions.
+
+### Subscribe
+
+A subscriber can subscribe any number of events.
+In order to say which method is responsible for which event, you need the `Subscribe` attribute.
+There you can pass the event class to which the reaction should then take place.
+The method itself must expect a `Message`, which then contains the event.
+The method name itself doesn't matter.
+
+```php
+use Patchlevel\EventSourcing\Attribute\Subscribe;
+use Patchlevel\EventSourcing\Attribute\Subscriber;
+use Patchlevel\EventSourcing\EventBus\Message;
+use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberUtil;
+
+#[Subscriber('profile_1')]
+final class ProfileSubscriber
+{
+ use SubscriberUtil;
+
+ // ...
+
+ #[Subscribe(ProfileCreated::class)]
+ public function handleProfileCreated(Message $message): void
+ {
+ $profileCreated = $message->event();
+
+ $this->connection->executeStatement(
+ "INSERT INTO {$this->table()} (id, name) VALUES(?, ?);",
+ [
+ 'id' => $profileCreated->profileId->toString(),
+ 'name' => $profileCreated->name
+ ]
+ );
+ }
+
+ private function table(): string
+ {
+ return 'subscription_' . $this->subscriptionId();
+ }
+}
+```
+
+!!! warning
+
+ You have to be careful with actions because in default it will be executed from the start of the event stream.
+ Even if you change the SubscriptionId, it will run again from the start.
+
+!!! note
+
+ You can subscribe to multiple events on the same method or you can use "*" to subscribe to all events.
+ More about this can be found [here](./event_bus.md#listener).
+
+!!! tip
+
+ If you are using psalm then you can install the event sourcing [plugin](https://github.com/patchlevel/event-sourcing-psalm-plugin)
+ to make the event method return the correct type.
+
+### Setup and Teardown
+
+Subscribers can have one `setup` and `teardown` method that is executed when the subscription is created or deleted.
+For this there are the attributes `Setup` and `Teardown`. The method name itself doesn't matter.
+In some cases it may be that no schema has to be created for the subscription,
+as the target does it automatically, so you can skip this.
+
+```php
+use Patchlevel\EventSourcing\Attribute\Setup;
+use Patchlevel\EventSourcing\Attribute\Teardown;
+use Patchlevel\EventSourcing\Attribute\Subscriber;
+use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberUtil;
+
+#[Subscriber('profile_1')]
+final class ProfileSubscriber
+{
+ use SubscriberUtil;
+
+ // ...
+
+ #[Setup]
+ public function create(): void
+ {
+ $this->connection->executeStatement(
+ "CREATE TABLE IF NOT EXISTS {$this->table()} (id VARCHAR PRIMARY KEY, name VARCHAR NOT NULL);"
+ );
+ }
+
+ #[Teardown]
+ public function drop(): void
+ {
+ $this->connection->executeStatement("DROP TABLE IF EXISTS {$this->table()};");
+ }
+
+ private function table(): string
+ {
+ return 'subscription_' . $this->subscriptionId();
+ }
+}
+```
+
+!!! warning
+
+ If you change the `subscriberID`, you must also change the table/collection name.
+ Otherwise the table/collection will conflict with the old subscription.
+
+!!! note
+
+ Most databases have a limit on the length of the table/collection name.
+ The limit is usually 64 characters.
+
+!!! tip
+
+ You can also use the `SubscriberUtil` to build the table/collection name.
+
+### Read Model
+
+You can also implement your read model here.
+You can offer methods that then read the data and put it into a specific format.
+
+```php
+use Patchlevel\EventSourcing\Attribute\Subscriber;
+use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberUtil;
+
+#[Subscriber('profile_1')]
+final class ProfileSubscriber
+{
+ use SubscriberUtil;
+
+ // ...
+
+ /**
+ * @return list
+ */
+ public function getProfiles(): array
+ {
+ return $this->connection->fetchAllAssociative("SELECT id, name FROM {$this->table()};");
+ }
+
+ private function table(): string
+ {
+ return 'subscription_' . $this->subscriptionId();
+ }
+}
+```
+
+!!! tip
+
+ You can also use the `SubscriberUtil` to build the table/collection name.
+
+### Versioning
+
+As soon as the structure of a subscription changes, or you need other events from the past,
+the `subscriberId` must be change or increment.
+
+Otherwise, the subscription engine will not recognize that the subscription has changed and will not rebuild it.
+To do this, you can add a version to the `subscriberId`:
+
+```php
+use Patchlevel\EventSourcing\Attribute\Subscriber;
+
+#[Subscriber('profile_2')]
+final class ProfileSubscriber
+{
+ // ...
+}
+```
+
+!!! warning
+
+ If you change the `subscriberID`, you must also change the table/collection name.
+ Otherwise the table/collection will conflict with the old subscription.
+
+### Grouping
+
+You can also group subscribers and address these to the subscription engine.
+This is useful if you want to run subscribers in different processes or on different servers.
+
+```php
+use Patchlevel\EventSourcing\Attribute\Subscriber;
+
+#[Subscriber('profile_1', group: 'a')]
+final class ProfileSubscriber
+{
+ // ...
+}
+```
+
+!!! note
+
+ The default group is `default` and the subscription engine takes all groups if none are given to him.
+
+### Run Mode
+
+The run mode determines how the subscriber should behave when it is booted.
+There are three different modes:
+
+#### From Beginning
+
+This is the default mode.
+The subscriber will start from the beginning of the event stream and process all events.
+
+```php
+use Patchlevel\EventSourcing\Attribute\Subscriber;
+use Patchlevel\EventSourcing\Subscription\Subscription\RunMode;
+
+#[Subscriber('welcome_email', runMode: RunMode::FromBeginning)]
+final class WelcomeEmailSubscriber
+{
+ // ...
+}
+```
+
+#### From Now
+
+Certain subscribers operate exclusively on post-release events, disregarding historical data.
+This is useful for subscribers that are only interested in events that occur after a certain point in time.
+As example, a welcome email subscriber that only wants to send emails to new users.
+
+```php
+use Patchlevel\EventSourcing\Attribute\Subscriber;
+use Patchlevel\EventSourcing\Subscription\Subscription\RunMode;
+
+#[Subscriber('welcome_email', runMode: RunMode::FromNow)]
+final class WelcomeEmailSubscriber
+{
+ // ...
+}
+```
+
+#### Once
+
+This mode is useful for subscribers that only need to run once.
+This is useful for subscribers to create reports or to migrate data.
+
+```php
+use Patchlevel\EventSourcing\Attribute\Subscriber;
+use Patchlevel\EventSourcing\Subscription\Subscription\RunMode;
+
+#[Subscriber('migration', runMode: RunMode::Once)]
+final class MigrationSubscriber
+{
+ // ...
+}
+```
+
+## Subscription Engine
+
+The subscription engine manages individual subscribers and keeps the subscriptions running.
+Internally, the subscription engine does this by tracking where each subscriber is in the event stream
+and keeping all subscriptions up to date.
+He also takes care that new subscribers are booted and old ones are removed again.
+If something breaks, the subscription engine marks the individual subscriptions as faulty.
+
+!!! tip
+
+ The Subscription Engine was inspired by the following two blog posts:
+
+ * [Projection Building Blocks: What you'll need to build projections](https://barryosull.com/blog/projection-building-blocks-what-you-ll-need-to-build-projections/)
+ * [Managing projectors is harder than you think](https://barryosull.com/blog/managing-projectors-is-harder-than-you-think/)
+
+## Subscription ID
+
+The subscription ID is taken from the associated subscriber and corresponds to the subscriber ID.
+Unlike the subscriber ID, the subscription ID can no longer change.
+If the Subscriber ID is changed, a new subscription will be created with this new subscriber ID.
+So there are two subscriptions, one with the old subscriber ID and one with the new subscriber ID.
+
+## Subscription Position
+
+Furthermore, the position in the event stream is stored for each subscription.
+So that the subscription engine knows where the subscription stopped and must continue.
+
+## Subscription Status
+
+There is a lifecycle for each subscription.
+This cycle is tracked by the subscription engine.
+
+``` mermaid
+stateDiagram-v2
+ direction LR
+ [*] --> New
+ New --> Booting
+ New --> Error
+ Booting --> Active
+ Booting --> Paused
+ Booting --> Finished
+ Booting --> Error
+ Active --> Paused
+ Active --> Finished
+ Active --> Outdated
+ Active --> Error
+ Paused --> New
+ Paused --> Booting
+ Paused --> Active
+ Paused --> Outdated
+ Paused --> [*]
+ Finished --> Active
+ Finished --> Outdated
+ Error --> New
+ Error --> Booting
+ Error --> Active
+ Error --> Paused
+ Error --> [*]
+ Outdated --> Active
+ Outdated --> [*]
+```
+
+### New
+
+A subscription is created and "new" if a subscriber exists with an ID that is not yet tracked.
+This can happen when either a new subscriber has been added, the `subscriber id` has changed
+or the subscription has been manually deleted from the subscription store.
+
+### Booting
+
+Booting status is reached when the boot process is invoked.
+In this step, the "setup" method is called on the subscription, if available.
+And the subscription is brought up to date, depending on the mode.
+When the process is finished, the subscription is set to active.
+
+### Active
+
+The active status describes the subscriptions currently being actively managed by the subscription engine.
+These subscriptions have a subscriber, follow the event stream and should be up-to-date.
+
+## Paused
+
+A subscription can manually be paused. It will then no longer be updated by the subscription engine.
+This can be useful if you want to pause a subscription for a certain period of time.
+You can also reactivate the subscription if you want so that it continues.
+
+### Finished
+
+A subscription is finished if the subscriber has the mode `RunMode::Once`.
+This means that the subscription is only run once and then set to finished if it reaches the end of the event stream.
+You can also reactivate the subscription if you want so that it continues.
+
+### Outdated
+
+If an active or finished subscription exists in the subscription store
+that does not have a subscriber in the source code with a corresponding subscriber ID,
+then this subscription is marked as outdated.
+This happens when either the subscriber has been deleted
+or the subscriber ID of a subscriber has changed.
+In the last case there should be a new subscription with the new subscriber ID.
+
+An outdated subscription does not automatically become active again when the subscriber exists again.
+This happens, for example, when an old version was deployed again during a rollback.
+
+There are two options to reactivate the subscription:
+
+* Reactivate the subscription, so that the subscription is active again.
+* Remove the subscription and rebuild it from scratch.
+
+### Error
+
+If an error occurs in a subscriber, then the target subscription is set to Error.
+This can happen in the create process, in the boot process or in the run process.
+This subscription will then no longer boot/run until the subscription is reactivate or retried.
+
+The subscription engine has a retry strategy to retry subscriptions that have failed.
+It tries to reactivate the subscription after a certain time and a certain number of attempts.
+If this does not work, the subscription is set to error and must be manually reactivated.
+
+There are two options here:
+
+* Reactivate the subscription, so that the subscription is in the previous state again.
+* Remove the subscription and rebuild it from scratch.
+
+## Setup
+
+In order for the subscription engine to be able to do its work, you have to assemble it beforehand.
+
+### Subscription Store
+
+The Subscription Engine uses a subscription store to store the status of each subscription.
+We provide a Doctrine implementation of this by default.
+
+```php
+use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore;
+
+$subscriptionStore = new DoctrineSubscriptionStore($connection);
+```
+
+So that the schema for the subscription store can also be created,
+we have to tell the `SchemaDirector` our schema configuration.
+Using `ChainSchemaConfigurator` we can add multiple schema configurators.
+In our case they need the `SchemaConfigurator` from the event store and subscription store.
+
+```php
+use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator;
+use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
+
+$schemaDirector = new DoctrineSchemaDirector(
+ $connection
+ new ChainDoctrineSchemaConfigurator([
+ $eventStore,
+ $subscriptionStore
+ ]),
+);
+```
+
+!!! note
+
+ You can find more about schema configurator [here](./store.md)
+
+### Retry Strategy
+
+The subscription engine uses a retry strategy to retry subscriptions that have failed.
+Our default strategy can be configured with the following parameters:
+
+* `baseDelay` - The base delay in seconds.
+* `delayFactor` - The factor by which the delay is multiplied after each attempt.
+* `maxAttempts` - The maximum number of attempts.
+
+```php
+use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy;
+
+$retryStrategy = new ClockBasedRetryStrategy(
+ baseDelay: 5,
+ delayFactor: 2,
+ maxAttempts: 5,
+);
+```
+
+!!! tip
+
+ You can reactivate the subscription manually or remove it and rebuild it from scratch.
+
+### Subscriber Accessor
+
+The subscriber accessor is responsible for providing the subscribers to the subscription engine.
+We provide a metadata subscriber accessor repository by default.
+
+```php
+use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;
+
+$subscriberAccessorRepository = new MetadataSubscriberAccessorRepository([$subscriber1, $subscriber2, $subscriber3]);
+```
+
+### Subscription Engine
+
+Now we can create the subscription engine and plug together the necessary services.
+The event store is needed to load the events, the Subscription Store to store the subscription state
+and the respective subscribers. Optionally, we can also pass a retry strategy.
+
+```php
+use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
+
+$subscriptionEngine = new DefaultSubscriptionEngine(
+ $eventStore,
+ $subscriptionStore,
+ $subscriberAccessorRepository,
+ $retryStrategy,
+);
+```
+
+## Usage
+
+The Subscription Engine has a few methods needed to use it effectively.
+A `SubscriptionEngineCriteria` can be passed to all of these methods to filter the respective subscribers.
+
+```php
+use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
+
+$criteria = new SubscriptionEngineCriteria(
+ ids: ['profile_1', 'welcome_email'],
+ groups: ['default']
+);
+```
+
+!!! note
+
+ An `OR` check is made for the respective criteria and all criteria are checked with an `AND`.
+
+### Boot
+
+So that the subscription engine can manage the subscriptions, they must be booted.
+In this step, the structures are created for all new subscriptions.
+The subscriptions then catch up with the current position of the event stream.
+When the subscriptions are finished, they switch to the active state.
+
+```php
+$subscriptionEngine->boot($criteria);
+```
+
+### Run
+
+All active subscriptions are continued and updated here.
+
+```php
+$subscriptionEngine->run($criteria);
+```
+
+### Teardown
+
+If subscriptions are outdated, they can be cleaned up here.
+The subscription engine also tries to remove the structures created for the subscription.
+
+```php
+$subscriptionEngine->teardown($criteria);
+```
+
+### Remove
+
+You can also directly remove a subscription regardless of its status.
+An attempt is made to remove the structures, but the entry will still be removed if it doesn't work.
+
+```php
+$subscriptionEngine->remove($criteria);
+```
+
+### Reactivate
+
+If a subscription had an error, you can reactivate it.
+As a result, the subscription gets the status active again and is then kept up-to-date again by the subscription engine.
+
+```php
+$subscriptionEngine->reactivate($criteria);
+```
+
+### Pause
+
+Pausing a subscription is also possible.
+The subscription will then no longer be updated by the subscription engine.
+You can reactivate the subscription if you want so that it continues.
+
+```php
+$subscriptionEngine->pause($criteria);
+```
+
+### Status
+
+To get the current status of all subscriptions, you can get them using the `subscriptions` method.
+
+```php
+$subscriptions = $subscriptionEngine->subscriptions($criteria);
+
+foreach ($subscriptions as $subscription) {
+ echo $subscription->status();
+}
+```
+
+## Learn more
+
+* [How to use CLI commands](./cli.md)
+* [How to use Pipeline](./pipeline.md)
+* [How to use Event Bus](./event_bus.md)
+* [How to Test](./testing.md)
\ No newline at end of file
diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon
index 11c8e8a71..6c9db5d8a 100644
--- a/phpstan-baseline.neon
+++ b/phpstan-baseline.neon
@@ -5,11 +5,6 @@ parameters:
count: 1
path: src/Console/DoctrineHelper.php
- -
- message: "#^Parameter \\#3 \\$errorContext of class Patchlevel\\\\EventSourcing\\\\Projection\\\\Projection\\\\ProjectionError constructor expects array\\\\}\\>\\|null, mixed given\\.$#"
- count: 1
- path: src/Projection/Projection/Store/DoctrineStore.php
-
-
message: "#^Parameter \\#2 \\$data of method Patchlevel\\\\Hydrator\\\\Hydrator\\:\\:hydrate\\(\\) expects array\\, mixed given\\.$#"
count: 1
@@ -29,3 +24,8 @@ parameters:
message: "#^Ternary operator condition is always true\\.$#"
count: 1
path: src/Store/DoctrineDbalStoreStream.php
+
+ -
+ message: "#^Parameter \\#3 \\$errorContext of class Patchlevel\\\\EventSourcing\\\\Subscription\\\\Subscription\\\\SubscriptionError constructor expects array\\\\}\\>\\|null, mixed given\\.$#"
+ count: 1
+ path: src/Subscription/Store/DoctrineSubscriptionStore.php
diff --git a/src/Attribute/Projector.php b/src/Attribute/Subscriber.php
similarity index 56%
rename from src/Attribute/Projector.php
rename to src/Attribute/Subscriber.php
index a45a7586a..abf798e8e 100644
--- a/src/Attribute/Projector.php
+++ b/src/Attribute/Subscriber.php
@@ -5,15 +5,15 @@
namespace Patchlevel\EventSourcing\Attribute;
use Attribute;
-use Patchlevel\EventSourcing\Projection\Projection\Projection;
-use Patchlevel\EventSourcing\Projection\Projection\RunMode;
+use Patchlevel\EventSourcing\Subscription\Subscription\RunMode;
+use Patchlevel\EventSourcing\Subscription\Subscription\Subscription;
#[Attribute(Attribute::TARGET_CLASS)]
-final class Projector
+final class Subscriber
{
public function __construct(
public readonly string $id,
- public readonly string $group = Projection::DEFAULT_GROUP,
+ public readonly string $group = Subscription::DEFAULT_GROUP,
public readonly RunMode $runMode = RunMode::FromBeginning,
) {
}
diff --git a/src/Console/Command/ProjectionRebuildCommand.php b/src/Console/Command/ProjectionRebuildCommand.php
deleted file mode 100644
index 137333d31..000000000
--- a/src/Console/Command/ProjectionRebuildCommand.php
+++ /dev/null
@@ -1,33 +0,0 @@
-projectionCriteria($input);
-
- if (!$io->confirm('do you want to rebuild all projections?', false)) {
- return 1;
- }
-
- $this->projectionist->remove($criteria);
- $this->projectionist->boot($criteria, null);
-
- return 0;
- }
-}
diff --git a/src/Console/Command/ProjectionBootCommand.php b/src/Console/Command/SubscriptionBootCommand.php
similarity index 75%
rename from src/Console/Command/ProjectionBootCommand.php
rename to src/Console/Command/SubscriptionBootCommand.php
index 387224dba..e63f864eb 100644
--- a/src/Console/Command/ProjectionBootCommand.php
+++ b/src/Console/Command/SubscriptionBootCommand.php
@@ -11,10 +11,10 @@
use Symfony\Component\Console\Output\OutputInterface;
#[AsCommand(
- 'event-sourcing:projection:boot',
- 'Prepare new projections and catch up with the event store',
+ 'event-sourcing:subscription:boot',
+ 'Prepare new subscriptions and catch up with the event store',
)]
-final class ProjectionBootCommand extends ProjectionCommand
+final class SubscriptionBootCommand extends SubscriptionCommand
{
public function configure(): void
{
@@ -33,8 +33,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
{
$limit = InputHelper::nullablePositiveInt($input->getOption('limit'));
- $criteria = $this->projectionCriteria($input);
- $this->projectionist->boot($criteria, $limit);
+ $criteria = $this->subscriptionEngineCriteria($input);
+ $this->engine->boot($criteria, $limit);
return 0;
}
diff --git a/src/Console/Command/ProjectionCommand.php b/src/Console/Command/SubscriptionCommand.php
similarity index 65%
rename from src/Console/Command/ProjectionCommand.php
rename to src/Console/Command/SubscriptionCommand.php
index 2314882d1..8c91b0db1 100644
--- a/src/Console/Command/ProjectionCommand.php
+++ b/src/Console/Command/SubscriptionCommand.php
@@ -5,17 +5,17 @@
namespace Patchlevel\EventSourcing\Console\Command;
use Patchlevel\EventSourcing\Console\InputHelper;
-use Patchlevel\EventSourcing\Projection\Projectionist\Projectionist;
-use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistCriteria;
+use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
+use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
/** @interal */
-abstract class ProjectionCommand extends Command
+abstract class SubscriptionCommand extends Command
{
public function __construct(
- protected readonly Projectionist $projectionist,
+ protected readonly SubscriptionEngine $engine,
) {
parent::__construct();
}
@@ -27,19 +27,19 @@ protected function configure(): void
'id',
null,
InputOption::VALUE_IS_ARRAY | InputOption::VALUE_REQUIRED,
- 'filter by projection id',
+ 'filter by subscription id',
)
->addOption(
'group',
null,
InputOption::VALUE_IS_ARRAY | InputOption::VALUE_REQUIRED,
- 'filter by projection group',
+ 'filter by subscription group',
);
}
- protected function projectionCriteria(InputInterface $input): ProjectionistCriteria
+ protected function subscriptionEngineCriteria(InputInterface $input): SubscriptionEngineCriteria
{
- return new ProjectionistCriteria(
+ return new SubscriptionEngineCriteria(
InputHelper::nullableStringList($input->getOption('id')),
InputHelper::nullableStringList($input->getOption('group')),
);
diff --git a/src/Console/Command/ProjectionPauseCommand.php b/src/Console/Command/SubscriptionPauseCommand.php
similarity index 60%
rename from src/Console/Command/ProjectionPauseCommand.php
rename to src/Console/Command/SubscriptionPauseCommand.php
index 7ab4626ba..50aa8a8f8 100644
--- a/src/Console/Command/ProjectionPauseCommand.php
+++ b/src/Console/Command/SubscriptionPauseCommand.php
@@ -9,15 +9,15 @@
use Symfony\Component\Console\Output\OutputInterface;
#[AsCommand(
- 'event-sourcing:projection:pause',
- 'Set projection to pause',
+ 'event-sourcing:subscription:pause',
+ 'Set subscription to pause',
)]
-final class ProjectionPauseCommand extends ProjectionCommand
+final class SubscriptionPauseCommand extends SubscriptionCommand
{
protected function execute(InputInterface $input, OutputInterface $output): int
{
- $criteria = $this->projectionCriteria($input);
- $this->projectionist->pause($criteria);
+ $criteria = $this->subscriptionEngineCriteria($input);
+ $this->engine->pause($criteria);
return 0;
}
diff --git a/src/Console/Command/ProjectionReactivateCommand.php b/src/Console/Command/SubscriptionReactivateCommand.php
similarity index 59%
rename from src/Console/Command/ProjectionReactivateCommand.php
rename to src/Console/Command/SubscriptionReactivateCommand.php
index da1aaa7ce..81ec2e1f0 100644
--- a/src/Console/Command/ProjectionReactivateCommand.php
+++ b/src/Console/Command/SubscriptionReactivateCommand.php
@@ -9,15 +9,15 @@
use Symfony\Component\Console\Output\OutputInterface;
#[AsCommand(
- 'event-sourcing:projection:reactivate',
- 'Reactivate failed projections',
+ 'event-sourcing:subscription:reactivate',
+ 'Reactivate subscriptions',
)]
-final class ProjectionReactivateCommand extends ProjectionCommand
+final class SubscriptionReactivateCommand extends SubscriptionCommand
{
protected function execute(InputInterface $input, OutputInterface $output): int
{
- $criteria = $this->projectionCriteria($input);
- $this->projectionist->reactivate($criteria);
+ $criteria = $this->subscriptionEngineCriteria($input);
+ $this->engine->reactivate($criteria);
return 0;
}
diff --git a/src/Console/Command/ProjectionRemoveCommand.php b/src/Console/Command/SubscriptionRemoveCommand.php
similarity index 63%
rename from src/Console/Command/ProjectionRemoveCommand.php
rename to src/Console/Command/SubscriptionRemoveCommand.php
index f08613283..dbff5177b 100644
--- a/src/Console/Command/ProjectionRemoveCommand.php
+++ b/src/Console/Command/SubscriptionRemoveCommand.php
@@ -10,24 +10,24 @@
use Symfony\Component\Console\Output\OutputInterface;
#[AsCommand(
- 'event-sourcing:projection:remove',
- 'Delete all projection and metadata',
+ 'event-sourcing:subscription:remove',
+ 'Delete all subscriptions',
)]
-final class ProjectionRemoveCommand extends ProjectionCommand
+final class SubscriptionRemoveCommand extends SubscriptionCommand
{
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new OutputStyle($input, $output);
- $criteria = $this->projectionCriteria($input);
+ $criteria = $this->subscriptionEngineCriteria($input);
if ($criteria->ids === null) {
- if (!$io->confirm('do you want to remove all projections?', false)) {
+ if (!$io->confirm('do you want to remove all subscriptions?', false)) {
return 1;
}
}
- $this->projectionist->remove($criteria);
+ $this->engine->remove($criteria);
return 0;
}
diff --git a/src/Console/Command/ProjectionRunCommand.php b/src/Console/Command/SubscriptionRunCommand.php
similarity index 87%
rename from src/Console/Command/ProjectionRunCommand.php
rename to src/Console/Command/SubscriptionRunCommand.php
index 1a6d79381..0ef42ec35 100644
--- a/src/Console/Command/ProjectionRunCommand.php
+++ b/src/Console/Command/SubscriptionRunCommand.php
@@ -13,10 +13,10 @@
use Symfony\Component\Console\Output\OutputInterface;
#[AsCommand(
- 'event-sourcing:projection:run',
- 'Run the active projections',
+ 'event-sourcing:subscription:run',
+ 'Run the active subscriptions',
)]
-final class ProjectionRunCommand extends ProjectionCommand
+final class SubscriptionRunCommand extends SubscriptionCommand
{
protected function configure(): void
{
@@ -59,7 +59,7 @@ protected function configure(): void
'rebuild',
null,
InputOption::VALUE_NONE,
- 'rebuild (remove & boot) projections before run',
+ 'rebuild (remove & boot) subscriptions before run',
);
}
@@ -72,13 +72,13 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$sleep = InputHelper::positiveIntOrZero($input->getOption('sleep'));
$rebuild = InputHelper::bool($input->getOption('rebuild'));
- $criteria = $this->projectionCriteria($input);
+ $criteria = $this->subscriptionEngineCriteria($input);
$logger = new ConsoleLogger($output);
$worker = DefaultWorker::create(
function () use ($criteria, $messageLimit): void {
- $this->projectionist->run($criteria, $messageLimit);
+ $this->engine->run($criteria, $messageLimit);
},
[
'runLimit' => $runLimit,
@@ -89,8 +89,8 @@ function () use ($criteria, $messageLimit): void {
);
if ($rebuild) {
- $this->projectionist->remove($criteria);
- $this->projectionist->boot($criteria);
+ $this->engine->remove($criteria);
+ $this->engine->boot($criteria);
}
$worker->run($sleep);
diff --git a/src/Console/Command/ProjectionStatusCommand.php b/src/Console/Command/SubscriptionStatusCommand.php
similarity index 56%
rename from src/Console/Command/ProjectionStatusCommand.php
rename to src/Console/Command/SubscriptionStatusCommand.php
index 52224ef7a..896186658 100644
--- a/src/Console/Command/ProjectionStatusCommand.php
+++ b/src/Console/Command/SubscriptionStatusCommand.php
@@ -6,9 +6,9 @@
use Patchlevel\EventSourcing\Console\InputHelper;
use Patchlevel\EventSourcing\Console\OutputStyle;
-use Patchlevel\EventSourcing\Projection\Projection\Projection;
-use Patchlevel\EventSourcing\Projection\Projection\ProjectionError;
-use Patchlevel\EventSourcing\Projection\Projection\ProjectionNotFound;
+use Patchlevel\EventSourcing\Subscription\Store\SubscriptionNotFound;
+use Patchlevel\EventSourcing\Subscription\Subscription\Subscription;
+use Patchlevel\EventSourcing\Subscription\Subscription\SubscriptionError;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
@@ -18,12 +18,12 @@
use function is_array;
use function sprintf;
-/** @psalm-import-type Context from ProjectionError */
+/** @psalm-import-type Context from SubscriptionError */
#[AsCommand(
- 'event-sourcing:projection:status',
- 'View the current status of the projections',
+ 'event-sourcing:subscription:status',
+ 'View the current status of the subscriptions',
)]
-final class ProjectionStatusCommand extends ProjectionCommand
+final class SubscriptionStatusCommand extends SubscriptionCommand
{
protected function configure(): void
{
@@ -32,7 +32,7 @@ protected function configure(): void
$this->addArgument(
'id',
InputArgument::OPTIONAL,
- 'The projection to display more information about',
+ 'The subscription to display more information about',
);
}
@@ -41,7 +41,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$io = new OutputStyle($input, $output);
$id = InputHelper::nullableString($input->getArgument('id'));
- $projections = $this->projectionist->projections();
+ $subscriptions = $this->engine->subscriptions();
if ($id === null) {
$io->table(
@@ -52,20 +52,20 @@ protected function execute(InputInterface $input, OutputInterface $output): int
'error message',
],
array_map(
- static fn (Projection $projection) => [
- $projection->id(),
- $projection->position(),
- $projection->status()->value,
- $projection->projectionError()?->errorMessage,
+ static fn (Subscription $subscription) => [
+ $subscription->id(),
+ $subscription->position(),
+ $subscription->status()->value,
+ $subscription->subscriptionError()?->errorMessage,
],
- $projections,
+ $subscriptions,
),
);
return 0;
}
- $projection = $this->findProjection($projections, $id);
+ $subscription = $this->findSubscription($subscriptions, $id);
$io->horizontalTable(
[
@@ -76,15 +76,15 @@ protected function execute(InputInterface $input, OutputInterface $output): int
],
[
[
- $projection->id(),
- $projection->position(),
- $projection->status()->value,
- $projection->projectionError()?->errorMessage,
+ $subscription->id(),
+ $subscription->position(),
+ $subscription->status()->value,
+ $subscription->subscriptionError()?->errorMessage,
],
],
);
- $contexts = $projection->projectionError()?->errorContext;
+ $contexts = $subscription->subscriptionError()?->errorContext;
if (is_array($contexts)) {
foreach ($contexts as $context) {
@@ -105,15 +105,15 @@ private function displayError(OutputStyle $io, array $context): void
}
}
- /** @param list $projections */
- private function findProjection(array $projections, string $id): Projection
+ /** @param list $subscriptions */
+ private function findSubscription(array $subscriptions, string $id): Subscription
{
- foreach ($projections as $projection) {
- if ($projection->id() === $id) {
- return $projection;
+ foreach ($subscriptions as $subscription) {
+ if ($subscription->id() === $id) {
+ return $subscription;
}
}
- throw new ProjectionNotFound($id);
+ throw new SubscriptionNotFound($id);
}
}
diff --git a/src/Console/Command/ProjectionTeardownCommand.php b/src/Console/Command/SubscriptionTeardownCommand.php
similarity index 58%
rename from src/Console/Command/ProjectionTeardownCommand.php
rename to src/Console/Command/SubscriptionTeardownCommand.php
index 34b990e43..448429a0b 100644
--- a/src/Console/Command/ProjectionTeardownCommand.php
+++ b/src/Console/Command/SubscriptionTeardownCommand.php
@@ -9,15 +9,15 @@
use Symfony\Component\Console\Output\OutputInterface;
#[AsCommand(
- 'event-sourcing:projection:teardown',
- 'Shut down and delete the outdated projections',
+ 'event-sourcing:subscription:teardown',
+ 'Shut down and delete the outdated subscriptions',
)]
-final class ProjectionTeardownCommand extends ProjectionCommand
+final class SubscriptionTeardownCommand extends SubscriptionCommand
{
protected function execute(InputInterface $input, OutputInterface $output): int
{
- $criteria = $this->projectionCriteria($input);
- $this->projectionist->teardown($criteria);
+ $criteria = $this->subscriptionEngineCriteria($input);
+ $this->engine->teardown($criteria);
return 0;
}
diff --git a/src/Metadata/Projector/DuplicateSetupMethod.php b/src/Metadata/Projector/DuplicateSetupMethod.php
deleted file mode 100644
index c279145cb..000000000
--- a/src/Metadata/Projector/DuplicateSetupMethod.php
+++ /dev/null
@@ -1,25 +0,0 @@
-cache->get($projector);
-
- if ($metadata !== null) {
- return $metadata;
- }
-
- $metadata = $this->projectorMetadataFactory->metadata($projector);
-
- $this->cache->set($projector, $metadata);
-
- return $metadata;
- }
-}
diff --git a/src/Metadata/Projector/Psr6ProjectorMetadataFactory.php b/src/Metadata/Projector/Psr6ProjectorMetadataFactory.php
deleted file mode 100644
index 0146cd24f..000000000
--- a/src/Metadata/Projector/Psr6ProjectorMetadataFactory.php
+++ /dev/null
@@ -1,38 +0,0 @@
-cache->getItem($projector);
-
- if ($item->isHit()) {
- $data = $item->get();
- assert($data instanceof ProjectorMetadata);
-
- return $data;
- }
-
- $metadata = $this->projectorMetadataFactory->metadata($projector);
-
- $item->set($metadata);
- $this->cache->save($item);
-
- return $metadata;
- }
-}
diff --git a/src/Metadata/Projector/AttributeProjectorMetadataFactory.php b/src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php
similarity index 60%
rename from src/Metadata/Projector/AttributeProjectorMetadataFactory.php
rename to src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php
index 0a3bcd116..dc8822687 100644
--- a/src/Metadata/Projector/AttributeProjectorMetadataFactory.php
+++ b/src/Metadata/Subscriber/AttributeSubscriberMetadataFactory.php
@@ -2,37 +2,37 @@
declare(strict_types=1);
-namespace Patchlevel\EventSourcing\Metadata\Projector;
+namespace Patchlevel\EventSourcing\Metadata\Subscriber;
-use Patchlevel\EventSourcing\Attribute\Projector;
use Patchlevel\EventSourcing\Attribute\Setup;
use Patchlevel\EventSourcing\Attribute\Subscribe;
+use Patchlevel\EventSourcing\Attribute\Subscriber;
use Patchlevel\EventSourcing\Attribute\Teardown;
use ReflectionClass;
use function array_key_exists;
-final class AttributeProjectorMetadataFactory implements ProjectorMetadataFactory
+final class AttributeSubscriberMetadataFactory implements SubscriberMetadataFactory
{
- /** @var array */
- private array $projectorMetadata = [];
+ /** @var array */
+ private array $subscriberMetadata = [];
- /** @param class-string $projector */
- public function metadata(string $projector): ProjectorMetadata
+ /** @param class-string $subscriber */
+ public function metadata(string $subscriber): SubscriberMetadata
{
- if (array_key_exists($projector, $this->projectorMetadata)) {
- return $this->projectorMetadata[$projector];
+ if (array_key_exists($subscriber, $this->subscriberMetadata)) {
+ return $this->subscriberMetadata[$subscriber];
}
- $reflector = new ReflectionClass($projector);
+ $reflector = new ReflectionClass($subscriber);
- $attributes = $reflector->getAttributes(Projector::class);
+ $attributes = $reflector->getAttributes(Subscriber::class);
if ($attributes === []) {
- throw new ClassIsNotAProjector($projector);
+ throw new ClassIsNotASubscriber($subscriber);
}
- $projectorInfo = $attributes[0]->newInstance();
+ $subscriberInfo = $attributes[0]->newInstance();
$methods = $reflector->getMethods();
@@ -53,7 +53,7 @@ public function metadata(string $projector): ProjectorMetadata
if ($method->getAttributes(Setup::class)) {
if ($createMethod !== null) {
throw new DuplicateSetupMethod(
- $projector,
+ $subscriber,
$createMethod,
$method->getName(),
);
@@ -68,7 +68,7 @@ public function metadata(string $projector): ProjectorMetadata
if ($dropMethod !== null) {
throw new DuplicateTeardownMethod(
- $projector,
+ $subscriber,
$dropMethod,
$method->getName(),
);
@@ -77,16 +77,16 @@ public function metadata(string $projector): ProjectorMetadata
$dropMethod = $method->getName();
}
- $metadata = new ProjectorMetadata(
- $projectorInfo->id,
- $projectorInfo->group,
- $projectorInfo->runMode,
+ $metadata = new SubscriberMetadata(
+ $subscriberInfo->id,
+ $subscriberInfo->group,
+ $subscriberInfo->runMode,
$subscribeMethods,
$createMethod,
$dropMethod,
);
- $this->projectorMetadata[$projector] = $metadata;
+ $this->subscriberMetadata[$subscriber] = $metadata;
return $metadata;
}
diff --git a/src/Metadata/Projector/ClassIsNotAProjector.php b/src/Metadata/Subscriber/ClassIsNotASubscriber.php
similarity index 65%
rename from src/Metadata/Projector/ClassIsNotAProjector.php
rename to src/Metadata/Subscriber/ClassIsNotASubscriber.php
index feb9276ff..3078241a8 100644
--- a/src/Metadata/Projector/ClassIsNotAProjector.php
+++ b/src/Metadata/Subscriber/ClassIsNotASubscriber.php
@@ -2,20 +2,20 @@
declare(strict_types=1);
-namespace Patchlevel\EventSourcing\Metadata\Projector;
+namespace Patchlevel\EventSourcing\Metadata\Subscriber;
use Patchlevel\EventSourcing\Metadata\MetadataException;
use function sprintf;
-final class ClassIsNotAProjector extends MetadataException
+final class ClassIsNotASubscriber extends MetadataException
{
/** @param class-string $class */
public function __construct(string $class)
{
parent::__construct(
sprintf(
- 'Class "%s" is not a projector',
+ 'Class "%s" is not a subscriber',
$class,
),
);
diff --git a/src/Metadata/Subscriber/DuplicateSetupMethod.php b/src/Metadata/Subscriber/DuplicateSetupMethod.php
new file mode 100644
index 000000000..c39f2005f
--- /dev/null
+++ b/src/Metadata/Subscriber/DuplicateSetupMethod.php
@@ -0,0 +1,25 @@
+cache->get($subscriber);
+
+ if ($metadata !== null) {
+ return $metadata;
+ }
+
+ $metadata = $this->subscriberMetadataFactory->metadata($subscriber);
+
+ $this->cache->set($subscriber, $metadata);
+
+ return $metadata;
+ }
+}
diff --git a/src/Metadata/Subscriber/Psr6SubscriberMetadataFactory.php b/src/Metadata/Subscriber/Psr6SubscriberMetadataFactory.php
new file mode 100644
index 000000000..305e689ec
--- /dev/null
+++ b/src/Metadata/Subscriber/Psr6SubscriberMetadataFactory.php
@@ -0,0 +1,38 @@
+cache->getItem($subscriber);
+
+ if ($item->isHit()) {
+ $data = $item->get();
+ assert($data instanceof SubscriberMetadata);
+
+ return $data;
+ }
+
+ $metadata = $this->subscriberMetadataFactory->metadata($subscriber);
+
+ $item->set($metadata);
+ $this->cache->save($item);
+
+ return $metadata;
+ }
+}
diff --git a/src/Metadata/Projector/ProjectorMetadata.php b/src/Metadata/Subscriber/SubscriberMetadata.php
similarity index 59%
rename from src/Metadata/Projector/ProjectorMetadata.php
rename to src/Metadata/Subscriber/SubscriberMetadata.php
index 5d776f4cc..cd3b93b95 100644
--- a/src/Metadata/Projector/ProjectorMetadata.php
+++ b/src/Metadata/Subscriber/SubscriberMetadata.php
@@ -2,16 +2,16 @@
declare(strict_types=1);
-namespace Patchlevel\EventSourcing\Metadata\Projector;
+namespace Patchlevel\EventSourcing\Metadata\Subscriber;
-use Patchlevel\EventSourcing\Projection\Projection\Projection;
-use Patchlevel\EventSourcing\Projection\Projection\RunMode;
+use Patchlevel\EventSourcing\Subscription\Subscription\RunMode;
+use Patchlevel\EventSourcing\Subscription\Subscription\Subscription;
-final class ProjectorMetadata
+final class SubscriberMetadata
{
public function __construct(
public readonly string $id,
- public readonly string $group = Projection::DEFAULT_GROUP,
+ public readonly string $group = Subscription::DEFAULT_GROUP,
public readonly RunMode $runMode = RunMode::FromBeginning,
/** @var array> */
public readonly array $subscribeMethods = [],
diff --git a/src/Metadata/Subscriber/SubscriberMetadataFactory.php b/src/Metadata/Subscriber/SubscriberMetadataFactory.php
new file mode 100644
index 000000000..db174d35e
--- /dev/null
+++ b/src/Metadata/Subscriber/SubscriberMetadataFactory.php
@@ -0,0 +1,11 @@
+ */
- private array $projections = [];
-
- /** @param list $projections */
- public function __construct(array $projections = [])
- {
- foreach ($projections as $projection) {
- $this->projections[$projection->id()] = $projection;
- }
- }
-
- public function get(string $projectionId): Projection
- {
- if (array_key_exists($projectionId, $this->projections)) {
- return $this->projections[$projectionId];
- }
-
- throw new ProjectionNotFound($projectionId);
- }
-
- /** @return list */
- public function find(ProjectionCriteria|null $criteria = null): array
- {
- $projections = array_values($this->projections);
-
- if ($criteria === null) {
- return $projections;
- }
-
- return array_values(
- array_filter(
- $projections,
- static function (Projection $projection) use ($criteria): bool {
- if ($criteria->ids !== null) {
- if (!in_array($projection->id(), $criteria->ids, true)) {
- return false;
- }
- }
-
- if ($criteria->groups !== null) {
- if (!in_array($projection->group(), $criteria->groups, true)) {
- return false;
- }
- }
-
- if ($criteria->status !== null) {
- if (!in_array($projection->status(), $criteria->status, true)) {
- return false;
- }
- }
-
- return true;
- },
- ),
- );
- }
-
- public function add(Projection $projection): void
- {
- if (array_key_exists($projection->id(), $this->projections)) {
- throw new ProjectionAlreadyExists($projection->id());
- }
-
- $this->projections[$projection->id()] = $projection;
- }
-
- public function update(Projection $projection): void
- {
- if (!array_key_exists($projection->id(), $this->projections)) {
- throw new ProjectionNotFound($projection->id());
- }
-
- $this->projections[$projection->id()] = $projection;
- }
-
- public function remove(Projection $projection): void
- {
- unset($this->projections[$projection->id()]);
- }
-}
diff --git a/src/Projection/Projection/Store/LockableProjectionStore.php b/src/Projection/Projection/Store/LockableProjectionStore.php
deleted file mode 100644
index a8e3d23d0..000000000
--- a/src/Projection/Projection/Store/LockableProjectionStore.php
+++ /dev/null
@@ -1,12 +0,0 @@
- */
- public function find(ProjectionCriteria|null $criteria = null): array;
-
- /** @throws ProjectionAlreadyExists */
- public function add(Projection $projection): void;
-
- /** @throws ProjectionNotFound */
- public function update(Projection $projection): void;
-
- /** @throws ProjectionNotFound */
- public function remove(Projection $projection): void;
-}
diff --git a/src/Projection/Projectionist/DefaultProjectionist.php b/src/Projection/Projectionist/DefaultProjectionist.php
deleted file mode 100644
index 0f3e905c5..000000000
--- a/src/Projection/Projectionist/DefaultProjectionist.php
+++ /dev/null
@@ -1,868 +0,0 @@
-logger?->info(
- 'Projectionist: Start booting.',
- );
-
- $this->discoverNewProjections();
- $this->handleRetryProjections($criteria);
- $this->handleNewProjections($criteria);
-
- $this->findForUpdate(
- new ProjectionCriteria(
- ids: $criteria->ids,
- groups: $criteria->groups,
- status: [ProjectionStatus::Booting],
- ),
- function ($projections) use ($limit): void {
- $projections = $this->fastForwardFromNowProjections($projections);
-
- if (count($projections) === 0) {
- $this->logger?->info('Projectionist: No projections in booting status, finish booting.');
-
- return;
- }
-
- $startIndex = $this->lowestProjectionPosition($projections);
-
- $this->logger?->debug(
- sprintf(
- 'Projectionist: Event stream is processed for booting from position %s.',
- $startIndex,
- ),
- );
-
- $stream = null;
- $messageCounter = 0;
-
- try {
- $stream = $this->messageStore->load(
- new Criteria(fromIndex: $startIndex),
- );
-
- foreach ($stream as $message) {
- $index = $stream->index();
-
- if ($index === null) {
- throw new UnexpectedError('Stream index is null, this should not happen.');
- }
-
- foreach ($projections as $projection) {
- if (!$projection->isBooting()) {
- continue;
- }
-
- if ($projection->position() >= $index) {
- $this->logger?->debug(
- sprintf(
- 'Projectionist: Projection "%s" is farther than the current position (%d > %d), continue booting.',
- $projection->id(),
- $projection->position(),
- $index,
- ),
- );
-
- continue;
- }
-
- $this->handleMessage($index, $message, $projection);
- }
-
- $messageCounter++;
-
- $this->logger?->debug(
- sprintf(
- 'Projectionist: Current event stream position for booting: %s',
- $index,
- ),
- );
-
- if ($limit !== null && $messageCounter >= $limit) {
- $this->logger?->info(
- sprintf(
- 'Projectionist: Message limit (%d) reached, finish booting.',
- $limit,
- ),
- );
-
- return;
- }
- }
- } finally {
- if ($messageCounter > 0) {
- foreach ($projections as $projection) {
- if (!$projection->isBooting()) {
- continue;
- }
-
- $this->projectionStore->update($projection);
- }
- }
-
- $stream?->close();
- }
-
- $this->logger?->debug('Projectionist: End of stream for booting has been reached.');
-
- foreach ($projections as $projection) {
- if (!$projection->isBooting()) {
- continue;
- }
-
- if ($projection->runMode() === RunMode::Once) {
- $projection->finished();
- $this->projectionStore->update($projection);
-
- $this->logger?->info(sprintf(
- 'Projectionist: Projection "%s" run only once and has been set to finished.',
- $projection->id(),
- ));
-
- continue;
- }
-
- $projection->active();
- $this->projectionStore->update($projection);
-
- $this->logger?->info(sprintf(
- 'Projectionist: Projection "%s" has been set to active after booting.',
- $projection->id(),
- ));
- }
-
- $this->logger?->info('Projectionist: Finish booting.');
- },
- );
- }
-
- public function run(
- ProjectionistCriteria|null $criteria = null,
- int|null $limit = null,
- ): void {
- $criteria ??= new ProjectionistCriteria();
-
- $this->logger?->info('Projectionist: Start processing.');
-
- $this->discoverNewProjections();
- $this->handleOutdatedProjections($criteria);
- $this->handleRetryProjections($criteria);
-
- $this->findForUpdate(
- new ProjectionCriteria(
- ids: $criteria->ids,
- groups: $criteria->groups,
- status: [ProjectionStatus::Active],
- ),
- function (array $projections) use ($limit): void {
- if (count($projections) === 0) {
- $this->logger?->info('Projectionist: No projections to process, finish processing.');
-
- return;
- }
-
- $startIndex = $this->lowestProjectionPosition($projections);
-
- $this->logger?->debug(
- sprintf(
- 'Projectionist: Event stream is processed from position %d.',
- $startIndex,
- ),
- );
-
- $stream = null;
- $messageCounter = 0;
-
- try {
- $criteria = new Criteria(fromIndex: $startIndex);
- $stream = $this->messageStore->load($criteria);
-
- foreach ($stream as $message) {
- $index = $stream->index();
-
- if ($index === null) {
- throw new UnexpectedError('Stream index is null, this should not happen.');
- }
-
- foreach ($projections as $projection) {
- if (!$projection->isActive()) {
- continue;
- }
-
- if ($projection->position() >= $index) {
- $this->logger?->debug(
- sprintf(
- 'Projectionist: Projection "%s" is farther than the current position (%d > %d), continue processing.',
- $projection->id(),
- $projection->position(),
- $index,
- ),
- );
-
- continue;
- }
-
- $this->handleMessage($index, $message, $projection);
- }
-
- $messageCounter++;
-
- $this->logger?->debug(sprintf('Projectionist: Current event stream position: %s', $index));
-
- if ($limit !== null && $messageCounter >= $limit) {
- $this->logger?->info(
- sprintf(
- 'Projectionist: Message limit (%d) reached, finish processing.',
- $limit,
- ),
- );
-
- return;
- }
- }
- } finally {
- if ($messageCounter > 0) {
- foreach ($projections as $projection) {
- if (!$projection->isActive()) {
- continue;
- }
-
- $this->projectionStore->update($projection);
- }
- }
-
- $stream?->close();
- }
-
- $this->logger?->info(
- sprintf(
- 'Projectionist: End of stream on position "%d" has been reached, finish processing.',
- $stream->index() ?: 'unknown',
- ),
- );
- },
- );
- }
-
- public function teardown(ProjectionistCriteria|null $criteria = null): void
- {
- $criteria ??= new ProjectionistCriteria();
-
- $this->discoverNewProjections();
-
- $this->logger?->info('Projectionist: Start teardown outdated projections.');
-
- $this->findForUpdate(
- new ProjectionCriteria(
- ids: $criteria->ids,
- groups: $criteria->groups,
- status: [ProjectionStatus::Outdated],
- ),
- function (array $projections): void {
- foreach ($projections as $projection) {
- $projector = $this->projector($projection->id());
-
- if (!$projector) {
- $this->logger?->warning(
- sprintf(
- 'Projectionist: Projector for "%s" to teardown not found, skipped.',
- $projection->id(),
- ),
- );
-
- continue;
- }
-
- $teardownMethod = $projector->teardownMethod();
-
- if (!$teardownMethod) {
- $this->projectionStore->remove($projection);
-
- $this->logger?->info(
- sprintf(
- 'Projectionist: Projector "%s" for "%s" has no teardown method and was immediately removed.',
- $projector::class,
- $projection->id(),
- ),
- );
-
- continue;
- }
-
- try {
- $teardownMethod();
-
- $this->logger?->debug(sprintf(
- 'Projectionist: For Projector "%s" for "%s" the teardown method has been executed and is now prepared to be removed.',
- $projector::class,
- $projection->id(),
- ));
- } catch (Throwable $e) {
- $this->logger?->error(
- sprintf(
- 'Projectionist: Projection "%s" for "%s" has an error in the teardown method, skipped: %s',
- $projector::class,
- $projection->id(),
- $e->getMessage(),
- ),
- );
- continue;
- }
-
- $this->projectionStore->remove($projection);
-
- $this->logger?->info(
- sprintf(
- 'Projectionist: Projection "%s" removed.',
- $projection->id(),
- ),
- );
- }
-
- $this->logger?->info('Projectionist: Finish teardown.');
- },
- );
- }
-
- public function remove(ProjectionistCriteria|null $criteria = null): void
- {
- $criteria ??= new ProjectionistCriteria();
-
- $this->discoverNewProjections();
-
- $this->findForUpdate(
- new ProjectionCriteria(
- ids: $criteria->ids,
- groups: $criteria->groups,
- ),
- function (array $projections): void {
- foreach ($projections as $projection) {
- $projector = $this->projector($projection->id());
-
- if (!$projector) {
- $this->projectionStore->remove($projection);
-
- $this->logger?->info(
- sprintf(
- 'Projectionist: Projection "%s" removed without a suitable projector.',
- $projection->id(),
- ),
- );
-
- continue;
- }
-
- $teardownMethod = $projector->teardownMethod();
-
- if (!$teardownMethod) {
- $this->projectionStore->remove($projection);
-
- $this->logger?->info(
- sprintf('Projectionist: Projection "%s" removed.', $projection->id()),
- );
-
- continue;
- }
-
- try {
- $teardownMethod();
- } catch (Throwable $e) {
- $this->logger?->error(
- sprintf(
- 'Projectionist: Projector "%s" teardown method could not be executed: %s',
- $projector::class,
- $e->getMessage(),
- ),
- );
- }
-
- $this->projectionStore->remove($projection);
-
- $this->logger?->info(
- sprintf('Projectionist: Projection "%s" removed.', $projection->id()),
- );
- }
- },
- );
- }
-
- public function reactivate(ProjectionistCriteria|null $criteria = null): void
- {
- $criteria ??= new ProjectionistCriteria();
-
- $this->discoverNewProjections();
-
- $this->findForUpdate(
- new ProjectionCriteria(
- ids: $criteria->ids,
- groups: $criteria->groups,
- status: [
- ProjectionStatus::Error,
- ProjectionStatus::Outdated,
- ProjectionStatus::Paused,
- ProjectionStatus::Finished,
- ],
- ),
- function (array $projections): void {
- /** @var Projection $projection */
- foreach ($projections as $projection) {
- $projector = $this->projector($projection->id());
-
- if (!$projector) {
- $this->logger?->debug(
- sprintf('Projectionist: Projector for "%s" not found, skipped.', $projection->id()),
- );
-
- continue;
- }
-
- $error = $projection->projectionError();
-
- if ($error) {
- $projection->doRetry();
- $projection->resetRetry();
-
- $this->projectionStore->update($projection);
-
- $this->logger?->info(sprintf(
- 'Projectionist: Projector "%s" for "%s" is reactivated.',
- $projector::class,
- $projection->id(),
- ));
-
- continue;
- }
-
- $projection->active();
- $this->projectionStore->update($projection);
-
- $this->logger?->info(sprintf(
- 'Projectionist: Projector "%s" for "%s" is reactivated.',
- $projector::class,
- $projection->id(),
- ));
- }
- },
- );
- }
-
- public function pause(ProjectionistCriteria|null $criteria = null): void
- {
- $criteria ??= new ProjectionistCriteria();
-
- $this->discoverNewProjections();
-
- $this->findForUpdate(
- new ProjectionCriteria(
- ids: $criteria->ids,
- groups: $criteria->groups,
- status: [
- ProjectionStatus::Active,
- ProjectionStatus::Booting,
- ProjectionStatus::Error,
- ],
- ),
- function (array $projections): void {
- /** @var Projection $projection */
- foreach ($projections as $projection) {
- $projector = $this->projector($projection->id());
-
- if (!$projector) {
- $this->logger?->debug(
- sprintf('Projectionist: Projector for "%s" not found, skipped.', $projection->id()),
- );
-
- continue;
- }
-
- $projection->pause();
- $this->projectionStore->update($projection);
-
- $this->logger?->info(sprintf(
- 'Projectionist: Projector "%s" for "%s" is paused.',
- $projector::class,
- $projection->id(),
- ));
- }
- },
- );
- }
-
- /** @return list */
- public function projections(ProjectionistCriteria|null $criteria = null): array
- {
- $criteria ??= new ProjectionistCriteria();
-
- $this->discoverNewProjections();
-
- return $this->projectionStore->find(
- new ProjectionCriteria(
- ids: $criteria->ids,
- groups: $criteria->groups,
- ),
- );
- }
-
- private function handleMessage(int $index, Message $message, Projection $projection): void
- {
- $projector = $this->projector($projection->id());
-
- if (!$projector) {
- throw ProjectorNotFound::forProjectionId($projection->id());
- }
-
- $subscribeMethods = $projector->subscribeMethods($message->event()::class);
-
- if ($subscribeMethods === []) {
- $projection->changePosition($index);
-
- $this->logger?->debug(
- sprintf(
- 'Projectionist: Projector "%s" for "%s" has no subscribe methods for "%s", continue.',
- $projector::class,
- $projection->id(),
- $message->event()::class,
- ),
- );
-
- return;
- }
-
- try {
- foreach ($subscribeMethods as $subscribeMethod) {
- $subscribeMethod($message);
- }
- } catch (Throwable $e) {
- $this->logger?->error(
- sprintf(
- 'Projectionist: Projector "%s" for "%s" could not process the event "%s": %s',
- $projector::class,
- $projection->id(),
- $message->event()::class,
- $e->getMessage(),
- ),
- );
-
- $this->handleError($projection, $e);
-
- return;
- }
-
- $projection->changePosition($index);
- $projection->resetRetry();
-
- $this->logger?->debug(
- sprintf(
- 'Projectionist: Projector "%s" for "%s" processed the event "%s".',
- $projector::class,
- $projection->id(),
- $message->event()::class,
- ),
- );
- }
-
- private function projector(string $projectionId): ProjectorAccessor|null
- {
- return $this->projectorRepository->get($projectionId);
- }
-
- private function handleOutdatedProjections(ProjectionistCriteria $criteria): void
- {
- $this->findForUpdate(
- new ProjectionCriteria(
- ids: $criteria->ids,
- groups: $criteria->groups,
- status: [ProjectionStatus::Active, ProjectionStatus::Paused, ProjectionStatus::Finished],
- ),
- function (array $projections): void {
- foreach ($projections as $projection) {
- $projector = $this->projector($projection->id());
-
- if ($projector) {
- continue;
- }
-
- $projection->outdated();
- $this->projectionStore->update($projection);
-
- $this->logger?->info(
- sprintf(
- 'Projectionist: Projector for "%s" not found and has been marked as outdated.',
- $projection->id(),
- ),
- );
- }
- },
- );
- }
-
- private function handleRetryProjections(ProjectionistCriteria $criteria): void
- {
- $this->findForUpdate(
- new ProjectionCriteria(
- ids: $criteria->ids,
- groups: $criteria->groups,
- status: [ProjectionStatus::Error],
- ),
- function (array $projections): void {
- /** @var Projection $projection */
- foreach ($projections as $projection) {
- $error = $projection->projectionError();
-
- if ($error === null) {
- continue;
- }
-
- $retryable = in_array(
- $error->previousStatus,
- [ProjectionStatus::New, ProjectionStatus::Booting, ProjectionStatus::Active],
- true,
- );
-
- if (!$retryable) {
- continue;
- }
-
- if (!$this->retryStrategy->shouldRetry($projection)) {
- continue;
- }
-
- $projection->doRetry();
- $this->projectionStore->update($projection);
-
- $this->logger?->info(
- sprintf(
- 'Projectionist: Retry projection "%s" (%d) and set back to %s.',
- $projection->id(),
- $projection->retryAttempt(),
- $projection->status()->value,
- ),
- );
- }
- },
- );
- }
-
- /**
- * @param list $projections
- *
- * @return list
- */
- private function fastForwardFromNowProjections(array $projections): array
- {
- $latestIndex = null;
- $forwardedProjections = [];
-
- foreach ($projections as $projection) {
- $projector = $this->projector($projection->id());
-
- if (!$projector) {
- $forwardedProjections[] = $projection;
-
- continue;
- }
-
- if ($projection->runMode() === RunMode::FromBeginning || $projection->runMode() === RunMode::Once) {
- $forwardedProjections[] = $projection;
-
- continue;
- }
-
- if ($latestIndex === null) {
- $latestIndex = $this->latestIndex();
- }
-
- $projection->changePosition($latestIndex);
- $projection->active();
- $this->projectionStore->update($projection);
-
- $this->logger?->info(
- sprintf(
- 'Projectionist: Projector "%s" for "%s" is in "from now" mode: skip past messages and set to active.',
- $projector::class,
- $projection->id(),
- ),
- );
- }
-
- return $forwardedProjections;
- }
-
- private function handleNewProjections(ProjectionistCriteria $criteria): void
- {
- $this->findForUpdate(
- new ProjectionCriteria(
- ids: $criteria->ids,
- groups: $criteria->groups,
- status: [ProjectionStatus::New],
- ),
- function (array $projections): void {
- foreach ($projections as $projection) {
- $projector = $this->projector($projection->id());
-
- if (!$projector) {
- throw ProjectorNotFound::forProjectionId($projection->id());
- }
-
- $setupMethod = $projector->setupMethod();
-
- if (!$setupMethod) {
- $projection->booting();
- $this->projectionStore->update($projection);
-
- $this->logger?->debug(sprintf(
- 'Projectionist: Projector "%s" for "%s" has no setup method, continue.',
- $projector::class,
- $projection->id(),
- ));
-
- continue;
- }
-
- try {
- $setupMethod();
-
- $projection->booting();
- $this->projectionStore->update($projection);
-
- $this->logger?->debug(sprintf(
- 'Projectionist: For Projector "%s" for "%s" the setup method has been executed and is now prepared for data.',
- $projector::class,
- $projection->id(),
- ));
- } catch (Throwable $e) {
- $this->logger?->error(sprintf(
- 'Projectionist: Projector "%s" for "%s" has an error in the setup method: %s',
- $projector::class,
- $projection->id(),
- $e->getMessage(),
- ));
-
- $this->handleError($projection, $e);
- }
- }
- },
- );
- }
-
- private function discoverNewProjections(): void
- {
- $this->findForUpdate(
- new ProjectionCriteria(),
- function (array $projections): void {
- foreach ($this->projectorRepository->all() as $projector) {
- foreach ($projections as $projection) {
- if ($projection->id() === $projector->id()) {
- continue 2;
- }
- }
-
- $this->projectionStore->add(
- new Projection(
- $projector->id(),
- $projector->group(),
- $projector->runMode(),
- ),
- );
-
- $this->logger?->info(
- sprintf(
- 'Projectionist: New Projector "%s" was found and added to the projection store.',
- $projector->id(),
- ),
- );
- }
- },
- );
- }
-
- private function latestIndex(): int
- {
- $stream = $this->messageStore->load(null, 1, null, true);
-
- return $stream->index() ?: 0;
- }
-
- /** @param list $projections */
- private function lowestProjectionPosition(array $projections): int
- {
- $min = null;
-
- foreach ($projections as $projection) {
- if ($min !== null && $projection->position() >= $min) {
- continue;
- }
-
- $min = $projection->position();
- }
-
- if ($min === null) {
- return 0;
- }
-
- return $min;
- }
-
- /** @param Closure(list):void $closure */
- private function findForUpdate(ProjectionCriteria $criteria, Closure $closure): void
- {
- if (!$this->projectionStore instanceof LockableProjectionStore) {
- $closure($this->projectionStore->find($criteria));
-
- return;
- }
-
- $this->projectionStore->inLock(function () use ($closure, $criteria): void {
- $projections = $this->projectionStore->find($criteria);
-
- $closure($projections);
- });
- }
-
- private function handleError(Projection $projection, Throwable $throwable): void
- {
- $projection->error($throwable);
- $this->projectionStore->update($projection);
- }
-}
diff --git a/src/Projection/Projectionist/Projectionist.php b/src/Projection/Projectionist/Projectionist.php
deleted file mode 100644
index 7ad6e75bb..000000000
--- a/src/Projection/Projectionist/Projectionist.php
+++ /dev/null
@@ -1,41 +0,0 @@
- */
- public function projections(ProjectionistCriteria|null $criteria = null): array;
-}
diff --git a/src/Projection/Projectionist/ProjectorNotFound.php b/src/Projection/Projectionist/ProjectorNotFound.php
deleted file mode 100644
index f2af3622d..000000000
--- a/src/Projection/Projectionist/ProjectorNotFound.php
+++ /dev/null
@@ -1,22 +0,0 @@
- */
- private array $projectorsMap = [];
-
- /** @param iterable