Skip to content

Commit

Permalink
Merge pull request #390 from dpfaffenbauer/messenger
Browse files Browse the repository at this point in the history
[Messenger] introduce async imports
  • Loading branch information
dpfaffenbauer authored Jun 26, 2023
2 parents 241bc2a + 5222843 commit 5c6a418
Show file tree
Hide file tree
Showing 12 changed files with 311 additions and 9 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/behat.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
name: Behat
on:
push:
branches: [ master ]
branches: [ '4.0' ]
pull_request:
branches: [ master ]
branches: [ '4.0' ]

jobs:
behat:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/static.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
name: Static Tests (Lint, Stan)
on:
push:
branches: [ master ]
branches: [ '4.0' ]
pull_request:
branches: [ master ]
branches: [ '4.0' ]

jobs:
lint:
Expand Down
87 changes: 87 additions & 0 deletions src/DataDefinitionsBundle/Command/ImportAsyncCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
<?php
/**
* Data Definitions.
*
* LICENSE
*
* This source file is subject to the GNU General Public License version 3 (GPLv3)
* For the full copyright and license information, please view the LICENSE.md and gpl-3.0.txt
* files that are distributed with this source code.
*
* @copyright Copyright (c) 2016-2019 w-vision AG (https://www.w-vision.ch)
* @license https://github.com/w-vision/DataDefinitions/blob/master/gpl-3.0.txt GNU General Public License version 3 (GPLv3)
*/

declare(strict_types=1);

namespace Wvision\Bundle\DataDefinitionsBundle\Command;

use Exception;
use InvalidArgumentException;
use Pimcore\Console\AbstractCommand;
use Symfony\Component\Console\Helper\Helper;
use Symfony\Component\Console\Helper\ProgressBar;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Wvision\Bundle\DataDefinitionsBundle\Event\ImportDefinitionEvent;
use Wvision\Bundle\DataDefinitionsBundle\Importer\AsyncImporterInterface;
use Wvision\Bundle\DataDefinitionsBundle\Importer\ImporterInterface;
use Wvision\Bundle\DataDefinitionsBundle\Model\ImportDefinitionInterface;
use Wvision\Bundle\DataDefinitionsBundle\Repository\DefinitionRepository;

final class ImportAsyncCommand extends AbstractCommand
{
public function __construct(
protected EventDispatcherInterface $eventDispatcher,
protected DefinitionRepository $repository,
protected AsyncImporterInterface $importer
) {
parent::__construct();
}

protected function configure(): void
{
$this
->setName('data-definitions:async-import')
->setDescription('Run a Data Definition Import Async.')
->addOption(
'definition',
'd',
InputOption::VALUE_REQUIRED,
'Import Definition ID or Name'
)
->addOption(
'params',
'p',
InputOption::VALUE_REQUIRED,
'JSON Encoded Params'
);
}

protected function execute(InputInterface $input, OutputInterface $output)
{
$eventDispatcher = $this->eventDispatcher;

$params = json_decode($input->getOption('params'), true);

if (!isset($params['userId'])) {
$params['userId'] = 0;
}

try {
$definition = $this->repository->find($input->getOption('definition'));
} catch (InvalidArgumentException $e) {
$definition = $this->repository->findByName($input->getOption('definition'));
}

if (!$definition instanceof ImportDefinitionInterface) {
throw new Exception('Import Definition not found');
}

$this->importer->doImportAsync($definition, $params);

return 0;
}
}
26 changes: 26 additions & 0 deletions src/DataDefinitionsBundle/Importer/AsyncImporterInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php
/**
* Data Definitions.
*
* LICENSE
*
* This source file is subject to the GNU General Public License version 3 (GPLv3)
* For the full copyright and license information, please view the LICENSE.md and gpl-3.0.txt
* files that are distributed with this source code.
*
* @copyright Copyright (c) 2016-2019 w-vision AG (https://www.w-vision.ch)
* @license https://github.com/w-vision/DataDefinitions/blob/master/gpl-3.0.txt GNU General Public License version 3 (GPLv3)
*/

declare(strict_types=1);

namespace Wvision\Bundle\DataDefinitionsBundle\Importer;

use Wvision\Bundle\DataDefinitionsBundle\Model\ImportDefinitionInterface;

interface AsyncImporterInterface
{
public function doImportRowAsync(ImportDefinitionInterface $definition, array $row, array $params): void;

public function doImportAsync(ImportDefinitionInterface $definition, array $params): void;
}
67 changes: 65 additions & 2 deletions src/DataDefinitionsBundle/Importer/Importer.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\ExpressionLanguage\ExpressionLanguage;
use Symfony\Component\Messenger\MessageBusInterface;
use Throwable;
use Wvision\Bundle\DataDefinitionsBundle\Context\ContextFactoryInterface;
use Wvision\Bundle\DataDefinitionsBundle\Event\EventDispatcherInterface;
Expand All @@ -39,10 +40,12 @@
use Wvision\Bundle\DataDefinitionsBundle\Filter\FilterInterface;
use Wvision\Bundle\DataDefinitionsBundle\Interpreter\InterpreterInterface;
use Wvision\Bundle\DataDefinitionsBundle\Loader\LoaderInterface;
use Wvision\Bundle\DataDefinitionsBundle\Messenger\ImportRowMessage;
use Wvision\Bundle\DataDefinitionsBundle\Model\ImportDefinitionInterface;
use Wvision\Bundle\DataDefinitionsBundle\Model\ImportMapping;
use Wvision\Bundle\DataDefinitionsBundle\Model\ParamsAwareInterface;
use Wvision\Bundle\DataDefinitionsBundle\Persister\PersisterInterface;
use Wvision\Bundle\DataDefinitionsBundle\Provider\ArrayImportDataSet;
use Wvision\Bundle\DataDefinitionsBundle\Provider\ImportDataSet;
use Wvision\Bundle\DataDefinitionsBundle\Provider\ImportDataSetInterface;
use Wvision\Bundle\DataDefinitionsBundle\Provider\ImportProviderInterface;
Expand All @@ -52,7 +55,7 @@
use Wvision\Bundle\DataDefinitionsBundle\Runner\SetterRunnerInterface;
use Wvision\Bundle\DataDefinitionsBundle\Setter\SetterInterface;

final class Importer implements ImporterInterface
final class Importer implements ImporterInterface, AsyncImporterInterface
{
private bool $shouldStop = false;

Expand All @@ -69,11 +72,71 @@ public function __construct(
private ContextFactoryInterface $contextFactory,
private LoggerInterface $logger,
private Factory $modelFactory,
private ExpressionLanguage $expressionLanguage
private ExpressionLanguage $expressionLanguage,
private MessageBusInterface $bus,
) {

}

public function doImportRowAsync(ImportDefinitionInterface $definition, array $row, array $params): void
{
if ($definition->getCreateVersion()) {
Version::enable();
} else {
Version::disable();
}

$dataSet = new ArrayImportDataSet($row);
$runner = null;
$runnerContext = $this->contextFactory->createRunnerContext($definition, $params, $row, $dataSet, null);

if ($definition->getRunner()) {
/**
* @var RunnerInterface $runner
*/
$runner = $this->runnerRegistry->get($definition->getRunner());
}

if ($runner instanceof ImportStartFinishRunnerInterface) {
$runner->startImport($runnerContext);
}

$filter = null;
$filterType = $definition->getFilter();
if ($filterType) {
/**
* @var FilterInterface $filter
*/
$filter = $this->filterRegistry->get($filterType);
}


$object = $this->importRow(
$definition,
$row,
$dataSet,
$params,
$filter,
$runner
);
}

public function doImportAsync(ImportDefinitionInterface $definition, array $params): void
{
/** @var ImportDataSetInterface|array $data */
$data = $this->getData($definition, $params);

foreach ($data as $row) {
$this->bus->dispatch(
new ImportRowMessage(
$definition->getId(),
$row,
$params,
)
);
}
}

public function doImport(ImportDefinitionInterface $definition, $params): array
{
$filter = null;
Expand Down
46 changes: 46 additions & 0 deletions src/DataDefinitionsBundle/Messenger/ImportRowMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php
/**
* Data Definitions.
*
* LICENSE
*
* This source file is subject to the GNU General Public License version 3 (GPLv3)
* For the full copyright and license information, please view the LICENSE.md and gpl-3.0.txt
* files that are distributed with this source code.
*
* @copyright Copyright (c) 2016-2019 w-vision AG (https://www.w-vision.ch)
* @license https://github.com/w-vision/DataDefinitions/blob/master/gpl-3.0.txt GNU General Public License version 3 (GPLv3)
*/

declare(strict_types=1);

namespace Wvision\Bundle\DataDefinitionsBundle\Messenger;

class ImportRowMessage
{
private int $definitionId;
private array $data;
private array $params;

public function __construct(int $definitionId, array $data, array $params)
{
$this->definitionId = $definitionId;
$this->data = $data;
$this->params = $params;
}

public function getDefinitionId(): int
{
return $this->definitionId;
}

public function getData(): array
{
return $this->data;
}

public function getParams(): array
{
return $this->params;
}
}
44 changes: 44 additions & 0 deletions src/DataDefinitionsBundle/Messenger/ImportRowMessageHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php
/**
* Data Definitions.
*
* LICENSE
*
* This source file is subject to the GNU General Public License version 3 (GPLv3)
* For the full copyright and license information, please view the LICENSE.md and gpl-3.0.txt
* files that are distributed with this source code.
*
* @copyright Copyright (c) 2016-2019 w-vision AG (https://www.w-vision.ch)
* @license https://github.com/w-vision/DataDefinitions/blob/master/gpl-3.0.txt GNU General Public License version 3 (GPLv3)
*/

declare(strict_types=1);

namespace Wvision\Bundle\DataDefinitionsBundle\Messenger;

use Wvision\Bundle\DataDefinitionsBundle\Importer\AsyncImporterInterface;
use Wvision\Bundle\DataDefinitionsBundle\Model\ImportDefinition;

class ImportRowMessageHandler
{
public function __construct(
private AsyncImporterInterface $importer,
)
{
}

public function __invoke(ImportRowMessage $message): void
{
$definition = ImportDefinition::getById($message->getDefinitionId());

if (!$definition) {
throw new \InvalidArgumentException('Invalid definition id');
}

$this->importer->doImportRowAsync(
$definition,
$message->getData(),
$message->getParams(),
);
}
}
3 changes: 3 additions & 0 deletions src/DataDefinitionsBundle/Resources/config/pimcore/config.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
imports:
- { resource: messenger.yml }

jms_serializer:
metadata:
directories:
Expand Down
18 changes: 18 additions & 0 deletions src/DataDefinitionsBundle/Resources/config/pimcore/messenger.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
framework:
messenger:
transports:
data_definitions_import:
dsn: "doctrine://default?queue_name=data_definitions_import"
failure_transport: data_definitions_import_failed
retry_strategy:
max_retries: 5
delay: 300000
multiplier: 2
# we store failed messages here for admins to manually review them later
data_definitions_import_failed:
dsn: "doctrine://default?queue_name=data_definitions_import_failed"
retry_strategy:
max_retries: 0

routing:
'Wvision\Bundle\DataDefinitionsBundle\Messenger\ImportRowMessage': data_definitions_import
9 changes: 8 additions & 1 deletion src/DataDefinitionsBundle/Resources/config/services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ services:
- '@logger'
- '@pimcore.model.factory'
- '@coreshop.expression_language'
- '@messenger.default_bus'
tags:
- { name: monolog.logger, channel: import_definition }
- { name: 'kernel.event_listener', event: 'data_definitions.stop', method: 'stop' }
Expand Down Expand Up @@ -381,4 +382,10 @@ services:

Wvision\Bundle\DataDefinitionsBundle\Service\StorageLocator:
arguments:
$locator: !tagged_locator { tag: flysystem.storage }
$locator: !tagged_locator { tag: flysystem.storage }

Wvision\Bundle\DataDefinitionsBundle\Messenger\ImportRowMessageHandler:
arguments:
- '@Wvision\Bundle\DataDefinitionsBundle\Importer\ImporterInterface'
tags:
- { name: messenger.message_handler }
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ services:
tags:
- { name: 'console.command', command: 'data-definitions:import' }

Wvision\Bundle\DataDefinitionsBundle\Command\ImportAsyncCommand:
arguments:
- '@event_dispatcher'
- '@data_definitions.repository.import_definition'
- '@Wvision\Bundle\DataDefinitionsBundle\Importer\ImporterInterface'
tags:
- { name: 'console.command', command: 'data-definitions:async-import' }

Wvision\Bundle\DataDefinitionsBundle\Command\ExportCommand:
arguments:
- '@event_dispatcher'
Expand Down
Loading

0 comments on commit 5c6a418

Please sign in to comment.