From 8656164611b34f738c46624cdd8fd2afb40aac8d Mon Sep 17 00:00:00 2001 From: David Badura Date: Sat, 9 Mar 2024 15:58:39 +0100 Subject: [PATCH 1/2] split boot method into boot and setup in subscription engine --- composer.json | 2 +- composer.lock | 410 +++++++------- .../Command/SubscriptionBootCommand.php | 91 ++- .../Command/SubscriptionSetupCommand.php | 24 + src/Store/DoctrineDbalStore.php | 2 +- .../Engine/DefaultSubscriptionEngine.php | 217 ++++---- .../Engine/SubscriptionEngine.php | 2 + tests/Benchmark/SubscriptionEngineBench.php | 1 + .../IntegrationTest.php | 4 +- .../BasicIntegrationTest.php | 2 + .../Subscription/SubscriptionTest.php | 3 + .../Unit/Repository/DefaultRepositoryTest.php | 16 +- tests/Unit/Store/DoctrineDbalStoreTest.php | 2 +- .../Engine/DefaultSubscriptionEngineTest.php | 517 +++++++++++------- 14 files changed, 757 insertions(+), 536 deletions(-) create mode 100644 src/Console/Command/SubscriptionSetupCommand.php diff --git a/composer.json b/composer.json index 55192fee7..2d131fac0 100644 --- a/composer.json +++ b/composer.json @@ -23,7 +23,7 @@ "doctrine/dbal": "^3.8.1|^4.0.0", "doctrine/migrations": "^3.3.2", "patchlevel/hydrator": "^1.2.0", - "patchlevel/worker": "^1.1.1", + "patchlevel/worker": "^1.2.0", "psr/cache": "^2.0.0|^3.0.0", "psr/clock": "^1.0", "psr/event-dispatcher": "^1.0", diff --git a/composer.lock b/composer.lock index 51668911a..0bcc41b1e 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "0c745538b704294d92dfee9c2b9d6323", + "content-hash": "8e8c962c4e598466f0fca1e88c81ef9c", "packages": [ { "name": "brick/math", @@ -63,16 +63,16 @@ }, { "name": "doctrine/dbal", - "version": "4.0.0", + "version": "4.0.1", "source": { "type": "git", "url": "https://github.com/doctrine/dbal.git", - "reference": "53df8c432978b716a805143eb701436d54ec705e" + "reference": "9e588fe1f38a443cb17de6b86b803d9e028e2156" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/doctrine/dbal/zipball/53df8c432978b716a805143eb701436d54ec705e", - "reference": "53df8c432978b716a805143eb701436d54ec705e", + "url": "https://api.github.com/repos/doctrine/dbal/zipball/9e588fe1f38a443cb17de6b86b803d9e028e2156", + "reference": "9e588fe1f38a443cb17de6b86b803d9e028e2156", "shasum": "" }, "require": { @@ -85,16 +85,16 @@ "doctrine/coding-standard": "12.0.0", "fig/log-test": "^1", "jetbrains/phpstorm-stubs": "2023.2", - "phpstan/phpstan": "1.10.57", + "phpstan/phpstan": "1.10.58", "phpstan/phpstan-phpunit": "1.3.15", "phpstan/phpstan-strict-rules": "^1.5", "phpunit/phpunit": "10.5.9", "psalm/plugin-phpunit": "0.18.4", "slevomat/coding-standard": "8.13.1", - "squizlabs/php_codesniffer": "3.8.1", + "squizlabs/php_codesniffer": "3.9.0", "symfony/cache": "^6.3.8|^7.0", "symfony/console": "^5.4|^6.3|^7.0", - "vimeo/psalm": "5.16.0" + "vimeo/psalm": "5.21.1" }, "suggest": { "symfony/console": "For helpful console commands such as SQL execution and import of files." @@ -151,7 +151,7 @@ ], "support": { "issues": "https://github.com/doctrine/dbal/issues", - "source": "https://github.com/doctrine/dbal/tree/4.0.0" + "source": "https://github.com/doctrine/dbal/tree/4.0.1" }, "funding": [ { @@ -167,7 +167,7 @@ "type": "tidelift" } ], - "time": "2024-02-03T19:11:19+00:00" + "time": "2024-03-03T15:59:11+00:00" }, { "name": "doctrine/deprecations", @@ -309,16 +309,16 @@ }, { "name": "doctrine/migrations", - "version": "3.7.2", + "version": "3.7.4", "source": { "type": "git", "url": "https://github.com/doctrine/migrations.git", - "reference": "47af29eef49f29ebee545947e8b2a4b3be318c8a" + "reference": "954e0a314c2f0eb9fb418210445111747de254a6" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/doctrine/migrations/zipball/47af29eef49f29ebee545947e8b2a4b3be318c8a", - "reference": "47af29eef49f29ebee545947e8b2a4b3be318c8a", + "url": "https://api.github.com/repos/doctrine/migrations/zipball/954e0a314c2f0eb9fb418210445111747de254a6", + "reference": "954e0a314c2f0eb9fb418210445111747de254a6", "shasum": "" }, "require": { @@ -391,7 +391,7 @@ ], "support": { "issues": "https://github.com/doctrine/migrations/issues", - "source": "https://github.com/doctrine/migrations/tree/3.7.2" + "source": "https://github.com/doctrine/migrations/tree/3.7.4" }, "funding": [ { @@ -407,7 +407,7 @@ "type": "tidelift" } ], - "time": "2023-12-05T11:35:05+00:00" + "time": "2024-03-06T13:41:11+00:00" }, { "name": "patchlevel/hydrator", @@ -475,16 +475,16 @@ }, { "name": "patchlevel/worker", - "version": "1.1.1", + "version": "1.2.0", "source": { "type": "git", "url": "https://github.com/patchlevel/worker.git", - "reference": "1076160dcbf704adea66db13196e320479ec3090" + "reference": "37cf39383ddaf4b65ed04cecfb4fb6c6fe5cbdf7" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/patchlevel/worker/zipball/1076160dcbf704adea66db13196e320479ec3090", - "reference": "1076160dcbf704adea66db13196e320479ec3090", + "url": "https://api.github.com/repos/patchlevel/worker/zipball/37cf39383ddaf4b65ed04cecfb4fb6c6fe5cbdf7", + "reference": "37cf39383ddaf4b65ed04cecfb4fb6c6fe5cbdf7", "shasum": "" }, "require": { @@ -532,9 +532,9 @@ ], "support": { "issues": "https://github.com/patchlevel/worker/issues", - "source": "https://github.com/patchlevel/worker/tree/1.1.1" + "source": "https://github.com/patchlevel/worker/tree/1.2.0" }, - "time": "2024-02-01T13:03:05+00:00" + "time": "2024-03-12T11:15:22+00:00" }, { "name": "psr/cache", @@ -1020,16 +1020,16 @@ }, { "name": "symfony/console", - "version": "v7.0.3", + "version": "v7.0.4", "source": { "type": "git", "url": "https://github.com/symfony/console.git", - "reference": "c5010d50f1ee4b25cfa0201d9915cf1b14071456" + "reference": "6b099f3306f7c9c2d2786ed736d0026b2903205f" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/console/zipball/c5010d50f1ee4b25cfa0201d9915cf1b14071456", - "reference": "c5010d50f1ee4b25cfa0201d9915cf1b14071456", + "url": "https://api.github.com/repos/symfony/console/zipball/6b099f3306f7c9c2d2786ed736d0026b2903205f", + "reference": "6b099f3306f7c9c2d2786ed736d0026b2903205f", "shasum": "" }, "require": { @@ -1093,7 +1093,7 @@ "terminal" ], "support": { - "source": "https://github.com/symfony/console/tree/v7.0.3" + "source": "https://github.com/symfony/console/tree/v7.0.4" }, "funding": [ { @@ -1109,7 +1109,7 @@ "type": "tidelift" } ], - "time": "2024-01-23T15:02:46+00:00" + "time": "2024-02-22T20:27:20+00:00" }, { "name": "symfony/event-dispatcher", @@ -1795,16 +1795,16 @@ }, { "name": "symfony/string", - "version": "v7.0.3", + "version": "v7.0.4", "source": { "type": "git", "url": "https://github.com/symfony/string.git", - "reference": "524aac4a280b90a4420d8d6a040718d0586505ac" + "reference": "f5832521b998b0bec40bee688ad5de98d4cf111b" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/string/zipball/524aac4a280b90a4420d8d6a040718d0586505ac", - "reference": "524aac4a280b90a4420d8d6a040718d0586505ac", + "url": "https://api.github.com/repos/symfony/string/zipball/f5832521b998b0bec40bee688ad5de98d4cf111b", + "reference": "f5832521b998b0bec40bee688ad5de98d4cf111b", "shasum": "" }, "require": { @@ -1861,7 +1861,7 @@ "utf8" ], "support": { - "source": "https://github.com/symfony/string/tree/v7.0.3" + "source": "https://github.com/symfony/string/tree/v7.0.4" }, "funding": [ { @@ -1877,20 +1877,20 @@ "type": "tidelift" } ], - "time": "2024-01-29T15:41:16+00:00" + "time": "2024-02-01T13:17:36+00:00" }, { "name": "symfony/var-exporter", - "version": "v7.0.3", + "version": "v7.0.4", "source": { "type": "git", "url": "https://github.com/symfony/var-exporter.git", - "reference": "1fb79308cb5fc2b44bff6e8af10a5af6812e05b8" + "reference": "dfb0acb6803eb714f05d97dd4c5abe6d5fa9fe41" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/var-exporter/zipball/1fb79308cb5fc2b44bff6e8af10a5af6812e05b8", - "reference": "1fb79308cb5fc2b44bff6e8af10a5af6812e05b8", + "url": "https://api.github.com/repos/symfony/var-exporter/zipball/dfb0acb6803eb714f05d97dd4c5abe6d5fa9fe41", + "reference": "dfb0acb6803eb714f05d97dd4c5abe6d5fa9fe41", "shasum": "" }, "require": { @@ -1935,7 +1935,7 @@ "serialize" ], "support": { - "source": "https://github.com/symfony/var-exporter/tree/v7.0.3" + "source": "https://github.com/symfony/var-exporter/tree/v7.0.4" }, "funding": [ { @@ -1951,7 +1951,7 @@ "type": "tidelift" } ], - "time": "2024-01-23T15:02:46+00:00" + "time": "2024-02-26T10:35:24+00:00" } ], "packages-dev": [ @@ -2287,16 +2287,16 @@ }, { "name": "composer/pcre", - "version": "3.1.1", + "version": "3.1.2", "source": { "type": "git", "url": "https://github.com/composer/pcre.git", - "reference": "00104306927c7a0919b4ced2aaa6782c1e61a3c9" + "reference": "4775f35b2d70865807c89d32c8e7385b86eb0ace" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/composer/pcre/zipball/00104306927c7a0919b4ced2aaa6782c1e61a3c9", - "reference": "00104306927c7a0919b4ced2aaa6782c1e61a3c9", + "url": "https://api.github.com/repos/composer/pcre/zipball/4775f35b2d70865807c89d32c8e7385b86eb0ace", + "reference": "4775f35b2d70865807c89d32c8e7385b86eb0ace", "shasum": "" }, "require": { @@ -2338,7 +2338,7 @@ ], "support": { "issues": "https://github.com/composer/pcre/issues", - "source": "https://github.com/composer/pcre/tree/3.1.1" + "source": "https://github.com/composer/pcre/tree/3.1.2" }, "funding": [ { @@ -2354,7 +2354,7 @@ "type": "tidelift" } ], - "time": "2023-10-11T07:11:09+00:00" + "time": "2024-03-07T15:38:35+00:00" }, { "name": "composer/semver", @@ -2748,16 +2748,16 @@ }, { "name": "doctrine/collections", - "version": "2.1.4", + "version": "2.2.1", "source": { "type": "git", "url": "https://github.com/doctrine/collections.git", - "reference": "72328a11443a0de79967104ad36ba7b30bded134" + "reference": "420480fc085bc65f3c956af13abe8e7546f94813" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/doctrine/collections/zipball/72328a11443a0de79967104ad36ba7b30bded134", - "reference": "72328a11443a0de79967104ad36ba7b30bded134", + "url": "https://api.github.com/repos/doctrine/collections/zipball/420480fc085bc65f3c956af13abe8e7546f94813", + "reference": "420480fc085bc65f3c956af13abe8e7546f94813", "shasum": "" }, "require": { @@ -2769,7 +2769,7 @@ "ext-json": "*", "phpstan/phpstan": "^1.8", "phpstan/phpstan-phpunit": "^1.0", - "phpunit/phpunit": "^9.5", + "phpunit/phpunit": "^10.5", "vimeo/psalm": "^5.11" }, "type": "library", @@ -2814,7 +2814,7 @@ ], "support": { "issues": "https://github.com/doctrine/collections/issues", - "source": "https://github.com/doctrine/collections/tree/2.1.4" + "source": "https://github.com/doctrine/collections/tree/2.2.1" }, "funding": [ { @@ -2830,7 +2830,7 @@ "type": "tidelift" } ], - "time": "2023-10-03T09:22:33+00:00" + "time": "2024-03-05T22:28:45+00:00" }, { "name": "doctrine/inflector", @@ -3072,28 +3072,28 @@ }, { "name": "doctrine/orm", - "version": "3.0.1", + "version": "3.1.0", "source": { "type": "git", "url": "https://github.com/doctrine/orm.git", - "reference": "2a250b5814de192a23438c0a43e15da7e77890a7" + "reference": "716fc97b70cf8116f74eaa0588eef51420874bf9" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/doctrine/orm/zipball/2a250b5814de192a23438c0a43e15da7e77890a7", - "reference": "2a250b5814de192a23438c0a43e15da7e77890a7", + "url": "https://api.github.com/repos/doctrine/orm/zipball/716fc97b70cf8116f74eaa0588eef51420874bf9", + "reference": "716fc97b70cf8116f74eaa0588eef51420874bf9", "shasum": "" }, "require": { "composer-runtime-api": "^2", - "doctrine/collections": "^2.1", + "doctrine/collections": "^2.2", "doctrine/dbal": "^3.8.2 || ^4", "doctrine/deprecations": "^0.5.3 || ^1", "doctrine/event-manager": "^1.2 || ^2", "doctrine/inflector": "^1.4 || ^2.0", "doctrine/instantiator": "^1.3 || ^2", "doctrine/lexer": "^3", - "doctrine/persistence": "^3.1.1", + "doctrine/persistence": "^3.3.1", "ext-ctype": "*", "php": "^8.1", "psr/cache": "^1 || ^2 || ^3", @@ -3103,12 +3103,12 @@ "require-dev": { "doctrine/coding-standard": "^12.0", "phpbench/phpbench": "^1.0", - "phpstan/phpstan": "1.10.35", + "phpstan/phpstan": "1.10.59", "phpunit/phpunit": "^10.4.0", "psr/log": "^1 || ^2 || ^3", "squizlabs/php_codesniffer": "3.7.2", "symfony/cache": "^5.4 || ^6.2 || ^7.0", - "vimeo/psalm": "5.16.0" + "vimeo/psalm": "5.22.2" }, "suggest": { "ext-dom": "Provides support for XSD validation for XML mapping files", @@ -3154,22 +3154,22 @@ ], "support": { "issues": "https://github.com/doctrine/orm/issues", - "source": "https://github.com/doctrine/orm/tree/3.0.1" + "source": "https://github.com/doctrine/orm/tree/3.1.0" }, - "time": "2024-02-22T12:23:53+00:00" + "time": "2024-03-03T17:45:20+00:00" }, { "name": "doctrine/persistence", - "version": "3.2.0", + "version": "3.3.1", "source": { "type": "git", "url": "https://github.com/doctrine/persistence.git", - "reference": "63fee8c33bef740db6730eb2a750cd3da6495603" + "reference": "b6fd1f126b13c1f7e7321f7338b14a19116b5de4" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/doctrine/persistence/zipball/63fee8c33bef740db6730eb2a750cd3da6495603", - "reference": "63fee8c33bef740db6730eb2a750cd3da6495603", + "url": "https://api.github.com/repos/doctrine/persistence/zipball/b6fd1f126b13c1f7e7321f7338b14a19116b5de4", + "reference": "b6fd1f126b13c1f7e7321f7338b14a19116b5de4", "shasum": "" }, "require": { @@ -3238,7 +3238,7 @@ ], "support": { "issues": "https://github.com/doctrine/persistence/issues", - "source": "https://github.com/doctrine/persistence/tree/3.2.0" + "source": "https://github.com/doctrine/persistence/tree/3.3.1" }, "funding": [ { @@ -3254,7 +3254,7 @@ "type": "tidelift" } ], - "time": "2023-05-17T18:32:04+00:00" + "time": "2024-03-01T19:53:13+00:00" }, { "name": "felixfbecker/advanced-json-rpc", @@ -3597,16 +3597,16 @@ }, { "name": "infection/infection", - "version": "0.27.8", + "version": "0.27.10", "source": { "type": "git", "url": "https://github.com/infection/infection.git", - "reference": "673ce762abf3355fcdc186ca17eb89edf86993bf" + "reference": "873cd3335774a114bef9ca93388e623bf362d820" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/infection/infection/zipball/673ce762abf3355fcdc186ca17eb89edf86993bf", - "reference": "673ce762abf3355fcdc186ca17eb89edf86993bf", + "url": "https://api.github.com/repos/infection/infection/zipball/873cd3335774a114bef9ca93388e623bf362d820", + "reference": "873cd3335774a114bef9ca93388e623bf362d820", "shasum": "" }, "require": { @@ -3627,7 +3627,7 @@ "php": "^8.1", "sanmai/later": "^0.1.1", "sanmai/pipeline": "^5.1 || ^6", - "sebastian/diff": "^3.0.2 || ^4.0 || ^5.0", + "sebastian/diff": "^3.0.2 || ^4.0 || ^5.0 || ^6.0", "symfony/console": "^5.4 || ^6.0 || ^7.0", "symfony/filesystem": "^5.4 || ^6.0 || ^7.0", "symfony/finder": "^5.4 || ^6.0 || ^7.0", @@ -3713,7 +3713,7 @@ ], "support": { "issues": "https://github.com/infection/infection/issues", - "source": "https://github.com/infection/infection/tree/0.27.8" + "source": "https://github.com/infection/infection/tree/0.27.10" }, "funding": [ { @@ -3725,7 +3725,7 @@ "type": "open_collective" } ], - "time": "2023-11-08T14:29:03+00:00" + "time": "2024-02-20T00:08:52+00:00" }, { "name": "justinrainbow/json-schema", @@ -4154,20 +4154,21 @@ }, { "name": "phar-io/manifest", - "version": "2.0.3", + "version": "2.0.4", "source": { "type": "git", "url": "https://github.com/phar-io/manifest.git", - "reference": "97803eca37d319dfa7826cc2437fc020857acb53" + "reference": "54750ef60c58e43759730615a392c31c80e23176" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phar-io/manifest/zipball/97803eca37d319dfa7826cc2437fc020857acb53", - "reference": "97803eca37d319dfa7826cc2437fc020857acb53", + "url": "https://api.github.com/repos/phar-io/manifest/zipball/54750ef60c58e43759730615a392c31c80e23176", + "reference": "54750ef60c58e43759730615a392c31c80e23176", "shasum": "" }, "require": { "ext-dom": "*", + "ext-libxml": "*", "ext-phar": "*", "ext-xmlwriter": "*", "phar-io/version": "^3.0.1", @@ -4208,9 +4209,15 @@ "description": "Component for reading phar.io manifest information from a PHP Archive (PHAR)", "support": { "issues": "https://github.com/phar-io/manifest/issues", - "source": "https://github.com/phar-io/manifest/tree/2.0.3" + "source": "https://github.com/phar-io/manifest/tree/2.0.4" }, - "time": "2021-07-20T11:28:43+00:00" + "funding": [ + { + "url": "https://github.com/theseer", + "type": "github" + } + ], + "time": "2024-03-03T12:33:53+00:00" }, { "name": "phar-io/version", @@ -4633,24 +4640,24 @@ }, { "name": "phpspec/prophecy", - "version": "v1.18.0", + "version": "v1.19.0", "source": { "type": "git", "url": "https://github.com/phpspec/prophecy.git", - "reference": "d4f454f7e1193933f04e6500de3e79191648ed0c" + "reference": "67a759e7d8746d501c41536ba40cd9c0a07d6a87" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpspec/prophecy/zipball/d4f454f7e1193933f04e6500de3e79191648ed0c", - "reference": "d4f454f7e1193933f04e6500de3e79191648ed0c", + "url": "https://api.github.com/repos/phpspec/prophecy/zipball/67a759e7d8746d501c41536ba40cd9c0a07d6a87", + "reference": "67a759e7d8746d501c41536ba40cd9c0a07d6a87", "shasum": "" }, "require": { "doctrine/instantiator": "^1.2 || ^2.0", "php": "^7.2 || 8.0.* || 8.1.* || 8.2.* || 8.3.*", "phpdocumentor/reflection-docblock": "^5.2", - "sebastian/comparator": "^3.0 || ^4.0 || ^5.0", - "sebastian/recursion-context": "^3.0 || ^4.0 || ^5.0" + "sebastian/comparator": "^3.0 || ^4.0 || ^5.0 || ^6.0", + "sebastian/recursion-context": "^3.0 || ^4.0 || ^5.0 || ^6.0" }, "require-dev": { "phpspec/phpspec": "^6.0 || ^7.0", @@ -4696,33 +4703,33 @@ ], "support": { "issues": "https://github.com/phpspec/prophecy/issues", - "source": "https://github.com/phpspec/prophecy/tree/v1.18.0" + "source": "https://github.com/phpspec/prophecy/tree/v1.19.0" }, - "time": "2023-12-07T16:22:33+00:00" + "time": "2024-02-29T11:52:51+00:00" }, { "name": "phpspec/prophecy-phpunit", - "version": "v2.1.0", + "version": "v2.2.0", "source": { "type": "git", "url": "https://github.com/phpspec/prophecy-phpunit.git", - "reference": "29f8114c2c319a4308e6b070902211e062efa392" + "reference": "16e1247e139434bce0bac09848bc5c8d882940fc" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpspec/prophecy-phpunit/zipball/29f8114c2c319a4308e6b070902211e062efa392", - "reference": "29f8114c2c319a4308e6b070902211e062efa392", + "url": "https://api.github.com/repos/phpspec/prophecy-phpunit/zipball/16e1247e139434bce0bac09848bc5c8d882940fc", + "reference": "16e1247e139434bce0bac09848bc5c8d882940fc", "shasum": "" }, "require": { "php": "^7.3 || ^8", "phpspec/prophecy": "^1.18", - "phpunit/phpunit": "^9.1 || ^10.1" + "phpunit/phpunit": "^9.1 || ^10.1 || ^11.0" }, "type": "library", "extra": { "branch-alias": { - "dev-master": "2.0-dev" + "dev-master": "2.x-dev" } }, "autoload": { @@ -4748,9 +4755,9 @@ ], "support": { "issues": "https://github.com/phpspec/prophecy-phpunit/issues", - "source": "https://github.com/phpspec/prophecy-phpunit/tree/v2.1.0" + "source": "https://github.com/phpspec/prophecy-phpunit/tree/v2.2.0" }, - "time": "2023-12-08T12:48:02+00:00" + "time": "2024-03-01T08:33:58+00:00" }, { "name": "phpstan/phpdoc-parser", @@ -4801,16 +4808,16 @@ }, { "name": "phpstan/phpstan", - "version": "1.10.59", + "version": "1.10.60", "source": { "type": "git", "url": "https://github.com/phpstan/phpstan.git", - "reference": "e607609388d3a6d418a50a49f7940e8086798281" + "reference": "95dcea7d6c628a3f2f56d091d8a0219485a86bbe" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpstan/phpstan/zipball/e607609388d3a6d418a50a49f7940e8086798281", - "reference": "e607609388d3a6d418a50a49f7940e8086798281", + "url": "https://api.github.com/repos/phpstan/phpstan/zipball/95dcea7d6c628a3f2f56d091d8a0219485a86bbe", + "reference": "95dcea7d6c628a3f2f56d091d8a0219485a86bbe", "shasum": "" }, "require": { @@ -4859,20 +4866,20 @@ "type": "tidelift" } ], - "time": "2024-02-20T13:59:13+00:00" + "time": "2024-03-07T13:30:19+00:00" }, { "name": "phpunit/php-code-coverage", - "version": "10.1.11", + "version": "10.1.13", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/php-code-coverage.git", - "reference": "78c3b7625965c2513ee96569a4dbb62601784145" + "reference": "d51c3aec14896d5e80b354fad58e998d1980f8f8" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/php-code-coverage/zipball/78c3b7625965c2513ee96569a4dbb62601784145", - "reference": "78c3b7625965c2513ee96569a4dbb62601784145", + "url": "https://api.github.com/repos/sebastianbergmann/php-code-coverage/zipball/d51c3aec14896d5e80b354fad58e998d1980f8f8", + "reference": "d51c3aec14896d5e80b354fad58e998d1980f8f8", "shasum": "" }, "require": { @@ -4929,7 +4936,7 @@ "support": { "issues": "https://github.com/sebastianbergmann/php-code-coverage/issues", "security": "https://github.com/sebastianbergmann/php-code-coverage/security/policy", - "source": "https://github.com/sebastianbergmann/php-code-coverage/tree/10.1.11" + "source": "https://github.com/sebastianbergmann/php-code-coverage/tree/10.1.13" }, "funding": [ { @@ -4937,7 +4944,7 @@ "type": "github" } ], - "time": "2023-12-21T15:38:30+00:00" + "time": "2024-03-09T16:54:15+00:00" }, { "name": "phpunit/php-file-iterator", @@ -5184,16 +5191,16 @@ }, { "name": "phpunit/phpunit", - "version": "10.5.11", + "version": "10.5.12", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/phpunit.git", - "reference": "0d968f6323deb3dbfeba5bfd4929b9415eb7a9a4" + "reference": "41a9886b85ac7bf3929853baf96b95361cd69d2b" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/0d968f6323deb3dbfeba5bfd4929b9415eb7a9a4", - "reference": "0d968f6323deb3dbfeba5bfd4929b9415eb7a9a4", + "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/41a9886b85ac7bf3929853baf96b95361cd69d2b", + "reference": "41a9886b85ac7bf3929853baf96b95361cd69d2b", "shasum": "" }, "require": { @@ -5265,7 +5272,7 @@ "support": { "issues": "https://github.com/sebastianbergmann/phpunit/issues", "security": "https://github.com/sebastianbergmann/phpunit/security/policy", - "source": "https://github.com/sebastianbergmann/phpunit/tree/10.5.11" + "source": "https://github.com/sebastianbergmann/phpunit/tree/10.5.12" }, "funding": [ { @@ -5281,7 +5288,7 @@ "type": "tidelift" } ], - "time": "2024-02-25T14:05:00+00:00" + "time": "2024-03-09T12:04:07+00:00" }, { "name": "psalm/plugin-phpunit", @@ -5345,28 +5352,28 @@ }, { "name": "roave/infection-static-analysis-plugin", - "version": "1.34.0", + "version": "1.35.0", "source": { "type": "git", "url": "https://github.com/Roave/infection-static-analysis-plugin.git", - "reference": "7b045a2e342ef86d8fc8f2b08f364cb99677a59d" + "reference": "3cb32845c5f758913a4b9eafd91ae18eafc26d82" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/Roave/infection-static-analysis-plugin/zipball/7b045a2e342ef86d8fc8f2b08f364cb99677a59d", - "reference": "7b045a2e342ef86d8fc8f2b08f364cb99677a59d", + "url": "https://api.github.com/repos/Roave/infection-static-analysis-plugin/zipball/3cb32845c5f758913a4b9eafd91ae18eafc26d82", + "reference": "3cb32845c5f758913a4b9eafd91ae18eafc26d82", "shasum": "" }, "require": { "composer-runtime-api": "^2.2", - "infection/infection": "0.27.8", + "infection/infection": "0.27.10", "php": "~8.1.0 || ~8.2.0 || ~8.3.0", "sanmai/later": "^0.1.4", "vimeo/psalm": "^4.30.0 || ^5.15" }, "require-dev": { "doctrine/coding-standard": "^12.0.0", - "phpunit/phpunit": "^10.5.0" + "phpunit/phpunit": "^10.5.12" }, "bin": [ "bin/roave-infection-static-analysis-plugin" @@ -5390,9 +5397,9 @@ "description": "Static analysis on top of mutation testing - prevents escaped mutants from being invalid according to static analysis", "support": { "issues": "https://github.com/Roave/infection-static-analysis-plugin/issues", - "source": "https://github.com/Roave/infection-static-analysis-plugin/tree/1.34.0" + "source": "https://github.com/Roave/infection-static-analysis-plugin/tree/1.35.0" }, - "time": "2023-12-01T11:52:19+00:00" + "time": "2024-03-10T11:55:48+00:00" }, { "name": "sanmai/later", @@ -5525,16 +5532,16 @@ }, { "name": "sebastian/cli-parser", - "version": "2.0.0", + "version": "2.0.1", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/cli-parser.git", - "reference": "efdc130dbbbb8ef0b545a994fd811725c5282cae" + "reference": "c34583b87e7b7a8055bf6c450c2c77ce32a24084" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/cli-parser/zipball/efdc130dbbbb8ef0b545a994fd811725c5282cae", - "reference": "efdc130dbbbb8ef0b545a994fd811725c5282cae", + "url": "https://api.github.com/repos/sebastianbergmann/cli-parser/zipball/c34583b87e7b7a8055bf6c450c2c77ce32a24084", + "reference": "c34583b87e7b7a8055bf6c450c2c77ce32a24084", "shasum": "" }, "require": { @@ -5569,7 +5576,8 @@ "homepage": "https://github.com/sebastianbergmann/cli-parser", "support": { "issues": "https://github.com/sebastianbergmann/cli-parser/issues", - "source": "https://github.com/sebastianbergmann/cli-parser/tree/2.0.0" + "security": "https://github.com/sebastianbergmann/cli-parser/security/policy", + "source": "https://github.com/sebastianbergmann/cli-parser/tree/2.0.1" }, "funding": [ { @@ -5577,7 +5585,7 @@ "type": "github" } ], - "time": "2023-02-03T06:58:15+00:00" + "time": "2024-03-02T07:12:49+00:00" }, { "name": "sebastian/code-unit", @@ -5827,16 +5835,16 @@ }, { "name": "sebastian/diff", - "version": "5.1.0", + "version": "5.1.1", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/diff.git", - "reference": "fbf413a49e54f6b9b17e12d900ac7f6101591b7f" + "reference": "c41e007b4b62af48218231d6c2275e4c9b975b2e" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/diff/zipball/fbf413a49e54f6b9b17e12d900ac7f6101591b7f", - "reference": "fbf413a49e54f6b9b17e12d900ac7f6101591b7f", + "url": "https://api.github.com/repos/sebastianbergmann/diff/zipball/c41e007b4b62af48218231d6c2275e4c9b975b2e", + "reference": "c41e007b4b62af48218231d6c2275e4c9b975b2e", "shasum": "" }, "require": { @@ -5844,7 +5852,7 @@ }, "require-dev": { "phpunit/phpunit": "^10.0", - "symfony/process": "^4.2 || ^5" + "symfony/process": "^6.4" }, "type": "library", "extra": { @@ -5882,7 +5890,7 @@ "support": { "issues": "https://github.com/sebastianbergmann/diff/issues", "security": "https://github.com/sebastianbergmann/diff/security/policy", - "source": "https://github.com/sebastianbergmann/diff/tree/5.1.0" + "source": "https://github.com/sebastianbergmann/diff/tree/5.1.1" }, "funding": [ { @@ -5890,7 +5898,7 @@ "type": "github" } ], - "time": "2023-12-22T10:55:06+00:00" + "time": "2024-03-02T07:15:17+00:00" }, { "name": "sebastian/environment", @@ -5958,16 +5966,16 @@ }, { "name": "sebastian/exporter", - "version": "5.1.1", + "version": "5.1.2", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/exporter.git", - "reference": "64f51654862e0f5e318db7e9dcc2292c63cdbddc" + "reference": "955288482d97c19a372d3f31006ab3f37da47adf" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/exporter/zipball/64f51654862e0f5e318db7e9dcc2292c63cdbddc", - "reference": "64f51654862e0f5e318db7e9dcc2292c63cdbddc", + "url": "https://api.github.com/repos/sebastianbergmann/exporter/zipball/955288482d97c19a372d3f31006ab3f37da47adf", + "reference": "955288482d97c19a372d3f31006ab3f37da47adf", "shasum": "" }, "require": { @@ -6024,7 +6032,7 @@ "support": { "issues": "https://github.com/sebastianbergmann/exporter/issues", "security": "https://github.com/sebastianbergmann/exporter/security/policy", - "source": "https://github.com/sebastianbergmann/exporter/tree/5.1.1" + "source": "https://github.com/sebastianbergmann/exporter/tree/5.1.2" }, "funding": [ { @@ -6032,20 +6040,20 @@ "type": "github" } ], - "time": "2023-09-24T13:22:09+00:00" + "time": "2024-03-02T07:17:12+00:00" }, { "name": "sebastian/global-state", - "version": "6.0.1", + "version": "6.0.2", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/global-state.git", - "reference": "7ea9ead78f6d380d2a667864c132c2f7b83055e4" + "reference": "987bafff24ecc4c9ac418cab1145b96dd6e9cbd9" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/global-state/zipball/7ea9ead78f6d380d2a667864c132c2f7b83055e4", - "reference": "7ea9ead78f6d380d2a667864c132c2f7b83055e4", + "url": "https://api.github.com/repos/sebastianbergmann/global-state/zipball/987bafff24ecc4c9ac418cab1145b96dd6e9cbd9", + "reference": "987bafff24ecc4c9ac418cab1145b96dd6e9cbd9", "shasum": "" }, "require": { @@ -6079,14 +6087,14 @@ } ], "description": "Snapshotting of global state", - "homepage": "http://www.github.com/sebastianbergmann/global-state", + "homepage": "https://www.github.com/sebastianbergmann/global-state", "keywords": [ "global state" ], "support": { "issues": "https://github.com/sebastianbergmann/global-state/issues", "security": "https://github.com/sebastianbergmann/global-state/security/policy", - "source": "https://github.com/sebastianbergmann/global-state/tree/6.0.1" + "source": "https://github.com/sebastianbergmann/global-state/tree/6.0.2" }, "funding": [ { @@ -6094,7 +6102,7 @@ "type": "github" } ], - "time": "2023-07-19T07:19:23+00:00" + "time": "2024-03-02T07:19:19+00:00" }, { "name": "sebastian/lines-of-code", @@ -6504,32 +6512,32 @@ }, { "name": "slevomat/coding-standard", - "version": "8.14.1", + "version": "8.15.0", "source": { "type": "git", "url": "https://github.com/slevomat/coding-standard.git", - "reference": "fea1fd6f137cc84f9cba0ae30d549615dbc6a926" + "reference": "7d1d957421618a3803b593ec31ace470177d7817" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/slevomat/coding-standard/zipball/fea1fd6f137cc84f9cba0ae30d549615dbc6a926", - "reference": "fea1fd6f137cc84f9cba0ae30d549615dbc6a926", + "url": "https://api.github.com/repos/slevomat/coding-standard/zipball/7d1d957421618a3803b593ec31ace470177d7817", + "reference": "7d1d957421618a3803b593ec31ace470177d7817", "shasum": "" }, "require": { "dealerdirect/phpcodesniffer-composer-installer": "^0.6.2 || ^0.7 || ^1.0", "php": "^7.2 || ^8.0", "phpstan/phpdoc-parser": "^1.23.1", - "squizlabs/php_codesniffer": "^3.7.1" + "squizlabs/php_codesniffer": "^3.9.0" }, "require-dev": { "phing/phing": "2.17.4", "php-parallel-lint/php-parallel-lint": "1.3.2", - "phpstan/phpstan": "1.10.37", + "phpstan/phpstan": "1.10.60", "phpstan/phpstan-deprecation-rules": "1.1.4", - "phpstan/phpstan-phpunit": "1.3.14", - "phpstan/phpstan-strict-rules": "1.5.1", - "phpunit/phpunit": "8.5.21|9.6.8|10.3.5" + "phpstan/phpstan-phpunit": "1.3.16", + "phpstan/phpstan-strict-rules": "1.5.2", + "phpunit/phpunit": "8.5.21|9.6.8|10.5.11" }, "type": "phpcodesniffer-standard", "extra": { @@ -6553,7 +6561,7 @@ ], "support": { "issues": "https://github.com/slevomat/coding-standard/issues", - "source": "https://github.com/slevomat/coding-standard/tree/8.14.1" + "source": "https://github.com/slevomat/coding-standard/tree/8.15.0" }, "funding": [ { @@ -6565,7 +6573,7 @@ "type": "tidelift" } ], - "time": "2023-10-08T07:28:08+00:00" + "time": "2024-03-09T15:20:58+00:00" }, { "name": "spatie/array-to-xml", @@ -6712,16 +6720,16 @@ }, { "name": "symfony/clock", - "version": "v7.0.3", + "version": "v7.0.5", "source": { "type": "git", "url": "https://github.com/symfony/clock.git", - "reference": "1c680e565dc0044d8ed3baeb57835fcacd9c6aed" + "reference": "8b9d08887353d627d5f6c3bf3373b398b49051c2" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/clock/zipball/1c680e565dc0044d8ed3baeb57835fcacd9c6aed", - "reference": "1c680e565dc0044d8ed3baeb57835fcacd9c6aed", + "url": "https://api.github.com/repos/symfony/clock/zipball/8b9d08887353d627d5f6c3bf3373b398b49051c2", + "reference": "8b9d08887353d627d5f6c3bf3373b398b49051c2", "shasum": "" }, "require": { @@ -6766,7 +6774,7 @@ "time" ], "support": { - "source": "https://github.com/symfony/clock/tree/v7.0.3" + "source": "https://github.com/symfony/clock/tree/v7.0.5" }, "funding": [ { @@ -6782,7 +6790,7 @@ "type": "tidelift" } ], - "time": "2024-01-23T15:02:46+00:00" + "time": "2024-03-02T12:46:12+00:00" }, { "name": "symfony/deprecation-contracts", @@ -6916,16 +6924,16 @@ }, { "name": "symfony/messenger", - "version": "v7.0.3", + "version": "v7.0.4", "source": { "type": "git", "url": "https://github.com/symfony/messenger.git", - "reference": "6271373f5dd4cc1750eccb73839d20797bd66072" + "reference": "804a8997f93313a8f7ed19e8cca3b44fdd18bdec" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/messenger/zipball/6271373f5dd4cc1750eccb73839d20797bd66072", - "reference": "6271373f5dd4cc1750eccb73839d20797bd66072", + "url": "https://api.github.com/repos/symfony/messenger/zipball/804a8997f93313a8f7ed19e8cca3b44fdd18bdec", + "reference": "804a8997f93313a8f7ed19e8cca3b44fdd18bdec", "shasum": "" }, "require": { @@ -6982,7 +6990,7 @@ "description": "Helps applications send and receive messages to/from other applications or via message queues", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/messenger/tree/v7.0.3" + "source": "https://github.com/symfony/messenger/tree/v7.0.4" }, "funding": [ { @@ -6998,7 +7006,7 @@ "type": "tidelift" } ], - "time": "2024-01-30T13:55:15+00:00" + "time": "2024-02-26T07:52:39+00:00" }, { "name": "symfony/options-resolver", @@ -7226,16 +7234,16 @@ }, { "name": "symfony/process", - "version": "v7.0.3", + "version": "v7.0.4", "source": { "type": "git", "url": "https://github.com/symfony/process.git", - "reference": "937a195147e0c27b2759ade834169ed006d0bc74" + "reference": "0e7727191c3b71ebec6d529fa0e50a01ca5679e9" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/process/zipball/937a195147e0c27b2759ade834169ed006d0bc74", - "reference": "937a195147e0c27b2759ade834169ed006d0bc74", + "url": "https://api.github.com/repos/symfony/process/zipball/0e7727191c3b71ebec6d529fa0e50a01ca5679e9", + "reference": "0e7727191c3b71ebec6d529fa0e50a01ca5679e9", "shasum": "" }, "require": { @@ -7267,7 +7275,7 @@ "description": "Executes commands in sub-processes", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/process/tree/v7.0.3" + "source": "https://github.com/symfony/process/tree/v7.0.4" }, "funding": [ { @@ -7283,20 +7291,20 @@ "type": "tidelift" } ], - "time": "2024-01-23T15:02:46+00:00" + "time": "2024-02-22T20:27:20+00:00" }, { "name": "symfony/var-dumper", - "version": "v7.0.3", + "version": "v7.0.4", "source": { "type": "git", "url": "https://github.com/symfony/var-dumper.git", - "reference": "a7a061abbf6fe3d4a79032cbc5149a4d65a10234" + "reference": "e03ad7c1535e623edbb94c22cc42353e488c6670" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/var-dumper/zipball/a7a061abbf6fe3d4a79032cbc5149a4d65a10234", - "reference": "a7a061abbf6fe3d4a79032cbc5149a4d65a10234", + "url": "https://api.github.com/repos/symfony/var-dumper/zipball/e03ad7c1535e623edbb94c22cc42353e488c6670", + "reference": "e03ad7c1535e623edbb94c22cc42353e488c6670", "shasum": "" }, "require": { @@ -7350,7 +7358,7 @@ "dump" ], "support": { - "source": "https://github.com/symfony/var-dumper/tree/v7.0.3" + "source": "https://github.com/symfony/var-dumper/tree/v7.0.4" }, "funding": [ { @@ -7366,7 +7374,7 @@ "type": "tidelift" } ], - "time": "2024-01-23T15:02:46+00:00" + "time": "2024-02-15T11:33:06+00:00" }, { "name": "thecodingmachine/safe", @@ -7509,16 +7517,16 @@ }, { "name": "theseer/tokenizer", - "version": "1.2.2", + "version": "1.2.3", "source": { "type": "git", "url": "https://github.com/theseer/tokenizer.git", - "reference": "b2ad5003ca10d4ee50a12da31de12a5774ba6b96" + "reference": "737eda637ed5e28c3413cb1ebe8bb52cbf1ca7a2" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/theseer/tokenizer/zipball/b2ad5003ca10d4ee50a12da31de12a5774ba6b96", - "reference": "b2ad5003ca10d4ee50a12da31de12a5774ba6b96", + "url": "https://api.github.com/repos/theseer/tokenizer/zipball/737eda637ed5e28c3413cb1ebe8bb52cbf1ca7a2", + "reference": "737eda637ed5e28c3413cb1ebe8bb52cbf1ca7a2", "shasum": "" }, "require": { @@ -7547,7 +7555,7 @@ "description": "A small library for converting tokenized PHP source code into XML and potentially other formats", "support": { "issues": "https://github.com/theseer/tokenizer/issues", - "source": "https://github.com/theseer/tokenizer/tree/1.2.2" + "source": "https://github.com/theseer/tokenizer/tree/1.2.3" }, "funding": [ { @@ -7555,20 +7563,20 @@ "type": "github" } ], - "time": "2023-11-20T00:12:19+00:00" + "time": "2024-03-03T12:36:25+00:00" }, { "name": "vimeo/psalm", - "version": "5.22.2", + "version": "5.23.1", "source": { "type": "git", "url": "https://github.com/vimeo/psalm.git", - "reference": "d768d914152dbbf3486c36398802f74e80cfde48" + "reference": "8471a896ccea3526b26d082f4461eeea467f10a4" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/vimeo/psalm/zipball/d768d914152dbbf3486c36398802f74e80cfde48", - "reference": "d768d914152dbbf3486c36398802f74e80cfde48", + "url": "https://api.github.com/repos/vimeo/psalm/zipball/8471a896ccea3526b26d082f4461eeea467f10a4", + "reference": "8471a896ccea3526b26d082f4461eeea467f10a4", "shasum": "" }, "require": { @@ -7665,7 +7673,7 @@ "issues": "https://github.com/vimeo/psalm/issues", "source": "https://github.com/vimeo/psalm" }, - "time": "2024-02-22T23:39:07+00:00" + "time": "2024-03-11T20:33:46+00:00" }, { "name": "webmozart/assert", @@ -7727,16 +7735,16 @@ }, { "name": "webmozart/glob", - "version": "4.6.0", + "version": "4.7.0", "source": { "type": "git", "url": "https://github.com/webmozarts/glob.git", - "reference": "3c17f7dec3d9d0e87b575026011f2e75a56ed655" + "reference": "8a2842112d6916e61e0e15e316465b611f3abc17" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/webmozarts/glob/zipball/3c17f7dec3d9d0e87b575026011f2e75a56ed655", - "reference": "3c17f7dec3d9d0e87b575026011f2e75a56ed655", + "url": "https://api.github.com/repos/webmozarts/glob/zipball/8a2842112d6916e61e0e15e316465b611f3abc17", + "reference": "8a2842112d6916e61e0e15e316465b611f3abc17", "shasum": "" }, "require": { @@ -7770,9 +7778,9 @@ "description": "A PHP implementation of Ant's glob.", "support": { "issues": "https://github.com/webmozarts/glob/issues", - "source": "https://github.com/webmozarts/glob/tree/4.6.0" + "source": "https://github.com/webmozarts/glob/tree/4.7.0" }, - "time": "2022-05-24T19:45:58+00:00" + "time": "2024-03-07T20:33:40+00:00" } ], "aliases": [], diff --git a/src/Console/Command/SubscriptionBootCommand.php b/src/Console/Command/SubscriptionBootCommand.php index e63f864eb..57f087185 100644 --- a/src/Console/Command/SubscriptionBootCommand.php +++ b/src/Console/Command/SubscriptionBootCommand.php @@ -4,15 +4,19 @@ namespace Patchlevel\EventSourcing\Console\Command; +use Closure; use Patchlevel\EventSourcing\Console\InputHelper; +use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria; +use Patchlevel\Worker\DefaultWorker; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; +use Symfony\Component\Console\Logger\ConsoleLogger; use Symfony\Component\Console\Output\OutputInterface; #[AsCommand( 'event-sourcing:subscription:boot', - 'Prepare new subscriptions and catch up with the event store', + 'Catch up with the event store.', )] final class SubscriptionBootCommand extends SubscriptionCommand { @@ -22,20 +26,97 @@ public function configure(): void $this ->addOption( - 'limit', + 'run-limit', + null, + InputOption::VALUE_REQUIRED, + 'The maximum number of runs this command should execute', + ) + ->addOption( + 'message-limit', null, InputOption::VALUE_REQUIRED, 'How many messages should be consumed in one run', + 1000, + ) + ->addOption( + 'memory-limit', + null, + InputOption::VALUE_REQUIRED, + 'How much memory consumption should the worker be terminated (e.g. 250MB)', + ) + ->addOption( + 'time-limit', + null, + InputOption::VALUE_REQUIRED, + 'What is the maximum time the worker can run in seconds', + ) + ->addOption( + 'sleep', + null, + InputOption::VALUE_REQUIRED, + 'How much time should elapse before the next job is executed in milliseconds', + ) + ->addOption( + 'setup', + null, + InputOption::VALUE_NONE, + 'Setup new subscriptions', ); } protected function execute(InputInterface $input, OutputInterface $output): int { - $limit = InputHelper::nullablePositiveInt($input->getOption('limit')); + $runLimit = InputHelper::nullablePositiveInt($input->getOption('run-limit')); + $messageLimit = InputHelper::nullablePositiveInt($input->getOption('message-limit')); + $memoryLimit = InputHelper::nullableString($input->getOption('memory-limit')); + $timeLimit = InputHelper::nullablePositiveInt($input->getOption('time-limit')); + $sleep = InputHelper::positiveIntOrZero($input->getOption('sleep')); + $setup = InputHelper::bool($input->getOption('setup')); $criteria = $this->subscriptionEngineCriteria($input); - $this->engine->boot($criteria, $limit); - return 0; + if ($setup) { + $this->engine->setup($criteria); + } + + $logger = new ConsoleLogger($output); + + $finished = false; + + $worker = DefaultWorker::create( + function (Closure $stop) use ($criteria, $messageLimit, &$finished): void { + $this->engine->boot($criteria, $messageLimit); + + if (!$this->isBootingFinished($criteria)) { + return; + } + + $finished = true; + $stop(); + }, + [ + 'runLimit' => $runLimit, + 'memoryLimit' => $memoryLimit, + 'timeLimit' => $timeLimit, + ], + $logger, + ); + + $worker->run($sleep); + + return $finished ? 0 : 1; + } + + private function isBootingFinished(SubscriptionEngineCriteria $criteria): bool + { + $subscriptions = $this->engine->subscriptions($criteria); + + foreach ($subscriptions as $subscription) { + if ($subscription->isBooting()) { + return false; + } + } + + return true; } } diff --git a/src/Console/Command/SubscriptionSetupCommand.php b/src/Console/Command/SubscriptionSetupCommand.php new file mode 100644 index 000000000..1c9f74897 --- /dev/null +++ b/src/Console/Command/SubscriptionSetupCommand.php @@ -0,0 +1,24 @@ +subscriptionEngineCriteria($input); + $this->engine->setup($criteria); + + return 0; + } +} diff --git a/src/Store/DoctrineDbalStore.php b/src/Store/DoctrineDbalStore.php index 6e49134b7..c22c80f14 100644 --- a/src/Store/DoctrineDbalStore.php +++ b/src/Store/DoctrineDbalStore.php @@ -323,7 +323,7 @@ private function getCustomHeaders(Message $message): array return array_values( array_filter( $message->headers(), - static fn (object $header) => !in_array($header::class, $filteredHeaders, true) + static fn (object $header) => !in_array($header::class, $filteredHeaders, true), ), ); } diff --git a/src/Subscription/Engine/DefaultSubscriptionEngine.php b/src/Subscription/Engine/DefaultSubscriptionEngine.php index 13f66a91e..820121a8c 100644 --- a/src/Subscription/Engine/DefaultSubscriptionEngine.php +++ b/src/Subscription/Engine/DefaultSubscriptionEngine.php @@ -36,6 +36,94 @@ public function __construct( ) { } + public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = false): void + { + $criteria ??= new SubscriptionEngineCriteria(); + + $this->logger?->info( + 'Subscription Engine: Start to setup.', + ); + + $this->discoverNewSubscriptions(); + $this->retrySubscriptions($criteria); + + $this->findForUpdate( + new SubscriptionCriteria( + ids: $criteria->ids, + groups: $criteria->groups, + status: [Status::New], + ), + function (array $subscriptions) use ($skipBooting): void { + if (count($subscriptions) === 0) { + $this->logger?->info('Subscription Engine: No subscriptions to setup, finish setup.'); + + return; + } + + $latestIndex = $this->latestIndex(); + + foreach ($subscriptions as $subscription) { + $subscriber = $this->subscriber($subscription->id()); + + if (!$subscriber) { + throw SubscriberNotFound::forSubscriptionId($subscription->id()); + } + + $setupMethod = $subscriber->setupMethod(); + + if (!$setupMethod) { + if ($subscription->runMode() === RunMode::FromNow) { + $subscription->changePosition($latestIndex); + $subscription->active(); + } else { + $skipBooting ? $subscription->active() : $subscription->booting(); + } + + $this->subscriptionStore->update($subscription); + + $this->logger?->debug(sprintf( + 'Subscription Engine: Subscriber "%s" for "%s" has no setup method, set to %s.', + $subscriber::class, + $subscription->id(), + $subscription->runMode() === RunMode::FromNow || $skipBooting ? 'active' : 'booting', + )); + + continue; + } + + try { + $setupMethod(); + + if ($subscription->runMode() === RunMode::FromNow) { + $subscription->changePosition($latestIndex); + $subscription->active(); + } else { + $skipBooting ? $subscription->active() : $subscription->booting(); + } + + $this->subscriptionStore->update($subscription); + + $this->logger?->debug(sprintf( + 'Subscription Engine: For Subscriber "%s" for "%s" the setup method has been executed, set to %s.', + $subscriber::class, + $subscription->id(), + $subscription->runMode() === RunMode::FromNow || $skipBooting ? 'active' : 'booting', + )); + } catch (Throwable $e) { + $this->logger?->error(sprintf( + 'Subscription Engine: Subscriber "%s" for "%s" has an error in the setup method: %s', + $subscriber::class, + $subscription->id(), + $e->getMessage(), + )); + + $this->handleError($subscription, $e); + } + } + }, + ); + } + public function boot( SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null, @@ -48,7 +136,6 @@ public function boot( $this->discoverNewSubscriptions(); $this->retrySubscriptions($criteria); - $this->setupNewSubscriptions($criteria); $this->findForUpdate( new SubscriptionCriteria( @@ -57,8 +144,6 @@ public function boot( status: [Status::Booting], ), function ($subscriptions) use ($limit): void { - $subscriptions = $this->fastForwardFromNowSubscriptions($subscriptions); - if (count($subscriptions) === 0) { $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); @@ -131,6 +216,8 @@ function ($subscriptions) use ($limit): void { } } } finally { + $stream?->close(); + if ($messageCounter > 0) { foreach ($subscriptions as $subscription) { if (!$subscription->isBooting()) { @@ -140,8 +227,6 @@ function ($subscriptions) use ($limit): void { $this->subscriptionStore->update($subscription); } } - - $stream?->close(); } $this->logger?->debug('Subscription Engine: End of stream for booting has been reached.'); @@ -262,6 +347,8 @@ function (array $subscriptions) use ($limit): void { } } } finally { + $stream?->close(); + if ($messageCounter > 0) { foreach ($subscriptions as $subscription) { if (!$subscription->isActive()) { @@ -271,8 +358,24 @@ function (array $subscriptions) use ($limit): void { $this->subscriptionStore->update($subscription); } } + } - $stream?->close(); + foreach ($subscriptions as $subscription) { + if (!$subscription->isActive()) { + continue; + } + + if ($subscription->runMode() !== RunMode::Once) { + continue; + } + + $subscription->finished(); + $this->subscriptionStore->update($subscription); + + $this->logger?->info(sprintf( + 'Subscription Engine: Subscription "%s" run only once and has been set to finished.', + $subscription->id(), + )); } $this->logger?->info( @@ -684,108 +787,6 @@ function (array $subscriptions): void { ); } - /** - * @param list $subscriptions - * - * @return list - */ - private function fastForwardFromNowSubscriptions(array $subscriptions): array - { - $latestIndex = null; - $forwardedSubscriptions = []; - - foreach ($subscriptions as $subscription) { - $subscriber = $this->subscriber($subscription->id()); - - if (!$subscriber) { - $forwardedSubscriptions[] = $subscription; - - continue; - } - - if ($subscription->runMode() === RunMode::FromBeginning || $subscription->runMode() === RunMode::Once) { - $forwardedSubscriptions[] = $subscription; - - continue; - } - - if ($latestIndex === null) { - $latestIndex = $this->latestIndex(); - } - - $subscription->changePosition($latestIndex); - $subscription->active(); - $this->subscriptionStore->update($subscription); - - $this->logger?->info( - sprintf( - 'Subscription Engine: Subscriber "%s" for "%s" is in "from now" mode: skip past messages and set to active.', - $subscriber::class, - $subscription->id(), - ), - ); - } - - return $forwardedSubscriptions; - } - - private function setupNewSubscriptions(SubscriptionEngineCriteria $criteria): void - { - $this->findForUpdate( - new SubscriptionCriteria( - ids: $criteria->ids, - groups: $criteria->groups, - status: [Status::New], - ), - function (array $subscriptions): void { - foreach ($subscriptions as $subscription) { - $subscriber = $this->subscriber($subscription->id()); - - if (!$subscriber) { - throw SubscriberNotFound::forSubscriptionId($subscription->id()); - } - - $setupMethod = $subscriber->setupMethod(); - - if (!$setupMethod) { - $subscription->booting(); - $this->subscriptionStore->update($subscription); - - $this->logger?->debug(sprintf( - 'Subscription Engine: Subscriber "%s" for "%s" has no setup method, continue.', - $subscriber::class, - $subscription->id(), - )); - - continue; - } - - try { - $setupMethod(); - - $subscription->booting(); - $this->subscriptionStore->update($subscription); - - $this->logger?->debug(sprintf( - 'Subscription Engine: For Subscriber "%s" for "%s" the setup method has been executed and is now prepared for data.', - $subscriber::class, - $subscription->id(), - )); - } catch (Throwable $e) { - $this->logger?->error(sprintf( - 'Subscription Engine: Subscriber "%s" for "%s" has an error in the setup method: %s', - $subscriber::class, - $subscription->id(), - $e->getMessage(), - )); - - $this->handleError($subscription, $e); - } - } - }, - ); - } - private function discoverNewSubscriptions(): void { $this->findForUpdate( diff --git a/src/Subscription/Engine/SubscriptionEngine.php b/src/Subscription/Engine/SubscriptionEngine.php index 1b3c56a18..0e4229eaa 100644 --- a/src/Subscription/Engine/SubscriptionEngine.php +++ b/src/Subscription/Engine/SubscriptionEngine.php @@ -8,6 +8,8 @@ interface SubscriptionEngine { + public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = false): void; + /** * @param positive-int|null $limit * diff --git a/tests/Benchmark/SubscriptionEngineBench.php b/tests/Benchmark/SubscriptionEngineBench.php index ce6b09f57..dae8be233 100644 --- a/tests/Benchmark/SubscriptionEngineBench.php +++ b/tests/Benchmark/SubscriptionEngineBench.php @@ -96,6 +96,7 @@ public function setUp(): void #[Bench\Revs(10)] public function benchHandle10000Events(): void { + $this->subscriptionEngine->setup(); $this->subscriptionEngine->boot(); $this->subscriptionEngine->remove(); } diff --git a/tests/Integration/BankAccountSplitStream/IntegrationTest.php b/tests/Integration/BankAccountSplitStream/IntegrationTest.php index 9b9bd3c40..8db1b0c30 100644 --- a/tests/Integration/BankAccountSplitStream/IntegrationTest.php +++ b/tests/Integration/BankAccountSplitStream/IntegrationTest.php @@ -16,7 +16,6 @@ use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; use Patchlevel\EventSourcing\Store\DoctrineDbalStore; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; -use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria; use Patchlevel\EventSourcing\Subscription\Store\InMemorySubscriptionStore; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; use Patchlevel\EventSourcing\Tests\DbalManager; @@ -83,7 +82,8 @@ public function testSuccessful(): void ); $schemaDirector->create(); - $engine->boot(new SubscriptionEngineCriteria()); + $engine->setup(); + $engine->boot(); $bankAccountId = AccountId::fromString('1'); $bankAccount = BankAccount::create($bankAccountId, 'John'); diff --git a/tests/Integration/BasicImplementation/BasicIntegrationTest.php b/tests/Integration/BasicImplementation/BasicIntegrationTest.php index c1c0beaa5..e8db9686c 100644 --- a/tests/Integration/BasicImplementation/BasicIntegrationTest.php +++ b/tests/Integration/BasicImplementation/BasicIntegrationTest.php @@ -80,6 +80,7 @@ public function testSuccessful(): void ); $schemaDirector->create(); + $engine->setup(); $engine->boot(); $profileId = ProfileId::fromString('1'); @@ -148,6 +149,7 @@ public function testSnapshot(): void ); $schemaDirector->create(); + $engine->setup(); $engine->boot(); $profileId = ProfileId::fromString('1'); diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php index e813964b6..d0083f6d4 100644 --- a/tests/Integration/Subscription/SubscriptionTest.php +++ b/tests/Integration/Subscription/SubscriptionTest.php @@ -109,6 +109,7 @@ public function testHappyPath(): void $engine->subscriptions(), ); + $engine->setup(); $engine->boot(); self::assertEquals( @@ -222,6 +223,7 @@ public function testErrorHandling(): void ), ); + $engine->setup(); $engine->boot(); $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); @@ -371,6 +373,7 @@ public function testProcessor(): void $engine->subscriptions(), ); + $engine->setup(); $engine->boot(); self::assertEquals( diff --git a/tests/Unit/Repository/DefaultRepositoryTest.php b/tests/Unit/Repository/DefaultRepositoryTest.php index 8d2e055dd..77c4f92cd 100644 --- a/tests/Unit/Repository/DefaultRepositoryTest.php +++ b/tests/Unit/Repository/DefaultRepositoryTest.php @@ -73,7 +73,7 @@ public function testSaveAggregate(): void $store->transactional(Argument::any())->will( /** @param array{0: callable} $args */ - static fn (array $args): mixed => $args[0]() + static fn (array $args): mixed => $args[0](), ); $eventBus = $this->prophesize(EventBus::class); @@ -151,7 +151,7 @@ public function testUpdateAggregate(): void $store->transactional(Argument::any())->will( /** @param array{0: callable} $args */ - static fn (array $args): mixed => $args[0]() + static fn (array $args): mixed => $args[0](), ); $eventBus = $this->prophesize(EventBus::class); @@ -224,7 +224,7 @@ public function testDecorator(): void $store->transactional(Argument::any())->will( /** @param array{0: callable} $args */ - static fn (array $args): mixed => $args[0]() + static fn (array $args): mixed => $args[0](), ); $eventBus = $this->prophesize(EventBus::class); @@ -310,7 +310,7 @@ public function testSaveAggregateWithEmptyEventStream(): void $store->transactional(Argument::any())->will( /** @param array{0: callable} $args */ - static fn (array $args): mixed => $args[0]() + static fn (array $args): mixed => $args[0](), ); $eventBus = $this->prophesize(EventBus::class); @@ -352,7 +352,7 @@ public function testDetachedException(): void $store->transactional(Argument::any())->will( /** @param array{0: callable} $args */ - static fn (array $args): mixed => $args[0]() + static fn (array $args): mixed => $args[0](), ); $eventBus = $this->prophesize(EventBus::class); @@ -421,7 +421,7 @@ public function testDuplicate(): void $store->transactional(Argument::any())->will( /** @param array{0: callable} $args */ - static fn (array $args): mixed => $args[0]() + static fn (array $args): mixed => $args[0](), ); $eventBus = $this->prophesize(EventBus::class); @@ -461,7 +461,7 @@ public function testOutdated(): void $store->transactional(Argument::any())->will( /** @param array{0: callable} $args */ - static fn (array $args): mixed => $args[0]() + static fn (array $args): mixed => $args[0](), ); $eventBus = $this->prophesize(EventBus::class); @@ -527,7 +527,7 @@ public function testSaveAggregateWithSplitStream(): void $store->archiveMessages('profile', '1', 3)->shouldBeCalledOnce(); $store->transactional(Argument::any())->will( /** @param array{0: callable} $args */ - static fn (array $args): mixed => $args[0]() + static fn (array $args): mixed => $args[0](), ); $eventBus = $this->prophesize(EventBus::class); diff --git a/tests/Unit/Store/DoctrineDbalStoreTest.php b/tests/Unit/Store/DoctrineDbalStoreTest.php index 737f49f5d..7ce539fae 100644 --- a/tests/Unit/Store/DoctrineDbalStoreTest.php +++ b/tests/Unit/Store/DoctrineDbalStoreTest.php @@ -240,7 +240,7 @@ public function testSaveWithOneEvent(): void $mockedConnection = $this->prophesize(Connection::class); $mockedConnection->transactional(Argument::any())->will( /** @param array{0: callable} $args */ - static fn (array $args): mixed => $args[0]($innerMockedConnection->reveal()) + static fn (array $args): mixed => $args[0]($innerMockedConnection->reveal()), ); $singleTableStore = new DoctrineDbalStore( diff --git a/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php b/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php index bf2b36ad2..c0a168596 100644 --- a/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php +++ b/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php @@ -40,7 +40,7 @@ final class DefaultSubscriptionEngineTest extends TestCase { use ProphecyTrait; - public function testNothingToBoot(): void + public function testNothingToSetup(): void { $streamableStore = $this->prophesize(Store::class); $streamableStore->load($this->criteria())->shouldNotBeCalled(); @@ -53,21 +53,23 @@ public function testNothingToBoot(): void new MetadataSubscriberAccessorRepository([]), ); - $engine->boot(); + $engine->setup(); self::assertEquals([], $store->addedSubscriptions); self::assertEquals([], $store->updatedSubscriptions); } - public function testBootDiscoverNewSubscribers(): void + public function testSetupWithoutCreateMethod(): void { $subscriptionId = 'test'; $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] class { }; + $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); + $streamableStore = $this->prophesize(Store::class); - $streamableStore->load($this->criteria())->willReturn(new ArrayStream([]))->shouldBeCalledOnce(); + $streamableStore->load(null, 1, null, true)->willReturn(new ArrayStream([$message1]))->shouldBeCalledOnce(); $subscriptionStore = new DummySubscriptionStore(); @@ -77,7 +79,7 @@ class { new MetadataSubscriberAccessorRepository([$subscriber]), ); - $engine->boot(); + $engine->setup(); self::assertEquals([ new Subscription( @@ -95,30 +97,84 @@ class { RunMode::FromBeginning, Status::Booting, ), + ], $subscriptionStore->updatedSubscriptions); + } + + public function testSetupWithCreateMethod(): void + { + $subscriptionId = 'test'; + $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] + class { + public bool $created = false; + + #[Setup] + public function create(): void + { + $this->created = true; + } + }; + + $subscriptionStore = new DummySubscriptionStore(); + + $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load(null, 1, null, true)->willReturn(new ArrayStream([$message1]))->shouldBeCalledOnce(); + + $engine = new DefaultSubscriptionEngine( + $streamableStore->reveal(), + $subscriptionStore, + new MetadataSubscriberAccessorRepository([$subscriber]), + ); + + $engine->setup(); + + self::assertEquals([ new Subscription( $subscriptionId, Subscription::DEFAULT_GROUP, RunMode::FromBeginning, - Status::Active, + Status::New, + ), + ], $subscriptionStore->addedSubscriptions); + + self::assertEquals([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Booting, ), ], $subscriptionStore->updatedSubscriptions); + + self::assertTrue($subscriber->created); } - public function testBootWithoutCreateMethod(): void + public function testSetupWithCreateError(): void { $subscriptionId = 'test'; $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] class { + public function __construct( + public readonly RuntimeException $exception = new RuntimeException('ERROR'), + ) { + } + + #[Setup] + public function create(): void + { + throw $this->exception; + } }; $subscriptionStore = new DummySubscriptionStore([ new Subscription($subscriptionId), ]); - $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); + $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); $streamableStore = $this->prophesize(Store::class); - $streamableStore->load($this->criteria())->willReturn(new ArrayStream([$message]))->shouldBeCalledOnce(); + $streamableStore->load(null, 1, null, true)->willReturn(new ArrayStream([$message1]))->shouldBeCalledOnce(); $engine = new DefaultSubscriptionEngine( $streamableStore->reveal(), @@ -126,59 +182,174 @@ class { new MetadataSubscriberAccessorRepository([$subscriber]), ); - $engine->boot(); + $engine->setup(); - self::assertEquals([ + self::assertEquals( + [ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Error, + 0, + new SubscriptionError( + 'ERROR', + Status::New, + ThrowableToErrorContextTransformer::transform($subscriber->exception), + ), + ), + ], + $subscriptionStore->updatedSubscriptions, + ); + } + + public function testSetupWithSkipBooting(): void + { + $subscriptionId = 'test'; + $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] + class { + }; + + $subscriptionStore = new DummySubscriptionStore([ new Subscription( $subscriptionId, Subscription::DEFAULT_GROUP, RunMode::FromBeginning, - Status::Booting, + Status::New, ), + ]); + + $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load(null, 1, null, true)->willReturn(new ArrayStream([$message1]))->shouldBeCalledOnce(); + + $engine = new DefaultSubscriptionEngine( + $streamableStore->reveal(), + $subscriptionStore, + new MetadataSubscriberAccessorRepository([$subscriber]), + ); + + $engine->setup(null, true); + + self::assertEquals([ new Subscription( $subscriptionId, Subscription::DEFAULT_GROUP, RunMode::FromBeginning, - Status::Booting, - 1, + Status::Active, ), + ], $subscriptionStore->updatedSubscriptions); + } + + public function testSetupWithFromNow(): void + { + $subscriptionId = 'test'; + $subscriber = new #[Subscriber('test', RunMode::FromNow)] + class { + }; + + $subscriptionStore = new DummySubscriptionStore([ new Subscription( $subscriptionId, Subscription::DEFAULT_GROUP, - RunMode::FromBeginning, + RunMode::FromNow, + Status::New, + ), + ]); + + $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load(null, 1, null, true)->willReturn(new ArrayStream([$message1]))->shouldBeCalledOnce(); + + $engine = new DefaultSubscriptionEngine( + $streamableStore->reveal(), + $subscriptionStore, + new MetadataSubscriberAccessorRepository([$subscriber]), + ); + + $engine->setup(); + + self::assertEquals([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromNow, Status::Active, 1, ), ], $subscriptionStore->updatedSubscriptions); } - public function testBootWithMethods(): void + public function testSetupWithFromNowWithEmtpyStream(): void { $subscriptionId = 'test'; - $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] + $subscriber = new #[Subscriber('test', RunMode::FromNow)] class { - public Message|null $message = null; - public bool $created = false; + }; - #[Setup] - public function create(): void - { - $this->created = true; - } + $subscriptionStore = new DummySubscriptionStore([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromNow, + Status::New, + ), + ]); - #[Subscribe(ProfileVisited::class)] - public function handle(Message $message): void - { - $this->message = $message; - } - }; + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load(null, 1, null, true)->willReturn(new ArrayStream([]))->shouldBeCalledOnce(); - $subscriptionStore = new DummySubscriptionStore(); + $engine = new DefaultSubscriptionEngine( + $streamableStore->reveal(), + $subscriptionStore, + new MetadataSubscriberAccessorRepository([$subscriber]), + ); - $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); + $engine->setup(); + self::assertEquals([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromNow, + Status::Active, + 0, + ), + ], $subscriptionStore->updatedSubscriptions); + } + + public function testNothingToBoot(): void + { $streamableStore = $this->prophesize(Store::class); - $streamableStore->load($this->criteria())->willReturn(new ArrayStream([$message]))->shouldBeCalledOnce(); + $streamableStore->load($this->criteria())->shouldNotBeCalled(); + + $store = new DummySubscriptionStore(); + + $engine = new DefaultSubscriptionEngine( + $streamableStore->reveal(), + $store, + new MetadataSubscriberAccessorRepository([]), + ); + + $engine->boot(); + + self::assertEquals([], $store->addedSubscriptions); + self::assertEquals([], $store->updatedSubscriptions); + } + + public function testBootDiscoverNewSubscribers(): void + { + $subscriptionId = 'test'; + $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] + class { + }; + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load($this->criteria())->shouldNotBeCalled(); + + $subscriptionStore = new DummySubscriptionStore(); $engine = new DefaultSubscriptionEngine( $streamableStore->reveal(), @@ -197,13 +368,48 @@ public function handle(Message $message): void ), ], $subscriptionStore->addedSubscriptions); - self::assertEquals([ + self::assertEquals([], $subscriptionStore->updatedSubscriptions); + } + + public function testBootWithSubscriber(): void + { + $subscriptionId = 'test'; + $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] + class { + public Message|null $message = null; + + #[Subscribe(ProfileVisited::class)] + public function handle(Message $message): void + { + $this->message = $message; + } + }; + + $subscriptionStore = new DummySubscriptionStore([ new Subscription( $subscriptionId, Subscription::DEFAULT_GROUP, RunMode::FromBeginning, Status::Booting, ), + ]); + + $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load($this->criteria())->willReturn(new ArrayStream([$message]))->shouldBeCalledOnce(); + + $engine = new DefaultSubscriptionEngine( + $streamableStore->reveal(), + $subscriptionStore, + new MetadataSubscriberAccessorRepository([$subscriber]), + ); + + $engine->boot(); + + self::assertEquals([], $subscriptionStore->addedSubscriptions); + + self::assertEquals([ new Subscription( $subscriptionId, Subscription::DEFAULT_GROUP, @@ -220,7 +426,6 @@ public function handle(Message $message): void ), ], $subscriptionStore->updatedSubscriptions); - self::assertTrue($subscriber->created); self::assertSame($message, $subscriber->message); } @@ -230,13 +435,6 @@ public function testBootWithLimit(): void $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] class { public Message|null $message = null; - public bool $created = false; - - #[Setup] - public function create(): void - { - $this->created = true; - } #[Subscribe(ProfileVisited::class)] public function handle(Message $message): void @@ -245,7 +443,14 @@ public function handle(Message $message): void } }; - $subscriptionStore = new DummySubscriptionStore(); + $subscriptionStore = new DummySubscriptionStore([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Booting, + ), + ]); $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); @@ -260,22 +465,9 @@ public function handle(Message $message): void $engine->boot(new SubscriptionEngineCriteria(), 1); - self::assertEquals([ - new Subscription( - $subscriptionId, - Subscription::DEFAULT_GROUP, - RunMode::FromBeginning, - Status::New, - ), - ], $subscriptionStore->addedSubscriptions); + self::assertEquals([], $subscriptionStore->addedSubscriptions); self::assertEquals([ - new Subscription( - $subscriptionId, - Subscription::DEFAULT_GROUP, - RunMode::FromBeginning, - Status::Booting, - ), new Subscription( $subscriptionId, Subscription::DEFAULT_GROUP, @@ -285,7 +477,6 @@ public function handle(Message $message): void ), ], $subscriptionStore->updatedSubscriptions); - self::assertTrue($subscriber->created); self::assertSame($message, $subscriber->message); } @@ -379,57 +570,6 @@ public function handle(Message $message): void self::assertNull($subscriber2->message); } - public function testBootWithCreateError(): void - { - $subscriptionId = 'test'; - $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] - class { - public function __construct( - public readonly RuntimeException $exception = new RuntimeException('ERROR'), - ) { - } - - #[Setup] - public function create(): void - { - throw $this->exception; - } - }; - - $subscriptionStore = new DummySubscriptionStore([ - new Subscription($subscriptionId), - ]); - - $streamableStore = $this->prophesize(Store::class); - $streamableStore->load($this->criteria())->shouldNotBeCalled(); - - $engine = new DefaultSubscriptionEngine( - $streamableStore->reveal(), - $subscriptionStore, - new MetadataSubscriberAccessorRepository([$subscriber]), - ); - - $engine->boot(); - - self::assertEquals( - [ - new Subscription( - $subscriptionId, - Subscription::DEFAULT_GROUP, - RunMode::FromBeginning, - Status::Error, - 0, - new SubscriptionError( - 'ERROR', - Status::New, - ThrowableToErrorContextTransformer::transform($subscriber->exception), - ), - ), - ], - $subscriptionStore->updatedSubscriptions, - ); - } - public function testBootingWithGabInIndex(): void { $subscriptionId = 'test'; @@ -488,104 +628,6 @@ public function handle(Message $message): void self::assertSame([$message1, $message2], $subscriber->messages); } - public function testBootingWithFromNow(): void - { - $subscriptionId = 'test'; - $subscriber = new #[Subscriber('test', RunMode::FromNow)] - class { - public Message|null $message = null; - - #[Subscribe(ProfileVisited::class)] - public function handle(Message $message): void - { - $this->message = $message; - } - }; - - $subscriptionStore = new DummySubscriptionStore([ - new Subscription( - $subscriptionId, - Subscription::DEFAULT_GROUP, - RunMode::FromNow, - Status::Booting, - ), - ]); - - $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); - - $streamableStore = $this->prophesize(Store::class); - $streamableStore->load(null, 1, null, true)->willReturn(new ArrayStream([$message1]))->shouldBeCalledOnce(); - - $engine = new DefaultSubscriptionEngine( - $streamableStore->reveal(), - $subscriptionStore, - new MetadataSubscriberAccessorRepository([$subscriber]), - ); - - $engine->boot(); - - self::assertEquals([ - new Subscription( - $subscriptionId, - Subscription::DEFAULT_GROUP, - RunMode::FromNow, - Status::Active, - 1, - ), - ], $subscriptionStore->updatedSubscriptions); - - self::assertNull($subscriber->message); - } - - public function testBootingWithFromNowWithEmtpyStream(): void - { - $subscriptionId = 'test'; - $subscriber = new #[Subscriber('test', RunMode::FromNow)] - class { - public Message|null $message = null; - - #[Subscribe(ProfileVisited::class)] - public function handle(Message $message): void - { - $this->message = $message; - } - }; - - $subscriptionStore = new DummySubscriptionStore([ - new Subscription( - $subscriptionId, - Subscription::DEFAULT_GROUP, - RunMode::FromNow, - Status::Booting, - ), - ]); - - $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); - - $streamableStore = $this->prophesize(Store::class); - $streamableStore->load(null, 1, null, true)->willReturn(new ArrayStream([]))->shouldBeCalledOnce(); - - $engine = new DefaultSubscriptionEngine( - $streamableStore->reveal(), - $subscriptionStore, - new MetadataSubscriberAccessorRepository([$subscriber]), - ); - - $engine->boot(); - - self::assertEquals([ - new Subscription( - $subscriptionId, - Subscription::DEFAULT_GROUP, - RunMode::FromNow, - Status::Active, - 0, - ), - ], $subscriptionStore->updatedSubscriptions); - - self::assertNull($subscriber->message); - } - public function testBootingWithOnlyOnce(): void { $subscriptionId = 'test'; @@ -1019,6 +1061,62 @@ public function handle(Message $message): void self::assertSame([$message1, $message2], $subscriber->messages); } + public function testRunnningWithOnlyOnce(): void + { + $subscriptionId = 'test'; + $subscriber = new #[Subscriber('test', RunMode::Once)] + class { + public Message|null $message = null; + + #[Subscribe(ProfileVisited::class)] + public function handle(Message $message): void + { + $this->message = $message; + } + }; + + $subscriptionStore = new DummySubscriptionStore([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::Once, + Status::Active, + ), + ]); + + $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load($this->criteria())->willReturn(new ArrayStream([$message1]))->shouldBeCalledOnce(); + + $engine = new DefaultSubscriptionEngine( + $streamableStore->reveal(), + $subscriptionStore, + new MetadataSubscriberAccessorRepository([$subscriber]), + ); + + $engine->run(); + + self::assertEquals([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::Once, + Status::Active, + 1, + ), + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::Once, + Status::Finished, + 1, + ), + ], $subscriptionStore->updatedSubscriptions); + + self::assertEquals($message1, $subscriber->message); + } + public function testTeardownDiscoverNewSubscribers(): void { $subscriptionId = 'test'; @@ -1794,7 +1892,7 @@ class { $subscriptionStore = $this->prophesize(SubscriptionStore::class); $subscriptionStore->find( Argument::that( - static fn (SubscriptionCriteria $criteria) => $criteria->ids === ['id1'] && $criteria->groups === ['group1'] + static fn (SubscriptionCriteria $criteria) => $criteria->ids === ['id1'] && $criteria->groups === ['group1'], ), )->willReturn([])->shouldBeCalled(); @@ -1831,7 +1929,7 @@ class { $subscriptionStore = $this->prophesize(LockableSubscriptionStore::class); $subscriptionStore->inLock(Argument::type(Closure::class))->will( /** @param array{Closure} $args */ - static fn (array $args): mixed => $args[0]() + static fn (array $args): mixed => $args[0](), )->shouldBeCalled(); $subscriptionStore->find(Argument::any())->willReturn([])->shouldBeCalled(); @@ -1858,6 +1956,7 @@ class { public static function methodProvider(): Generator { + yield 'setup' => ['setup']; yield 'boot' => ['boot']; yield 'run' => ['run']; yield 'teardown' => ['teardown']; From 26be8cfa0c18d0d034d9fc7416e6d2a265dd053f Mon Sep 17 00:00:00 2001 From: David Badura Date: Tue, 12 Mar 2024 12:31:38 +0100 Subject: [PATCH 2/2] update docs --- docs/pages/cli.md | 2 ++ docs/pages/getting_started.md | 2 +- docs/pages/subscription.md | 32 +++++++++++++++++++++++--------- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/docs/pages/cli.md b/docs/pages/cli.md index 265af177d..34eeda166 100644 --- a/docs/pages/cli.md +++ b/docs/pages/cli.md @@ -36,6 +36,7 @@ To manage your subscriptions there are the following cli commands. * SubscriptionReactiveCommand: `event-sourcing:subscription:reactive` * SubscriptionRemoveCommand: `event-sourcing:subscription:remove` * SubscriptionRunCommand: `event-sourcing:subscription:run` +* SubscriptionSetupCommand: `event-sourcing:subscription:setup` * SubscriptionStatusCommand: `event-sourcing:subscription:status` * SubscriptionTeardownCommand: `event-sourcing:subscription:teardown` @@ -80,6 +81,7 @@ $cli->addCommands(array( new Command\SubscriptionRemoveCommand($projectionist), new Command\SubscriptionReactivateCommand($projectionist), new Command\SubscriptionRebuildCommand($projectionist), + new Command\SubscriptionSetupCommand($projectionist), new Command\SubscriptionStatusCommand($projectionist), new Command\SchemaCreateCommand($store, $schemaManager), new Command\SchemaDropCommand($store, $schemaManager), diff --git a/docs/pages/getting_started.md b/docs/pages/getting_started.md index c886aad16..62534b15a 100644 --- a/docs/pages/getting_started.md +++ b/docs/pages/getting_started.md @@ -353,7 +353,7 @@ $schemaDirector = new DoctrineSchemaDirector( ); $schemaDirector->create(); -$projectionist->boot(); +$projectionist->setup(skipBooting: true); ``` !!! note diff --git a/docs/pages/subscription.md b/docs/pages/subscription.md index b75d7c8e6..f8a994b12 100644 --- a/docs/pages/subscription.md +++ b/docs/pages/subscription.md @@ -285,7 +285,7 @@ final class ProfileSubscriber ### Run Mode -The run mode determines how the subscriber should behave when it is booted. +The run mode determines how the subscriber should behave. There are three different modes: #### From Beginning @@ -382,6 +382,7 @@ stateDiagram-v2 direction LR [*] --> New New --> Booting + New --> Active New --> Error Booting --> Active Booting --> Paused @@ -395,7 +396,6 @@ stateDiagram-v2 Paused --> Booting Paused --> Active Paused --> Outdated - Paused --> [*] Finished --> Active Finished --> Outdated Error --> New @@ -412,12 +412,13 @@ stateDiagram-v2 A subscription is created and "new" if a subscriber exists with an ID that is not yet tracked. This can happen when either a new subscriber has been added, the subscriber ID has changed or the subscription has been manually deleted from the subscription store. +You can then set up the subscription so that it is booting or active. +In this step, the subscription engine also tries to call the `setup` method if available. ### Booting -Booting status is reached when the boot process is invoked. -In this step, the "setup" method is called on the subscription, if available. -And the subscription is brought up to date, depending on the mode. +Booting status is reached when the setup process is finished. +In this step the subscription engine tries to catch up to the current event stream. When the process is finished, the subscription is set to active or finished. ### Active @@ -575,12 +576,25 @@ $criteria = new SubscriptionEngineCriteria( An `OR` check is made for the respective criteria and all criteria are checked with an `AND`. +### Setup + +New subscriptions need to be set up before they can be used. +In this step, the subscription engine also tries to call the `setup` method if available. +After the setup process, the subscription is set to booting or active. + +```php +$subscriptionEngine->setup($criteria); +``` + +!!! tip + + You can skip the booting step with the second boolean parameter named `skipBooting`. + ### Boot -So that the subscription engine can manage the subscriptions, they must be booted. -In this step, the `setup` will be called if available. -Then the subscriptions then catch up with the current position of the event stream. -When the subscriptions are finished, they switch to the active or finished state. +You can boot the subscriptions with the `boot` method. +All booting subscriptions will catch up to the current event stream. +After the boot process, the subscription is set to active or finished. ```php $subscriptionEngine->boot($criteria);