Skip to content

Commit

Permalink
feat: batch time limit (#125)
Browse files Browse the repository at this point in the history
  • Loading branch information
priyadi authored Jul 17, 2024
1 parent 2ae3a4a commit d149bf3
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 7 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

# 0.13.2

* feat: batch time limit

# 0.13.1

* feat: add `BatchProcess` and related classes
Expand Down
5 changes: 5 additions & 0 deletions packages/rekapager-core/src/Batch/AbstractBatchProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Rekalogika\Rekapager\Batch\Event\BeforeProcessEvent;
use Rekalogika\Rekapager\Batch\Event\InterruptEvent;
use Rekalogika\Rekapager\Batch\Event\ItemEvent;
use Rekalogika\Rekapager\Batch\Event\TimeLimitEvent;

/**
* @template TKey of array-key
Expand Down Expand Up @@ -53,4 +54,8 @@ public function afterPage(AfterPageEvent $event): void
public function onInterrupt(InterruptEvent $event): void
{
}

public function onTimeLimit(TimeLimitEvent $event): void
{
}
}
25 changes: 23 additions & 2 deletions packages/rekapager-core/src/Batch/BatchProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use Rekalogika\Rekapager\Batch\Event\BeforeProcessEvent;
use Rekalogika\Rekapager\Batch\Event\InterruptEvent;
use Rekalogika\Rekapager\Batch\Event\ItemEvent;
use Rekalogika\Rekapager\Batch\Event\TimeLimitEvent;
use Rekalogika\Rekapager\Contracts\PageIdentifierEncoderResolverInterface;

/**
Expand Down Expand Up @@ -56,10 +57,13 @@ final public function stop(): bool
/**
* @param int<1,max>|null $pageSize
*/
final public function process(
final public function run(
?string $resume = null,
?int $pageSize = null
?int $pageSize = null,
?int $timeLimit = null,
): void {
$startTime = time();

// determine start page identifier

if ($resume !== null) {
Expand Down Expand Up @@ -88,6 +92,21 @@ final public function process(
$pageIdentifier = $page->getPageIdentifier();
$pageIdentifierString = $this->pageableIdentifierResolver->encode($pageIdentifier);

// check time limit

if ($timeLimit !== null && time() - $startTime >= $timeLimit) {
$timeLimitEvent = new TimeLimitEvent(
pageable: $pageable,
nextPageIdentifier: $pageIdentifierString,
);

$this->batchProcessor->onTimeLimit($timeLimitEvent);

return;
}

// check stop flag

if ($this->stopFlag) {
$interruptEvent = new InterruptEvent(
pageable: $pageable,
Expand All @@ -99,6 +118,8 @@ final public function process(
return;
}

// process page

$beforePageEvent = new BeforePageEvent(
page: $page,
encodedPageIdentifier: $pageIdentifierString,
Expand Down
6 changes: 6 additions & 0 deletions packages/rekapager-core/src/Batch/BatchProcessorDecorator.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Rekalogika\Rekapager\Batch\Event\BeforeProcessEvent;
use Rekalogika\Rekapager\Batch\Event\InterruptEvent;
use Rekalogika\Rekapager\Batch\Event\ItemEvent;
use Rekalogika\Rekapager\Batch\Event\TimeLimitEvent;

/**
* @template TKey of array-key
Expand Down Expand Up @@ -68,4 +69,9 @@ public function onInterrupt(InterruptEvent $event): void
{
$this->decorated->onInterrupt($event);
}

public function onTimeLimit(TimeLimitEvent $event): void
{
$this->decorated->onTimeLimit($event);
}
}
6 changes: 6 additions & 0 deletions packages/rekapager-core/src/Batch/BatchProcessorInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Rekalogika\Rekapager\Batch\Event\BeforeProcessEvent;
use Rekalogika\Rekapager\Batch\Event\InterruptEvent;
use Rekalogika\Rekapager\Batch\Event\ItemEvent;
use Rekalogika\Rekapager\Batch\Event\TimeLimitEvent;

/**
* @template TKey of array-key
Expand Down Expand Up @@ -60,4 +61,9 @@ public function afterPage(AfterPageEvent $event): void;
* @param InterruptEvent<TKey,T> $event
*/
public function onInterrupt(InterruptEvent $event): void;

/**
* @param TimeLimitEvent<TKey,T> $event
*/
public function onTimeLimit(TimeLimitEvent $event): void;
}
45 changes: 45 additions & 0 deletions packages/rekapager-core/src/Batch/Event/TimeLimitEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

declare(strict_types=1);

/*
* This file is part of rekalogika/rekapager package.
*
* (c) Priyadi Iman Nurcahyo <https://rekalogika.dev>
*
* For the full copyright and license information, please view the LICENSE file
* that was distributed with this source code.
*/

namespace Rekalogika\Rekapager\Batch\Event;

use Rekalogika\Contracts\Rekapager\PageableInterface;

/**
* @template TKey of array-key
* @template T
*/
final class TimeLimitEvent
{
/**
* @param PageableInterface<TKey,T> $pageable
*/
public function __construct(
private readonly PageableInterface $pageable,
private readonly ?string $nextPageIdentifier,
) {
}

/**
* @return PageableInterface<TKey,T>
*/
public function getPageable(): PageableInterface
{
return $this->pageable;
}

public function getNextPageIdentifier(): ?string
{
return $this->nextPageIdentifier;
}
}
21 changes: 18 additions & 3 deletions packages/rekapager-symfony-bridge/src/Batch/BatchCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ abstract class BatchCommand extends Command implements SignalableCommandInterfac
private ?SymfonyStyle $io = null;
private ?BatchProcessFactoryInterface $batchProcessFactory = null;

public function __construct(
) {
public function __construct()
{
parent::__construct();

$this->addOption('resume', 'r', InputOption::VALUE_OPTIONAL, 'Page identifier to resume from');
$this->addOption('pagesize', 'p', InputOption::VALUE_OPTIONAL, 'Batch/page/chunk size');
$this->addOption('progress-file', 'f', InputOption::VALUE_OPTIONAL, 'Temporary file to store progress data');
$this->addOption('time-limit', 't', InputOption::VALUE_OPTIONAL, 'Runs the batch up to the specified time limit (in seconds)');
}

#[Required]
Expand Down Expand Up @@ -78,6 +79,7 @@ final protected function execute(InputInterface $input, OutputInterface $output)
$resume = $input->getOption('resume');
$pageSize = $input->getOption('pagesize');
$progressFile = $input->getOption('progress-file');
$timeLimit = $input->getOption('time-limit');

/** @psalm-suppress TypeDoesNotContainType */
if (!\is_string($resume) && $resume !== null) {
Expand All @@ -98,6 +100,15 @@ final protected function execute(InputInterface $input, OutputInterface $output)
throw new InvalidArgumentException('Invalid progress-file option');
}

if (!is_numeric($timeLimit) && $timeLimit !== null) {
throw new InvalidArgumentException('Invalid time-limit option');
}

if ($timeLimit !== null) {
$timeLimit = (int) $timeLimit;
\assert($timeLimit > 0);
}

// check resuming

if ($progressFile !== null && file_exists($progressFile) && $resume === null) {
Expand All @@ -124,7 +135,11 @@ final protected function execute(InputInterface $input, OutputInterface $output)
batchProcessor: $batchProcessor,
);

$this->batchProcess->process($resume, $pageSize);
$this->batchProcess->run(
resume: $resume,
pageSize: $pageSize,
timeLimit: $timeLimit,
);

return Command::SUCCESS;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use Rekalogika\Rekapager\Batch\Event\BeforeProcessEvent;
use Rekalogika\Rekapager\Batch\Event\InterruptEvent;
use Rekalogika\Rekapager\Batch\Event\ItemEvent;
use Rekalogika\Rekapager\Batch\Event\TimeLimitEvent;
use Symfony\Component\Console\Helper\Helper;
use Symfony\Component\Console\Helper\ProgressIndicator;
use Symfony\Component\Console\Style\SymfonyStyle;
Expand Down Expand Up @@ -143,6 +144,12 @@ public function processItem(ItemEvent $itemEvent): void

public function onInterrupt(InterruptEvent $event): void
{
$nextPageIdentifier = $event->getNextPageIdentifier();

if ($this->progressFile !== null && $nextPageIdentifier !== null) {
file_put_contents($this->progressFile, $nextPageIdentifier);
}

$this->decorated->onInterrupt($event);

$nextPageIdentifier = $event->getNextPageIdentifier();
Expand All @@ -164,11 +171,35 @@ public function onInterrupt(InterruptEvent $event): void
$this->showStats($event);
}

public function onTimeLimit(TimeLimitEvent $event): void
{
$nextPageIdentifier = $event->getNextPageIdentifier();

if ($this->progressFile !== null && $nextPageIdentifier !== null) {
file_put_contents($this->progressFile, $nextPageIdentifier);
}

$this->decorated->onTimeLimit($event);

$nextPageIdentifier = $event->getNextPageIdentifier();

if ($nextPageIdentifier !== null) {
$this->io->warning(sprintf(
'Time limit reached. To resume, use the argument "-r %s"',
$nextPageIdentifier
));
} else {
$this->io->error('Time limit reached, but there does not seem to be a next page identifier for you to resume');
}

$this->showStats($event);
}

/**
* @param AfterPageEvent<TKey,T>|AfterProcessEvent<TKey,T>|InterruptEvent<TKey,T> $event
* @param AfterPageEvent<TKey,T>|AfterProcessEvent<TKey,T>|InterruptEvent<TKey,T>|TimeLimitEvent<TKey,T> $event
* @return void
*/
private function showStats(AfterPageEvent|AfterProcessEvent|InterruptEvent $event): void
private function showStats(AfterPageEvent|AfterProcessEvent|InterruptEvent|TimeLimitEvent $event): void
{
if ($event instanceof AfterPageEvent) {
$this->io->writeln('');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Rekalogika\Rekapager\Batch\Event\BeforePageEvent;
use Rekalogika\Rekapager\Batch\Event\BeforeProcessEvent;
use Rekalogika\Rekapager\Batch\Event\InterruptEvent;
use Rekalogika\Rekapager\Batch\Event\TimeLimitEvent;

/**
* @template TKey of array-key
Expand Down Expand Up @@ -57,4 +58,8 @@ public function afterPage(AfterPageEvent $event): void
public function onInterrupt(InterruptEvent $event): void
{
}

public function onTimeLimit(TimeLimitEvent $event): void
{
}
}

0 comments on commit d149bf3

Please sign in to comment.