Skip to content

Commit

Permalink
Improve stopping process, improved logs
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisbjr committed Jun 6, 2024
1 parent 6cdefd1 commit ec11da5
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 25 deletions.
1 change: 1 addition & 0 deletions src/Enum/Status.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ enum Status: string
case IN_PROGRESS = "In Progress";
case FAILED = "Failed";
case COMPLETED = "Completed";
case STOPPING = "Stopping";
case STOPPED = "Stopped";
}
13 changes: 11 additions & 2 deletions src/Import/Jobs/BulkImportProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,13 @@ public function handle(): void

$import->refresh();

if ($import->status === Status::STOPPED->value) {
Log::debug('[BulkImportProcessor] Import was stopped. Marking import as completed.');
if (in_array($import->status, [Status::STOPPED->value, Status::STOPPING->value])) {
Log::info('[BulkImportProcessor] Import was stopped.', [
'import_id' => $import->id,
'import_processor' => $import->processor,
]);
$import->update([
'status' => Status::STOPPED,
'completed_at' => now(),
]);
} else {
Expand All @@ -121,6 +125,11 @@ public function handle(): void
]);
}

Log::info('[BulkImportProcessor] Import finished.', [
'import_id' => $import->id,
'import_processor' => $import->processor,
]);

// Remove the import file from local storage
if (file_exists($filepath)) {
unlink($filepath);
Expand Down
12 changes: 8 additions & 4 deletions src/Import/Jobs/CollateFailedChunks.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,17 @@ public function __construct(protected Import $import)
*/
public function handle(): void
{
Log::debug('[CollateFailedChunks] Processing failed chunks', [
Log::info('[CollateFailedChunks] Processing failed chunks', [
'import_id' => $this->import->id,
'processor' => $this->import->processor,
]);

$failedChunksMedia = $this->import->getMedia('failed-chunks');

if ($failedChunksMedia->count() === 0) {
Log::debug('[CollateFailedChunks] No failed chunks found', [
Log::info('[CollateFailedChunks] No failed chunks found', [
'import_id' => $this->import->id,
'processor' => $this->import->processor,
]);
return;
}
Expand Down Expand Up @@ -95,8 +97,9 @@ public function handle(): void
$this->import->clearMediaCollection('failed-chunks');

if (!$hasFailedRows) {
Log::debug('[CollateFailedChunks] No failed rows found', [
Log::info('[CollateFailedChunks] No failed rows found', [
'import_id' => $this->import->id,
'processor' => $this->import->processor,
]);
unlink($failedImportsFilePath);
return;
Expand All @@ -105,8 +108,9 @@ public function handle(): void
$this->import->addMedia($failedImportsFilePath)
->toMediaCollection('failed', config('nova-data-sync.imports.disk'));

Log::debug('[CollateFailedChunks] Finished processing failed chunks', [
Log::info('[CollateFailedChunks] Finished processing failed chunks', [
'import_id' => $this->import->id,
'processor' => $this->import->processor,
]);
}
}
61 changes: 45 additions & 16 deletions src/Import/Jobs/ImportProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ public function __construct(
)
{
$this->queue = config('nova-data-sync.imports.queue', 'default');
$this->className = self::class;
Log::debug('[' . $this->className . '] Initialized');
$this->className = static::class;
}

abstract public static function expectedHeaders(): array;
Expand Down Expand Up @@ -70,7 +69,9 @@ public function handle(): void
return;
}

Log::debug('[' . $this->className . '] Starting import...');
Log::info("[$this->className] Import processor started", [
'import_id' => $this->import->id,
]);

// Initialize failed imports report
$this->initializeFailedImportsReport();
Expand Down Expand Up @@ -104,15 +105,7 @@ public function handle(): void
return true;
});

// Ensure any remaining increments are saved
if ($this->processedCount > 0) {
$this->import->increment('total_rows_processed', $this->processedCount);
$this->processedCount = 0;
}
if ($this->failedCount > 0) {
$this->import->increment('total_rows_failed', $this->failedCount);
$this->failedCount = 0;
}
$this->finish();

$this->failedImportsReportWriter->close();

Expand Down Expand Up @@ -185,13 +178,49 @@ protected function validateRow(array $row, int $rowIndex): bool

protected function shouldQuit(): bool
{
// Check the status of Import model to see if it is still in progress every 10 seconds,
// if not, return false. Check using cache to avoid hitting the database every time
return Cache::remember('nova-data-sync-import-' . $this->import->id . '-should-stop', now()->addSeconds(10),
// Only check every x processed rows
if ($this->processedCount % $this->rowsToProcessBeforeCheckingForQuit() !== 0) {
return false;
}

$shouldQuit = Cache::remember(
'nova-data-sync-import-' . $this->import->id . '-should-stop',
now()->addSeconds($this->secondsBeforeCheckingForQuit()),
function () {
$this->import->refresh();
return $this->import->status !== Status::IN_PROGRESS->value;
return in_array($this->import->status, [Status::STOPPING->value, Status::STOPPED->value]);
}
);

if ($shouldQuit) {
Log::info('[' . static::class . '] Stopping import processor', [
'import_id' => $this->import->id,
]);
}

return $shouldQuit;
}

protected function rowsToProcessBeforeCheckingForQuit(): int
{
return 10;
}

protected function secondsBeforeCheckingForQuit(): int
{
return 10;
}

protected function finish(): void
{
// Ensure any remaining increments are saved
if ($this->processedCount > 0) {
$this->import->increment('total_rows_processed', $this->processedCount);
$this->processedCount = 0;
}
if ($this->failedCount > 0) {
$this->import->increment('total_rows_failed', $this->failedCount);
$this->failedCount = 0;
}
}
}
4 changes: 2 additions & 2 deletions src/Import/Nova/Actions/ImportStopAction.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ public function handle(ActionFields $fields, Collection $models): ActionResponse
$import = $models->first();

$import->update([
'status' => Status::STOPPED->value,
'status' => Status::STOPPING->value,
]);

return Action::message('Import has been stopped.');
return Action::message('Attempt to stop the import started..');
}
}
7 changes: 6 additions & 1 deletion src/Import/Nova/Import.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public function fields(NovaRequest $request): array
Status::make('Status')
->loadingWhen([StatusEnum::PENDING->value, StatusEnum::IN_PROGRESS->value])
->failedWhen([
StatusEnum::STOPPING->value,
StatusEnum::STOPPED->value,
StatusEnum::FAILED->value,
]),
Expand Down Expand Up @@ -190,7 +191,11 @@ public function authorizedToUpdate(Request $request): bool
*/
public function authorizedToDelete(Request $request): bool
{
return false;
return in_array($this->status, [
StatusEnum::COMPLETED->value,
StatusEnum::STOPPED->value,
StatusEnum::FAILED->value,
]);
}

/**
Expand Down

0 comments on commit ec11da5

Please sign in to comment.