diff --git a/Model/Indexer/AsyncEventSubscriber.php b/Model/Indexer/AsyncEventSubscriber.php
index 3fab104..c6ce5dd 100644
--- a/Model/Indexer/AsyncEventSubscriber.php
+++ b/Model/Indexer/AsyncEventSubscriber.php
@@ -9,6 +9,7 @@
use Aligent\AsyncEvents\Model\Resolver\AsyncEvent;
use ArrayIterator;
use Magento\CatalogSearch\Model\Indexer\IndexerHandlerFactory;
+use Magento\Elasticsearch\Model\Adapter\Elasticsearch;
use Magento\Framework\App\DeploymentConfig;
use Magento\Framework\App\ObjectManager;
use Magento\Framework\Indexer\ActionInterface as IndexerActionInterface;
@@ -58,6 +59,7 @@ class AsyncEventSubscriber implements
* @param AsyncEvent $asyncEventScopeResolver
* @param IndexStructure $indexStructure
* @param Config $config
+ * @param Elasticsearch $dataMapperAdapter
* @param array $data
* @param int|null $batchSize
* @param DeploymentConfig|null $deploymentConfig
@@ -69,6 +71,7 @@ public function __construct(
private readonly AsyncEvent $asyncEventScopeResolver,
private readonly IndexStructureInterface $indexStructure,
private readonly Config $config,
+ private readonly Elasticsearch $dataMapperAdapter,
private readonly array $data,
int $batchSize = null,
DeploymentConfig $deploymentConfig = null
@@ -99,6 +102,7 @@ public function executeByDimensions(array $dimensions, ?Traversable $entityIds)
$saveHandler = $this->indexerHandlerFactory->create(
[
'data' => $this->data,
+ 'adapter' => $this->dataMapperAdapter,
'scopeResolver' => $this->asyncEventScopeResolver,
'indexStructure' => $this->indexStructure
]
diff --git a/Plugin/AddEntityTypeContext.php b/Plugin/AddEntityTypeContext.php
deleted file mode 100644
index 28dd011..0000000
--- a/Plugin/AddEntityTypeContext.php
+++ /dev/null
@@ -1,48 +0,0 @@
-configure([
+ 'preferences' => [
+ Config::class => TestConfig::class
+ ]
+ ]);
+
+ $this->helper = Bootstrap::getObjectManager()->create(Amqp::class);
+ $this->publisher = Bootstrap::getObjectManager()->create(PublisherInterface::class);
+ $this->json = Bootstrap::getObjectManager()->create(Json::class);
+ $this->connection = Bootstrap::getObjectManager()->get(ResourceConnection::class);
+
+ if (!$this->helper->isAvailable()) {
+ $this->fail('This test relies on RabbitMQ Management Plugin.');
+ }
+ }
+
+ /**
+ * Test event retries
+ *
+ * Disable database isolation because the transactions need to be committed so that the consumer can
+ * retrieve the subscriptions from database in a separate consumer process.
+ *
+ * @magentoDataFixture Aligent_AsyncEvents::Test/_files/http_async_events.php
+ * @magentoDbIsolation disabled
+ * @magentoConfigFixture default/system/async_events/max_deaths 3
+ */
+ public function testRetry()
+ {
+ $this->publisher->publish(
+ QueueMetadataInterface::EVENT_QUEUE,
+ [
+ 'example.event',
+ $this->json->serialize([
+ 'countryId' => 'AU'
+ ]),
+ ]
+ );
+
+ $this->publisherConsumerController = Bootstrap::getObjectManager()->create(
+ PublisherConsumerController::class,
+ [
+ 'consumers' => ['event.trigger.consumer', 'event.retry.consumer'],
+ 'logFilePath' => TESTS_TEMP_DIR . "/MessageQueueTestLog.txt",
+ 'maxMessages' => 10,
+ 'appInitParams' => Bootstrap::getInstance()->getAppInitParams()
+ ]
+ );
+
+ try {
+ $this->publisherConsumerController->startConsumers();
+ sleep(16);
+ } catch (EnvironmentPreconditionException $e) {
+ $this->markTestSkipped($e->getMessage());
+ } catch (PreconditionFailedException $e) {
+ $this->fail(
+ $e->getMessage()
+ );
+ } finally {
+ $this->publisherConsumerController->stopConsumers();
+ }
+
+ $table = $this->connection->getTableName('async_event_subscriber_log');
+ $connection = $this->connection->getConnection();
+ $select = $connection->select()
+ ->from($table, 'uuid')
+ ->columns(['events' => new \Zend_Db_Expr('COUNT(*)')])
+ ->group('uuid');
+
+ $events = $connection->fetchAll($select);
+
+ foreach ($events as $event) {
+ // An uuid batch should be retired for 3 times after the first attempt. 1 + 3
+ $this->assertEquals(4, $event['events']);
+ }
+ }
+}
diff --git a/Test/Integration/FailoverTopologyTest.php b/Test/Integration/FailoverTopologyTest.php
index adb120f..f42e5b9 100644
--- a/Test/Integration/FailoverTopologyTest.php
+++ b/Test/Integration/FailoverTopologyTest.php
@@ -9,7 +9,6 @@
namespace Aligent\AsyncEvents\Test\Integration;
-use Aligent\AsyncEvents\Helper\QueueMetadataInterface;
use Aligent\AsyncEvents\Service\AsyncEvent\RetryManager;
use Magento\TestFramework\Helper\Amqp;
use Magento\TestFramework\Helper\Bootstrap;
diff --git a/Test/Integration/TestConfig.php b/Test/Integration/TestConfig.php
new file mode 100644
index 0000000..8ed9b1c
--- /dev/null
+++ b/Test/Integration/TestConfig.php
@@ -0,0 +1,17 @@
+ CountryInformationAcquirerInterface::class,
+ "method" => "getCountryInfo",
+ ];
+ }
+}
diff --git a/Test/_files/http_async_events.php b/Test/_files/http_async_events.php
index ae29013..fc5896a 100644
--- a/Test/_files/http_async_events.php
+++ b/Test/_files/http_async_events.php
@@ -1,25 +1,19 @@
get(AsyncEventInterfaceFactory::class);
+$resource = $objectManager->get(ResourceConnection::class);
+$connection = $resource->getConnection();
-$asyncEventRepository = $objectManager->get(\Aligent\AsyncEvents\Api\AsyncEventRepositoryInterface::class);
-
-/** @var AsyncEventInterface $asyncEvent */
-$asyncEvent = $asyncEventFactory->create(
- [
- 'data' => [
- 'event' => 'example.event',
- 'recipient_url' => 'http://host.docker.internal:3001/failable',
- 'verification_token' => 'supersecret',
- 'status' => 1
- ]
- ]
-);
-$asyncEvent->setEventName('example.event');
-$asyncEventRepository->save($asyncEvent);
+$connection->insertOnDuplicate('async_event_subscriber', [
+ 'subscription_id' => 1,
+ 'event_name' => 'example.event',
+ 'recipient_url' => 'https://mock.codes/500',
+ 'status' => 1,
+ 'metadata' => 'http',
+ 'verification_token' => 'secret',
+ 'subscribed_at' => (new DateTime())->format(DateTimeInterface::ATOM)
+]);
diff --git a/etc/di.xml b/etc/di.xml
index aa12e2e..74c2d36 100644
--- a/etc/di.xml
+++ b/etc/di.xml
@@ -94,6 +94,8 @@
\Aligent\AsyncEvents\Model\Indexer\AsyncEventDimensionProvider
+ dataMapperAdapter
+ dataMapperIndexStructure
@@ -123,13 +125,22 @@
-
-
-
-
+
+
+
+
+ dataMapperAdapter
+
+
+
+
+
+
+ \Aligent\AsyncEvents\Model\Adapter\BatchDataMapper\AsyncEventLogMapper
+
+
diff --git a/etc/queue_consumer.xml b/etc/queue_consumer.xml
index dfcd093..0907f2c 100644
--- a/etc/queue_consumer.xml
+++ b/etc/queue_consumer.xml
@@ -6,5 +6,7 @@
handler="Aligent\AsyncEvents\Model\AsyncEventTriggerHandler::process"/>
+ consumerInstance="Magento\Framework\MessageQueue\Consumer"
+ handler="Aligent\AsyncEvents\Model\RetryHandler::process"
+ />