From 16dd8a72737eefe17a47df4ae6cae2cb8deaa00e Mon Sep 17 00:00:00 2001 From: Dominik Pfaffenbauer Date: Mon, 26 Jun 2023 16:04:43 +0200 Subject: [PATCH 1/6] [Messenger] introduce async imports --- .../Command/ImportAsyncCommand.php | 95 +++++++++++++++++++ .../Importer/AsyncImporterInterface.php | 26 +++++ .../Importer/Importer.php | 67 ++++++++++++- .../Messenger/ImportRowMessage.php | 46 +++++++++ .../Messenger/ImportRowMessageHandler.php | 44 +++++++++ .../Resources/config/pimcore/config.yml | 3 + .../Resources/config/pimcore/messenger.yml | 18 ++++ .../Resources/config/services.yml | 8 +- .../Resources/config/services/commands.yml | 8 ++ 9 files changed, 312 insertions(+), 3 deletions(-) create mode 100644 src/DataDefinitionsBundle/Command/ImportAsyncCommand.php create mode 100644 src/DataDefinitionsBundle/Importer/AsyncImporterInterface.php create mode 100644 src/DataDefinitionsBundle/Messenger/ImportRowMessage.php create mode 100644 src/DataDefinitionsBundle/Messenger/ImportRowMessageHandler.php create mode 100755 src/DataDefinitionsBundle/Resources/config/pimcore/messenger.yml diff --git a/src/DataDefinitionsBundle/Command/ImportAsyncCommand.php b/src/DataDefinitionsBundle/Command/ImportAsyncCommand.php new file mode 100644 index 00000000..0f470de5 --- /dev/null +++ b/src/DataDefinitionsBundle/Command/ImportAsyncCommand.php @@ -0,0 +1,95 @@ +eventDispatcher = $eventDispatcher; + $this->repository = $repository; + $this->importer = $importer; + + parent::__construct(); + } + + protected function configure(): void + { + $this + ->setName('data-definitions:async-import') + ->setDescription('Run a Data Definition Import Async.') + ->addOption( + 'definition', + 'd', + InputOption::VALUE_REQUIRED, + 'Import Definition ID or Name' + ) + ->addOption( + 'params', + 'p', + InputOption::VALUE_REQUIRED, + 'JSON Encoded Params' + ); + } + + protected function execute(InputInterface $input, OutputInterface $output) + { + $eventDispatcher = $this->eventDispatcher; + + $params = json_decode($input->getOption('params'), true); + + if (!isset($params['userId'])) { + $params['userId'] = 0; + } + + try { + $definition = $this->repository->find($input->getOption('definition')); + } catch (InvalidArgumentException $e) { + $definition = $this->repository->findByName($input->getOption('definition')); + } + + if (!$definition instanceof ImportDefinitionInterface) { + throw new Exception('Import Definition not found'); + } + + $this->importer->doImportAsync($definition, $params); + + return 0; + } +} diff --git a/src/DataDefinitionsBundle/Importer/AsyncImporterInterface.php b/src/DataDefinitionsBundle/Importer/AsyncImporterInterface.php new file mode 100644 index 00000000..8a1cca58 --- /dev/null +++ b/src/DataDefinitionsBundle/Importer/AsyncImporterInterface.php @@ -0,0 +1,26 @@ +getCreateVersion()) { + Version::enable(); + } else { + Version::disable(); + } + + $dataSet = new ArrayImportDataSet($row); + $runner = null; + $runnerContext = $this->contextFactory->createRunnerContext($definition, $params, $row, $dataSet, null); + + if ($definition->getRunner()) { + /** + * @var RunnerInterface $runner + */ + $runner = $this->runnerRegistry->get($definition->getRunner()); + } + + if ($runner instanceof ImportStartFinishRunnerInterface) { + $runner->startImport($runnerContext); + } + + $filter = null; + $filterType = $definition->getFilter(); + if ($filterType) { + /** + * @var FilterInterface $filter + */ + $filter = $this->filterRegistry->get($filterType); + } + + + $object = $this->importRow( + $definition, + $row, + $dataSet, + $params, + $filter, + $runner + ); + } + + public function doImportAsync(ImportDefinitionInterface $definition, array $params): void + { + /** @var ImportDataSetInterface|array $data */ + $data = $this->getData($definition, $params); + + foreach ($data as $row) { + $this->bus->dispatch( + new ImportRowMessage( + $definition->getId(), + $row, + $params, + ) + ); + } + } + public function doImport(ImportDefinitionInterface $definition, $params): array { $filter = null; diff --git a/src/DataDefinitionsBundle/Messenger/ImportRowMessage.php b/src/DataDefinitionsBundle/Messenger/ImportRowMessage.php new file mode 100644 index 00000000..e821d42a --- /dev/null +++ b/src/DataDefinitionsBundle/Messenger/ImportRowMessage.php @@ -0,0 +1,46 @@ +definitionId = $definitionId; + $this->data = $data; + $this->params = $params; + } + + public function getDefinitionId(): int + { + return $this->definitionId; + } + + public function getData(): array + { + return $this->data; + } + + public function getParams(): array + { + return $this->params; + } +} diff --git a/src/DataDefinitionsBundle/Messenger/ImportRowMessageHandler.php b/src/DataDefinitionsBundle/Messenger/ImportRowMessageHandler.php new file mode 100644 index 00000000..12d8baeb --- /dev/null +++ b/src/DataDefinitionsBundle/Messenger/ImportRowMessageHandler.php @@ -0,0 +1,44 @@ +getDefinitionId()); + + if (!$definition) { + throw new \InvalidArgumentException('Invalid definition id'); + } + + $this->importer->doImportRowAsync( + $definition, + $message->getData(), + $message->getParams(), + ); + } +} diff --git a/src/DataDefinitionsBundle/Resources/config/pimcore/config.yml b/src/DataDefinitionsBundle/Resources/config/pimcore/config.yml index 16f5c299..0aa0fcb6 100644 --- a/src/DataDefinitionsBundle/Resources/config/pimcore/config.yml +++ b/src/DataDefinitionsBundle/Resources/config/pimcore/config.yml @@ -1,3 +1,6 @@ +imports: + - { resource: messenger.yml } + jms_serializer: metadata: directories: diff --git a/src/DataDefinitionsBundle/Resources/config/pimcore/messenger.yml b/src/DataDefinitionsBundle/Resources/config/pimcore/messenger.yml new file mode 100755 index 00000000..fe0ff6cd --- /dev/null +++ b/src/DataDefinitionsBundle/Resources/config/pimcore/messenger.yml @@ -0,0 +1,18 @@ +framework: + messenger: + transports: + data_definitions_import: + dsn: "doctrine://default?queue_name=data_definitions_import" + failure_transport: data_definitions_import_failed + retry_strategy: + max_retries: 5 + delay: 300000 + multiplier: 2 + # we store failed messages here for admins to manually review them later + data_definitions_import_failed: + dsn: "doctrine://default?queue_name=data_definitions_import_failed" + retry_strategy: + max_retries: 0 + + routing: + 'Wvision\Bundle\DataDefinitionsBundle\Messenger\ImportRowMessage': data_definitions_import \ No newline at end of file diff --git a/src/DataDefinitionsBundle/Resources/config/services.yml b/src/DataDefinitionsBundle/Resources/config/services.yml index 95a9cc03..7ca2a596 100644 --- a/src/DataDefinitionsBundle/Resources/config/services.yml +++ b/src/DataDefinitionsBundle/Resources/config/services.yml @@ -381,4 +381,10 @@ services: Wvision\Bundle\DataDefinitionsBundle\Service\StorageLocator: arguments: - $locator: !tagged_locator { tag: flysystem.storage } \ No newline at end of file + $locator: !tagged_locator { tag: flysystem.storage } + + Wvision\Bundle\DataDefinitionsBundle\Messenger\ImportRowMessageHandler: + arguments: + - '@Wvision\Bundle\DataDefinitionsBundle\Importer\ImporterInterface' + tags: + - { name: messenger.message_handler } diff --git a/src/DataDefinitionsBundle/Resources/config/services/commands.yml b/src/DataDefinitionsBundle/Resources/config/services/commands.yml index d8a32392..ea5aa5ef 100644 --- a/src/DataDefinitionsBundle/Resources/config/services/commands.yml +++ b/src/DataDefinitionsBundle/Resources/config/services/commands.yml @@ -19,6 +19,14 @@ services: tags: - { name: 'console.command', command: 'data-definitions:import' } + Wvision\Bundle\DataDefinitionsBundle\Command\ImportAsyncCommand: + arguments: + - '@event_dispatcher' + - '@data_definitions.repository.import_definition' + - '@Wvision\Bundle\DataDefinitionsBundle\Importer\ImporterInterface' + tags: + - { name: 'console.command', command: 'data-definitions:async-import' } + Wvision\Bundle\DataDefinitionsBundle\Command\ExportCommand: arguments: - '@event_dispatcher' From 9957d797e605a84535a227c4b4679b1f8de09e30 Mon Sep 17 00:00:00 2001 From: Dominik Pfaffenbauer Date: Mon, 26 Jun 2023 16:08:12 +0200 Subject: [PATCH 2/6] run tests on 4.0 --- .github/workflows/behat.yml | 4 ++-- .github/workflows/static.yml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/behat.yml b/.github/workflows/behat.yml index e2c18e7f..d45dc077 100644 --- a/.github/workflows/behat.yml +++ b/.github/workflows/behat.yml @@ -1,9 +1,9 @@ name: Behat on: push: - branches: [ master ] + branches: [ '4.0' ] pull_request: - branches: [ master ] + branches: [ '4.0' ] jobs: behat: diff --git a/.github/workflows/static.yml b/.github/workflows/static.yml index 54d4d198..f7bcc927 100644 --- a/.github/workflows/static.yml +++ b/.github/workflows/static.yml @@ -1,9 +1,9 @@ name: Static Tests (Lint, Stan) on: push: - branches: [ master ] + branches: [ '4.0' ] pull_request: - branches: [ master ] + branches: [ '4.0' ] jobs: lint: From 4018846085489d89b4e59a45eef89bd63d6b31dc Mon Sep 17 00:00:00 2001 From: Dominik Pfaffenbauer Date: Mon, 26 Jun 2023 16:08:38 +0200 Subject: [PATCH 3/6] [Messenger] introduce async imports --- .../Command/ImportAsyncCommand.php | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/src/DataDefinitionsBundle/Command/ImportAsyncCommand.php b/src/DataDefinitionsBundle/Command/ImportAsyncCommand.php index 0f470de5..ac964c23 100644 --- a/src/DataDefinitionsBundle/Command/ImportAsyncCommand.php +++ b/src/DataDefinitionsBundle/Command/ImportAsyncCommand.php @@ -33,19 +33,11 @@ final class ImportAsyncCommand extends AbstractCommand { - protected EventDispatcherInterface $eventDispatcher; - protected DefinitionRepository $repository; - protected ImporterInterface $importer; - public function __construct( - EventDispatcherInterface $eventDispatcher, - DefinitionRepository $repository, - AsyncImporterInterface $importer + protected EventDispatcherInterface $eventDispatcher, + protected DefinitionRepository $repository, + protected AsyncImporterInterface $importer ) { - $this->eventDispatcher = $eventDispatcher; - $this->repository = $repository; - $this->importer = $importer; - parent::__construct(); } From e6619f56781f1d3201a4aa1ec731e9ebc01c83a2 Mon Sep 17 00:00:00 2001 From: Dominik Pfaffenbauer Date: Mon, 26 Jun 2023 16:13:28 +0200 Subject: [PATCH 4/6] [Messenger] introduce async imports --- src/DataDefinitionsBundle/Resources/config/services.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/src/DataDefinitionsBundle/Resources/config/services.yml b/src/DataDefinitionsBundle/Resources/config/services.yml index 7ca2a596..c5e22e0d 100644 --- a/src/DataDefinitionsBundle/Resources/config/services.yml +++ b/src/DataDefinitionsBundle/Resources/config/services.yml @@ -31,6 +31,7 @@ services: - '@logger' - '@pimcore.model.factory' - '@coreshop.expression_language' + - '@messenger.bus.default' tags: - { name: monolog.logger, channel: import_definition } - { name: 'kernel.event_listener', event: 'data_definitions.stop', method: 'stop' } From e2491d79ce4e5a82607a9365dc6885c7eef369d2 Mon Sep 17 00:00:00 2001 From: Dominik Pfaffenbauer Date: Mon, 26 Jun 2023 16:16:14 +0200 Subject: [PATCH 5/6] [Messenger] introduce async imports --- src/DataDefinitionsBundle/Resources/config/services.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DataDefinitionsBundle/Resources/config/services.yml b/src/DataDefinitionsBundle/Resources/config/services.yml index c5e22e0d..ca0aef39 100644 --- a/src/DataDefinitionsBundle/Resources/config/services.yml +++ b/src/DataDefinitionsBundle/Resources/config/services.yml @@ -31,7 +31,7 @@ services: - '@logger' - '@pimcore.model.factory' - '@coreshop.expression_language' - - '@messenger.bus.default' + - '@messenger.default_bus' tags: - { name: monolog.logger, channel: import_definition } - { name: 'kernel.event_listener', event: 'data_definitions.stop', method: 'stop' } From 5222843ae6b4a7b8a3b4a8a59f6a6d701abe6000 Mon Sep 17 00:00:00 2001 From: Dominik Pfaffenbauer Date: Mon, 26 Jun 2023 16:28:33 +0200 Subject: [PATCH 6/6] [Messenger] introduce async imports --- src/Kernel.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Kernel.php b/src/Kernel.php index 2e2ba14d..19a89cb8 100644 --- a/src/Kernel.php +++ b/src/Kernel.php @@ -25,8 +25,8 @@ public function registerBundlesToCollection(BundleCollection $collection) public function boot() { - \Pimcore::setKernel($this); - parent::boot(); + + \Pimcore::setKernel($this); } }