From d149bf344566f4eeb6ce97d3d1a57386d2098023 Mon Sep 17 00:00:00 2001 From: Priyadi Iman Nurcahyo <1102197+priyadi@users.noreply.github.com> Date: Wed, 17 Jul 2024 19:56:18 +0700 Subject: [PATCH] feat: batch time limit (#125) --- CHANGELOG.md | 4 ++ .../src/Batch/AbstractBatchProcessor.php | 5 +++ .../rekapager-core/src/Batch/BatchProcess.php | 25 ++++++++++- .../src/Batch/BatchProcessorDecorator.php | 6 +++ .../src/Batch/BatchProcessorInterface.php | 6 +++ .../src/Batch/Event/TimeLimitEvent.php | 45 +++++++++++++++++++ .../src/Batch/BatchCommand.php | 21 +++++++-- .../CommandBatchProcessorDecorator.php | 35 ++++++++++++++- .../src/Batch/SimpleBatchCommand.php | 5 +++ 9 files changed, 145 insertions(+), 7 deletions(-) create mode 100644 packages/rekapager-core/src/Batch/Event/TimeLimitEvent.php diff --git a/CHANGELOG.md b/CHANGELOG.md index 957fa1d..6cda936 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +# 0.13.2 + +* feat: batch time limit + # 0.13.1 * feat: add `BatchProcess` and related classes diff --git a/packages/rekapager-core/src/Batch/AbstractBatchProcessor.php b/packages/rekapager-core/src/Batch/AbstractBatchProcessor.php index 56feea4..e0ea896 100644 --- a/packages/rekapager-core/src/Batch/AbstractBatchProcessor.php +++ b/packages/rekapager-core/src/Batch/AbstractBatchProcessor.php @@ -19,6 +19,7 @@ use Rekalogika\Rekapager\Batch\Event\BeforeProcessEvent; use Rekalogika\Rekapager\Batch\Event\InterruptEvent; use Rekalogika\Rekapager\Batch\Event\ItemEvent; +use Rekalogika\Rekapager\Batch\Event\TimeLimitEvent; /** * @template TKey of array-key @@ -53,4 +54,8 @@ public function afterPage(AfterPageEvent $event): void public function onInterrupt(InterruptEvent $event): void { } + + public function onTimeLimit(TimeLimitEvent $event): void + { + } } diff --git a/packages/rekapager-core/src/Batch/BatchProcess.php b/packages/rekapager-core/src/Batch/BatchProcess.php index aa5d19e..0542114 100644 --- a/packages/rekapager-core/src/Batch/BatchProcess.php +++ b/packages/rekapager-core/src/Batch/BatchProcess.php @@ -20,6 +20,7 @@ use Rekalogika\Rekapager\Batch\Event\BeforeProcessEvent; use Rekalogika\Rekapager\Batch\Event\InterruptEvent; use Rekalogika\Rekapager\Batch\Event\ItemEvent; +use Rekalogika\Rekapager\Batch\Event\TimeLimitEvent; use Rekalogika\Rekapager\Contracts\PageIdentifierEncoderResolverInterface; /** @@ -56,10 +57,13 @@ final public function stop(): bool /** * @param int<1,max>|null $pageSize */ - final public function process( + final public function run( ?string $resume = null, - ?int $pageSize = null + ?int $pageSize = null, + ?int $timeLimit = null, ): void { + $startTime = time(); + // determine start page identifier if ($resume !== null) { @@ -88,6 +92,21 @@ final public function process( $pageIdentifier = $page->getPageIdentifier(); $pageIdentifierString = $this->pageableIdentifierResolver->encode($pageIdentifier); + // check time limit + + if ($timeLimit !== null && time() - $startTime >= $timeLimit) { + $timeLimitEvent = new TimeLimitEvent( + pageable: $pageable, + nextPageIdentifier: $pageIdentifierString, + ); + + $this->batchProcessor->onTimeLimit($timeLimitEvent); + + return; + } + + // check stop flag + if ($this->stopFlag) { $interruptEvent = new InterruptEvent( pageable: $pageable, @@ -99,6 +118,8 @@ final public function process( return; } + // process page + $beforePageEvent = new BeforePageEvent( page: $page, encodedPageIdentifier: $pageIdentifierString, diff --git a/packages/rekapager-core/src/Batch/BatchProcessorDecorator.php b/packages/rekapager-core/src/Batch/BatchProcessorDecorator.php index 964038c..04588e1 100644 --- a/packages/rekapager-core/src/Batch/BatchProcessorDecorator.php +++ b/packages/rekapager-core/src/Batch/BatchProcessorDecorator.php @@ -19,6 +19,7 @@ use Rekalogika\Rekapager\Batch\Event\BeforeProcessEvent; use Rekalogika\Rekapager\Batch\Event\InterruptEvent; use Rekalogika\Rekapager\Batch\Event\ItemEvent; +use Rekalogika\Rekapager\Batch\Event\TimeLimitEvent; /** * @template TKey of array-key @@ -68,4 +69,9 @@ public function onInterrupt(InterruptEvent $event): void { $this->decorated->onInterrupt($event); } + + public function onTimeLimit(TimeLimitEvent $event): void + { + $this->decorated->onTimeLimit($event); + } } diff --git a/packages/rekapager-core/src/Batch/BatchProcessorInterface.php b/packages/rekapager-core/src/Batch/BatchProcessorInterface.php index f29dea7..fa8612d 100644 --- a/packages/rekapager-core/src/Batch/BatchProcessorInterface.php +++ b/packages/rekapager-core/src/Batch/BatchProcessorInterface.php @@ -19,6 +19,7 @@ use Rekalogika\Rekapager\Batch\Event\BeforeProcessEvent; use Rekalogika\Rekapager\Batch\Event\InterruptEvent; use Rekalogika\Rekapager\Batch\Event\ItemEvent; +use Rekalogika\Rekapager\Batch\Event\TimeLimitEvent; /** * @template TKey of array-key @@ -60,4 +61,9 @@ public function afterPage(AfterPageEvent $event): void; * @param InterruptEvent $event */ public function onInterrupt(InterruptEvent $event): void; + + /** + * @param TimeLimitEvent $event + */ + public function onTimeLimit(TimeLimitEvent $event): void; } diff --git a/packages/rekapager-core/src/Batch/Event/TimeLimitEvent.php b/packages/rekapager-core/src/Batch/Event/TimeLimitEvent.php new file mode 100644 index 0000000..6a58ab8 --- /dev/null +++ b/packages/rekapager-core/src/Batch/Event/TimeLimitEvent.php @@ -0,0 +1,45 @@ + + * + * For the full copyright and license information, please view the LICENSE file + * that was distributed with this source code. + */ + +namespace Rekalogika\Rekapager\Batch\Event; + +use Rekalogika\Contracts\Rekapager\PageableInterface; + +/** + * @template TKey of array-key + * @template T + */ +final class TimeLimitEvent +{ + /** + * @param PageableInterface $pageable + */ + public function __construct( + private readonly PageableInterface $pageable, + private readonly ?string $nextPageIdentifier, + ) { + } + + /** + * @return PageableInterface + */ + public function getPageable(): PageableInterface + { + return $this->pageable; + } + + public function getNextPageIdentifier(): ?string + { + return $this->nextPageIdentifier; + } +} diff --git a/packages/rekapager-symfony-bridge/src/Batch/BatchCommand.php b/packages/rekapager-symfony-bridge/src/Batch/BatchCommand.php index ade55c0..5c0a80b 100644 --- a/packages/rekapager-symfony-bridge/src/Batch/BatchCommand.php +++ b/packages/rekapager-symfony-bridge/src/Batch/BatchCommand.php @@ -42,13 +42,14 @@ abstract class BatchCommand extends Command implements SignalableCommandInterfac private ?SymfonyStyle $io = null; private ?BatchProcessFactoryInterface $batchProcessFactory = null; - public function __construct( - ) { + public function __construct() + { parent::__construct(); $this->addOption('resume', 'r', InputOption::VALUE_OPTIONAL, 'Page identifier to resume from'); $this->addOption('pagesize', 'p', InputOption::VALUE_OPTIONAL, 'Batch/page/chunk size'); $this->addOption('progress-file', 'f', InputOption::VALUE_OPTIONAL, 'Temporary file to store progress data'); + $this->addOption('time-limit', 't', InputOption::VALUE_OPTIONAL, 'Runs the batch up to the specified time limit (in seconds)'); } #[Required] @@ -78,6 +79,7 @@ final protected function execute(InputInterface $input, OutputInterface $output) $resume = $input->getOption('resume'); $pageSize = $input->getOption('pagesize'); $progressFile = $input->getOption('progress-file'); + $timeLimit = $input->getOption('time-limit'); /** @psalm-suppress TypeDoesNotContainType */ if (!\is_string($resume) && $resume !== null) { @@ -98,6 +100,15 @@ final protected function execute(InputInterface $input, OutputInterface $output) throw new InvalidArgumentException('Invalid progress-file option'); } + if (!is_numeric($timeLimit) && $timeLimit !== null) { + throw new InvalidArgumentException('Invalid time-limit option'); + } + + if ($timeLimit !== null) { + $timeLimit = (int) $timeLimit; + \assert($timeLimit > 0); + } + // check resuming if ($progressFile !== null && file_exists($progressFile) && $resume === null) { @@ -124,7 +135,11 @@ final protected function execute(InputInterface $input, OutputInterface $output) batchProcessor: $batchProcessor, ); - $this->batchProcess->process($resume, $pageSize); + $this->batchProcess->run( + resume: $resume, + pageSize: $pageSize, + timeLimit: $timeLimit, + ); return Command::SUCCESS; } diff --git a/packages/rekapager-symfony-bridge/src/Batch/Internal/CommandBatchProcessorDecorator.php b/packages/rekapager-symfony-bridge/src/Batch/Internal/CommandBatchProcessorDecorator.php index 863f7c5..290208f 100644 --- a/packages/rekapager-symfony-bridge/src/Batch/Internal/CommandBatchProcessorDecorator.php +++ b/packages/rekapager-symfony-bridge/src/Batch/Internal/CommandBatchProcessorDecorator.php @@ -21,6 +21,7 @@ use Rekalogika\Rekapager\Batch\Event\BeforeProcessEvent; use Rekalogika\Rekapager\Batch\Event\InterruptEvent; use Rekalogika\Rekapager\Batch\Event\ItemEvent; +use Rekalogika\Rekapager\Batch\Event\TimeLimitEvent; use Symfony\Component\Console\Helper\Helper; use Symfony\Component\Console\Helper\ProgressIndicator; use Symfony\Component\Console\Style\SymfonyStyle; @@ -143,6 +144,12 @@ public function processItem(ItemEvent $itemEvent): void public function onInterrupt(InterruptEvent $event): void { + $nextPageIdentifier = $event->getNextPageIdentifier(); + + if ($this->progressFile !== null && $nextPageIdentifier !== null) { + file_put_contents($this->progressFile, $nextPageIdentifier); + } + $this->decorated->onInterrupt($event); $nextPageIdentifier = $event->getNextPageIdentifier(); @@ -164,11 +171,35 @@ public function onInterrupt(InterruptEvent $event): void $this->showStats($event); } + public function onTimeLimit(TimeLimitEvent $event): void + { + $nextPageIdentifier = $event->getNextPageIdentifier(); + + if ($this->progressFile !== null && $nextPageIdentifier !== null) { + file_put_contents($this->progressFile, $nextPageIdentifier); + } + + $this->decorated->onTimeLimit($event); + + $nextPageIdentifier = $event->getNextPageIdentifier(); + + if ($nextPageIdentifier !== null) { + $this->io->warning(sprintf( + 'Time limit reached. To resume, use the argument "-r %s"', + $nextPageIdentifier + )); + } else { + $this->io->error('Time limit reached, but there does not seem to be a next page identifier for you to resume'); + } + + $this->showStats($event); + } + /** - * @param AfterPageEvent|AfterProcessEvent|InterruptEvent $event + * @param AfterPageEvent|AfterProcessEvent|InterruptEvent|TimeLimitEvent $event * @return void */ - private function showStats(AfterPageEvent|AfterProcessEvent|InterruptEvent $event): void + private function showStats(AfterPageEvent|AfterProcessEvent|InterruptEvent|TimeLimitEvent $event): void { if ($event instanceof AfterPageEvent) { $this->io->writeln(''); diff --git a/packages/rekapager-symfony-bridge/src/Batch/SimpleBatchCommand.php b/packages/rekapager-symfony-bridge/src/Batch/SimpleBatchCommand.php index b3bf7cb..c10c630 100644 --- a/packages/rekapager-symfony-bridge/src/Batch/SimpleBatchCommand.php +++ b/packages/rekapager-symfony-bridge/src/Batch/SimpleBatchCommand.php @@ -19,6 +19,7 @@ use Rekalogika\Rekapager\Batch\Event\BeforePageEvent; use Rekalogika\Rekapager\Batch\Event\BeforeProcessEvent; use Rekalogika\Rekapager\Batch\Event\InterruptEvent; +use Rekalogika\Rekapager\Batch\Event\TimeLimitEvent; /** * @template TKey of array-key @@ -57,4 +58,8 @@ public function afterPage(AfterPageEvent $event): void public function onInterrupt(InterruptEvent $event): void { } + + public function onTimeLimit(TimeLimitEvent $event): void + { + } }