diff --git a/src/ProducerInstance.php b/src/ProducerInstance.php index fa97a17..77fd584 100644 --- a/src/ProducerInstance.php +++ b/src/ProducerInstance.php @@ -173,14 +173,6 @@ public function produceAndQueueJobs($data = null): Promise $job = $jobs->getCurrent(); $job->addEvent(new ProducedJobEvent(new \DateTime(), \get_class($this->producer))); $jobsCount += yield $this->queueManager->enqueue($job); - $this->logger->info( - 'Successfully produced a new Job', - [ - 'producer' => \get_class($this->producer), - 'job_uuid' => $job->getUuid(), - 'payload_data' => NonUtf8Cleaner::clean($job->getPayloadData()) - ] - ); } $jobsCount += yield $this->queueManager->flush(); diff --git a/src/Service/QueueManager.php b/src/Service/QueueManager.php index b77f778..3a03eda 100644 --- a/src/Service/QueueManager.php +++ b/src/Service/QueueManager.php @@ -5,6 +5,7 @@ namespace Webgriffe\Esb\Service; use Amp\Beanstalk\BeanstalkClient; +use Webgriffe\Esb\NonUtf8Cleaner; use function Amp\call; use Amp\Promise; use Psr\Log\LoggerInterface; @@ -96,7 +97,7 @@ public function enqueue(JobInterface $job): Promise ) ); } - $this->batch[] = $job; + $this->batch[$job->getUuid()] = $job; $count = count($this->batch); if ($count < $this->batchSize) { @@ -248,6 +249,14 @@ private function processBatch(): \Generator $singleJob->getDelay(), $singleJob->getPriority() ); + $this->logger->info( + 'Successfully enqueued a new Job', + [ + 'flow_name' => $this->flowConfig->getName(), + 'job_uuid' => $singleJob->getUuid(), + 'payload_data' => NonUtf8Cleaner::clean($singleJob->getPayloadData()) + ] + ); } $this->batch = []; diff --git a/tests/Integration/ElasticSearchIndexingTest.php b/tests/Integration/ElasticSearchIndexingTest.php index 21ebded..b81660e 100644 --- a/tests/Integration/ElasticSearchIndexingTest.php +++ b/tests/Integration/ElasticSearchIndexingTest.php @@ -175,7 +175,7 @@ function (Job $job) { /** * @test */ - public function itLogsAndSkipsJobsThatCouldNotBeIndexedOntoElasticSearchWithAllEvents() + public function itLogsAndSkipsJobsThatCouldNotBeIndexedOntoElasticSearch() { $producerDir = vfsStream::url('root/producer_dir'); $workerFile = vfsStream::url('root/worker.data'); @@ -216,6 +216,14 @@ function ($log) { $search = Promise\wait($this->esClient->uriSearchOneIndex(self::FLOW_CODE, '')); $this->assertCount(1, $search['hits']['hits']); $this->assertTrue($this->logHandler()->hasErrorThatContains('Job could not be indexed in ElasticSearch')); + $logRecords = $this->logHandler()->getRecords(); + $successfullyIndexedLog = array_filter( + $logRecords, + function ($log) { + return strpos($log['message'], 'Successfully enqueued a new Job') !== false; + } + ); + $this->assertCount(1, $successfullyIndexedLog); } private function assertForEachJob(callable $callable, array $jobsData)