Skip to content

Commit

Permalink
feat(batch): show remaining time if the count is known (#129)
Browse files Browse the repository at this point in the history
* feat: batch remaining time

* cs

* changelog

* fix
  • Loading branch information
priyadi authored Jul 17, 2024
1 parent cd9ced8 commit 43a3148
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# 0.13.3

* fix: batch size CLI option
* feat(batch): show remaining time if the count is known

# 0.13.2

Expand Down
9 changes: 9 additions & 0 deletions packages/rekapager-core/src/Batch/Event/AfterPageEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

namespace Rekalogika\Rekapager\Batch\Event;

use Rekalogika\Contracts\Rekapager\PageableInterface;
use Rekalogika\Contracts\Rekapager\PageInterface;

/**
Expand All @@ -29,6 +30,14 @@ public function __construct(
) {
}

/**
* @return PageableInterface<TKey,T>
*/
public function getPageable(): PageableInterface
{
return $this->beforePageEvent->getPageable();
}

/**
* @return PageInterface<TKey,T>
*/
Expand Down
9 changes: 9 additions & 0 deletions packages/rekapager-core/src/Batch/Event/BeforePageEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

namespace Rekalogika\Rekapager\Batch\Event;

use Rekalogika\Contracts\Rekapager\PageableInterface;
use Rekalogika\Contracts\Rekapager\PageInterface;

/**
Expand All @@ -38,6 +39,14 @@ public function getPage(): PageInterface
return $this->page;
}

/**
* @return PageableInterface<TKey,T>
*/
public function getPageable(): PageableInterface
{
return $this->page->getPageable();
}

public function getEncodedPageIdentifier(): string
{
return $this->encodedPageIdentifier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ final protected function execute(InputInterface $input, OutputInterface $output)
$this->io = new SymfonyStyle($input, $output);

$batchProcessor = new CommandBatchProcessorDecorator(
description: $this->getDescription(),
decorated: $this->getBatchProcessor(),
io: $this->io,
progressFile: $progressFile,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,19 @@ class CommandBatchProcessorDecorator extends BatchProcessorDecorator

private readonly ProgressIndicator $progressIndicator;

private ?string $startPageIdentifier = null;

private ?int $totalPages = null;

private ?int $totalItems = null;

private int $itemsPerPage = 0;

/**
* @param BatchProcessorInterface<TKey,T> $decorated
*/
public function __construct(
private readonly string $description,
private readonly BatchProcessorInterface $decorated,
private readonly SymfonyStyle $io,
private readonly ?string $progressFile,
Expand All @@ -63,22 +72,29 @@ private function formatTime(\DateTimeInterface $time): string
return $time->format('Y-m-d H:i:s T');
}

private function getStartTime(): \DateTimeInterface
{
if ($this->startTime === null) {
throw new \LogicException('Start time is not set');
}

return $this->startTime;
}

public function beforeProcess(BeforeProcessEvent $event): void
{
$this->startTime = new \DateTimeImmutable();
$this->timer->start(BatchTimer::TIMER_DISPLAY);
$this->timer->start(BatchTimer::TIMER_PROCESS);

$this->startPageIdentifier = $event->getStartPageIdentifier();
$this->totalPages = $event->getPageable()->getTotalPages();
$this->totalItems = $event->getPageable()->getTotalItems();
$this->itemsPerPage = $event->getPageable()->getItemsPerPage();

$this->io->success('Starting batch process');

$this->io->definitionList(
['Start time' => $this->formatTime($this->startTime)],
['Start page' => $event->getStartPageIdentifier() ?? '(first page)'],
['Progress file' => $this->progressFile ?? '(not used)'],
['Items per page' => $event->getPageable()->getItemsPerPage()],
['Total pages' => $event->getPageable()->getTotalPages() ?? '(unknown)'],
['Total items' => $event->getPageable()->getTotalItems() ?? '(unknown)'],
);
$this->showStats($event);

$this->decorated->beforeProcess($event);
}
Expand All @@ -95,6 +111,27 @@ public function afterProcess(AfterProcessEvent $event): void
$this->showStats($event);
}

/**
* @param BeforePageEvent<TKey,T>|AfterPageEvent<TKey,T> $event
*/
private function getProgressString(BeforePageEvent|AfterPageEvent $event): string
{
if ($this->totalPages === null) {
return sprintf(
'Page <info>%s</info>, identifier <info>%s</info>',
$event->getPage()->getPageNumber() ?? '?',
$event->getEncodedPageIdentifier(),
);
}
return sprintf(
'Page <info>%s</info>/<info>%s</info>, identifier <info>%s</info>',
$event->getPage()->getPageNumber() ?? '?',
$this->totalPages,
$event->getEncodedPageIdentifier(),
);

}

public function beforePage(BeforePageEvent $event): void
{
$this->pageNumber++;
Expand All @@ -104,12 +141,7 @@ public function beforePage(BeforePageEvent $event): void
file_put_contents($this->progressFile, $event->getEncodedPageIdentifier());
}

$this->progressIndicator->start(sprintf(
'Page <info>%s</info>, identifier <info>%s</info>',
$event->getPage()->getPageNumber() ?? '(unknown)',
$event->getEncodedPageIdentifier(),
));

$this->progressIndicator->start($this->getProgressString($event));
$this->decorated->beforePage($event);
}

Expand All @@ -118,11 +150,7 @@ public function afterPage(AfterPageEvent $event): void
$this->decorated->afterPage($event);
// $pageDuration = $this->timer->stop(BatchTimer::TIMER_PAGE);

$this->progressIndicator->finish(sprintf(
'Page <info>%s</info>, identifier <info>%s</info>',
$event->getPage()->getPageNumber() ?? '(unknown)',
$event->getEncodedPageIdentifier(),
));
$this->progressIndicator->finish($this->getProgressString($event));

$displayDuration = $this->timer->getDuration(BatchTimer::TIMER_DISPLAY);

Expand Down Expand Up @@ -200,30 +228,65 @@ public function onTimeLimit(TimeLimitEvent $event): void
}

/**
* @param AfterPageEvent<TKey,T>|AfterProcessEvent<TKey,T>|InterruptEvent<TKey,T>|TimeLimitEvent<TKey,T> $event
* @param BeforeProcessEvent<TKey,T>|AfterPageEvent<TKey,T>|AfterProcessEvent<TKey,T>|InterruptEvent<TKey,T>|TimeLimitEvent<TKey,T> $event
*/
private function showStats(AfterPageEvent|AfterProcessEvent|InterruptEvent|TimeLimitEvent $event): void
private function showStats(BeforeProcessEvent|AfterPageEvent|AfterProcessEvent|InterruptEvent|TimeLimitEvent $event): void
{
if ($event instanceof AfterPageEvent) {
$this->io->writeln('');
}

$processDuration = $this->timer->getDuration(BatchTimer::TIMER_PROCESS);

$stats = [];

if ($this->startTime !== null) {
$stats[] = ['Start time' => $this->formatTime($this->startTime)];
if ($processDuration !== null) {
$pagesPerSecond = $this->pageNumber / $processDuration;
$itemsPerSecond = $this->itemNumber / $processDuration;
} else {
$pagesPerSecond = 0;
$itemsPerSecond = 0;
}

$estimatedEnd = null;
$eta = null;

$stats = [
['Description' => $this->description],
['Start page' => $this->startPageIdentifier ?? '(first page)'],
['Progress file' => $this->progressFile ?? '(not used)'],
['Items per page' => $this->itemsPerPage],
['Total pages' => $this->totalPages ?? '(unknown)'],
['Total items' => $this->totalItems ?? '(unknown)'],
];

$stats[] = ['Start time' => $this->formatTime($this->getStartTime())];

if ($event instanceof AfterPageEvent) {
$stats[] = ['Current time' => $this->formatTime(new \DateTimeImmutable())];
} else {

if ($this->totalItems !== null) {
$remainingItems = $this->totalItems - $this->itemNumber;
if ($remainingItems < 0) {
$remainingItems = 0;
}

$eta = $remainingItems / $itemsPerSecond;
$estimatedEnd = time() + $eta;
$stats[] = ['Estimated end time' => $this->formatTime((new \DateTimeImmutable('@' . $estimatedEnd))->setTimezone(new \DateTimeZone(date_default_timezone_get())))];
}
} elseif (
$event instanceof AfterProcessEvent
|| $event instanceof InterruptEvent
|| $event instanceof TimeLimitEvent
) {
$stats[] = ['End time' => $this->formatTime(new \DateTimeImmutable())];
}

if ($processDuration !== null) {
$stats[] = ['Time elapsed' => Helper::formatTime($processDuration)];

if ($eta !== null && $event instanceof AfterPageEvent) {
$stats[] = ['Estimated time remaining' => Helper::formatTime($eta)];
}
}

$stats = [
Expand All @@ -234,8 +297,8 @@ private function showStats(AfterPageEvent|AfterProcessEvent|InterruptEvent|TimeL
];

if ($processDuration !== null) {
$stats[] = ['Pages/minute' => round($this->pageNumber / $processDuration * 60, 2)];
$stats[] = ['Items/minute' => round($this->itemNumber / $processDuration * 60, 2)];
$stats[] = ['Pages/minute' => round($pagesPerSecond * 60, 2)];
$stats[] = ['Items/minute' => round($itemsPerSecond * 60, 2)];
}

$this->io->definitionList(...$stats);
Expand Down
11 changes: 10 additions & 1 deletion tests/src/App/Command/AppSimpleBatchCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
use Rekalogika\Rekapager\Tests\App\Repository\PostRepository;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

/**
Expand All @@ -42,13 +43,21 @@ public function __construct(
parent::__construct();
}

protected function configure(): void
{
$this->addOption('count', null, InputOption::VALUE_NONE, 'Count the total items');
}

protected function getPageable(
InputInterface $input,
OutputInterface $output
): PageableInterface {
/** @psalm-suppress RedundantCast */
$count = (bool) $input->getOption('count');

$adapter = new SelectableAdapter($this->postRepository);

return new KeysetPageable($adapter);
return new KeysetPageable($adapter, count: $count);
}

public function processItem(ItemEvent $itemEvent): void
Expand Down

0 comments on commit 43a3148

Please sign in to comment.