diff --git a/.github/workflows/behat.yml b/.github/workflows/behat.yml index e2c18e7f..d45dc077 100644 --- a/.github/workflows/behat.yml +++ b/.github/workflows/behat.yml @@ -1,9 +1,9 @@ name: Behat on: push: - branches: [ master ] + branches: [ '4.0' ] pull_request: - branches: [ master ] + branches: [ '4.0' ] jobs: behat: diff --git a/.github/workflows/static.yml b/.github/workflows/static.yml index 54d4d198..f7bcc927 100644 --- a/.github/workflows/static.yml +++ b/.github/workflows/static.yml @@ -1,9 +1,9 @@ name: Static Tests (Lint, Stan) on: push: - branches: [ master ] + branches: [ '4.0' ] pull_request: - branches: [ master ] + branches: [ '4.0' ] jobs: lint: diff --git a/src/DataDefinitionsBundle/Command/ImportAsyncCommand.php b/src/DataDefinitionsBundle/Command/ImportAsyncCommand.php new file mode 100644 index 00000000..ac964c23 --- /dev/null +++ b/src/DataDefinitionsBundle/Command/ImportAsyncCommand.php @@ -0,0 +1,87 @@ +setName('data-definitions:async-import') + ->setDescription('Run a Data Definition Import Async.') + ->addOption( + 'definition', + 'd', + InputOption::VALUE_REQUIRED, + 'Import Definition ID or Name' + ) + ->addOption( + 'params', + 'p', + InputOption::VALUE_REQUIRED, + 'JSON Encoded Params' + ); + } + + protected function execute(InputInterface $input, OutputInterface $output) + { + $eventDispatcher = $this->eventDispatcher; + + $params = json_decode($input->getOption('params'), true); + + if (!isset($params['userId'])) { + $params['userId'] = 0; + } + + try { + $definition = $this->repository->find($input->getOption('definition')); + } catch (InvalidArgumentException $e) { + $definition = $this->repository->findByName($input->getOption('definition')); + } + + if (!$definition instanceof ImportDefinitionInterface) { + throw new Exception('Import Definition not found'); + } + + $this->importer->doImportAsync($definition, $params); + + return 0; + } +} diff --git a/src/DataDefinitionsBundle/Importer/AsyncImporterInterface.php b/src/DataDefinitionsBundle/Importer/AsyncImporterInterface.php new file mode 100644 index 00000000..8a1cca58 --- /dev/null +++ b/src/DataDefinitionsBundle/Importer/AsyncImporterInterface.php @@ -0,0 +1,26 @@ +getCreateVersion()) { + Version::enable(); + } else { + Version::disable(); + } + + $dataSet = new ArrayImportDataSet($row); + $runner = null; + $runnerContext = $this->contextFactory->createRunnerContext($definition, $params, $row, $dataSet, null); + + if ($definition->getRunner()) { + /** + * @var RunnerInterface $runner + */ + $runner = $this->runnerRegistry->get($definition->getRunner()); + } + + if ($runner instanceof ImportStartFinishRunnerInterface) { + $runner->startImport($runnerContext); + } + + $filter = null; + $filterType = $definition->getFilter(); + if ($filterType) { + /** + * @var FilterInterface $filter + */ + $filter = $this->filterRegistry->get($filterType); + } + + + $object = $this->importRow( + $definition, + $row, + $dataSet, + $params, + $filter, + $runner + ); + } + + public function doImportAsync(ImportDefinitionInterface $definition, array $params): void + { + /** @var ImportDataSetInterface|array $data */ + $data = $this->getData($definition, $params); + + foreach ($data as $row) { + $this->bus->dispatch( + new ImportRowMessage( + $definition->getId(), + $row, + $params, + ) + ); + } + } + public function doImport(ImportDefinitionInterface $definition, $params): array { $filter = null; diff --git a/src/DataDefinitionsBundle/Messenger/ImportRowMessage.php b/src/DataDefinitionsBundle/Messenger/ImportRowMessage.php new file mode 100644 index 00000000..e821d42a --- /dev/null +++ b/src/DataDefinitionsBundle/Messenger/ImportRowMessage.php @@ -0,0 +1,46 @@ +definitionId = $definitionId; + $this->data = $data; + $this->params = $params; + } + + public function getDefinitionId(): int + { + return $this->definitionId; + } + + public function getData(): array + { + return $this->data; + } + + public function getParams(): array + { + return $this->params; + } +} diff --git a/src/DataDefinitionsBundle/Messenger/ImportRowMessageHandler.php b/src/DataDefinitionsBundle/Messenger/ImportRowMessageHandler.php new file mode 100644 index 00000000..12d8baeb --- /dev/null +++ b/src/DataDefinitionsBundle/Messenger/ImportRowMessageHandler.php @@ -0,0 +1,44 @@ +getDefinitionId()); + + if (!$definition) { + throw new \InvalidArgumentException('Invalid definition id'); + } + + $this->importer->doImportRowAsync( + $definition, + $message->getData(), + $message->getParams(), + ); + } +} diff --git a/src/DataDefinitionsBundle/Resources/config/pimcore/config.yml b/src/DataDefinitionsBundle/Resources/config/pimcore/config.yml index 16f5c299..0aa0fcb6 100644 --- a/src/DataDefinitionsBundle/Resources/config/pimcore/config.yml +++ b/src/DataDefinitionsBundle/Resources/config/pimcore/config.yml @@ -1,3 +1,6 @@ +imports: + - { resource: messenger.yml } + jms_serializer: metadata: directories: diff --git a/src/DataDefinitionsBundle/Resources/config/pimcore/messenger.yml b/src/DataDefinitionsBundle/Resources/config/pimcore/messenger.yml new file mode 100755 index 00000000..fe0ff6cd --- /dev/null +++ b/src/DataDefinitionsBundle/Resources/config/pimcore/messenger.yml @@ -0,0 +1,18 @@ +framework: + messenger: + transports: + data_definitions_import: + dsn: "doctrine://default?queue_name=data_definitions_import" + failure_transport: data_definitions_import_failed + retry_strategy: + max_retries: 5 + delay: 300000 + multiplier: 2 + # we store failed messages here for admins to manually review them later + data_definitions_import_failed: + dsn: "doctrine://default?queue_name=data_definitions_import_failed" + retry_strategy: + max_retries: 0 + + routing: + 'Wvision\Bundle\DataDefinitionsBundle\Messenger\ImportRowMessage': data_definitions_import \ No newline at end of file diff --git a/src/DataDefinitionsBundle/Resources/config/services.yml b/src/DataDefinitionsBundle/Resources/config/services.yml index 95a9cc03..ca0aef39 100644 --- a/src/DataDefinitionsBundle/Resources/config/services.yml +++ b/src/DataDefinitionsBundle/Resources/config/services.yml @@ -31,6 +31,7 @@ services: - '@logger' - '@pimcore.model.factory' - '@coreshop.expression_language' + - '@messenger.default_bus' tags: - { name: monolog.logger, channel: import_definition } - { name: 'kernel.event_listener', event: 'data_definitions.stop', method: 'stop' } @@ -381,4 +382,10 @@ services: Wvision\Bundle\DataDefinitionsBundle\Service\StorageLocator: arguments: - $locator: !tagged_locator { tag: flysystem.storage } \ No newline at end of file + $locator: !tagged_locator { tag: flysystem.storage } + + Wvision\Bundle\DataDefinitionsBundle\Messenger\ImportRowMessageHandler: + arguments: + - '@Wvision\Bundle\DataDefinitionsBundle\Importer\ImporterInterface' + tags: + - { name: messenger.message_handler } diff --git a/src/DataDefinitionsBundle/Resources/config/services/commands.yml b/src/DataDefinitionsBundle/Resources/config/services/commands.yml index d8a32392..ea5aa5ef 100644 --- a/src/DataDefinitionsBundle/Resources/config/services/commands.yml +++ b/src/DataDefinitionsBundle/Resources/config/services/commands.yml @@ -19,6 +19,14 @@ services: tags: - { name: 'console.command', command: 'data-definitions:import' } + Wvision\Bundle\DataDefinitionsBundle\Command\ImportAsyncCommand: + arguments: + - '@event_dispatcher' + - '@data_definitions.repository.import_definition' + - '@Wvision\Bundle\DataDefinitionsBundle\Importer\ImporterInterface' + tags: + - { name: 'console.command', command: 'data-definitions:async-import' } + Wvision\Bundle\DataDefinitionsBundle\Command\ExportCommand: arguments: - '@event_dispatcher' diff --git a/src/Kernel.php b/src/Kernel.php index 2e2ba14d..19a89cb8 100644 --- a/src/Kernel.php +++ b/src/Kernel.php @@ -25,8 +25,8 @@ public function registerBundlesToCollection(BundleCollection $collection) public function boot() { - \Pimcore::setKernel($this); - parent::boot(); + + \Pimcore::setKernel($this); } }