[Concurrency] Exception when processing chunked tasks concurrently #53645

Open
algoyounes opened this issue Nov 24, 2024 · 0 comments

Laravel Version

11.33.2

PHP Version

8.2

Database Driver & Version

MySQL 8.3.0

Description

Processing tasks concurrently with ConcurrencyManager throws an exception when the tasks are built by chunking an array of data. The tasks do not execute as expected, and the exception prevents the process from completing.

Expected Behaviour :

Given an array of data chunked and processed concurrently, the tasks should execute without errors and return the expected results.

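use Illuminate\Concurrency\ConcurrencyManager;

$data = [1, 2, 3, 4];
$chunks = array_chunk($data, 2);

// One closure per chunk; each closure simply returns its chunk.
$tasks = array_map(fn ($chunk) => fn () => $chunk, $chunks);

[$a, $b] = app(ConcurrencyManager::class)->run($tasks);

// Expected results:
// $a === [1, 2]
// $b === [3, 4]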

Actual Behaviour :

When executing the above code, exceptions occur in the following scenarios.

  1. Using Closures:

An exception is thrown when attempting to process closures that return the chunked data.

at vendor/laravel/framework/src/Illuminate/Concurrency/ProcessDriver.php:45
   41▕         return $results->collect()->map(function ($result) {
   42▕             $result = json_decode($result->output(), true);
   43▕
   44▕             if (! $result['successful']) {
➜  45▕                 throw new $result['exception'](
   46▕                     $result['message']
   47▕                 );
   48▕             }
   49▕
  2. Using Direct Values:

When trying to process tasks by passing direct values instead of closures:

$tasks = array_map(
    fn($chunk) => $chunk,
    array_chunk([1, 2, 3, 4], 2)
);

[$a, $b] = app(ConcurrencyManager::class)->run($tasks);

An exception is thrown:

Laravel\SerializableClosure\SerializableClosure::__construct(): Argument #1 ($closure) must be of type Closure, array given, called in /vendor/laravel/framework/src/Illuminate/Concurrency/ProcessDriver.php on line 36.

Behaviour Summary :

The behaviour varies depending on the concurrency driver used (process or sync) and on how the tasks are defined.

The results are summarized in the following table:

| Task \ Driver | Process             | Sync                |
|---------------|---------------------|---------------------|
| Closures      | ❌ Throws exception | ✅ Works            |
| Direct Values | ❌ Throws exception | ❌ Throws exception |
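
As a possible caller-side workaround for the "Direct Values" row, the values can be wrapped in closures before they reach a driver. The sketch below is plain PHP and does not touch Laravel: `wrapTasks` is a hypothetical helper, not part of the framework, and the final `array_map` is only a stand-in for a driver that invokes each task in order. It does not address the "Closures" failure reported for the process driver.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not a Laravel API): wrap every task that is not
 * already a Closure in a Closure that returns the original value.
 *
 * @param  array<int|string, mixed>  $tasks
 * @return array<int|string, Closure>
 */
function wrapTasks(array $tasks): array
{
    return array_map(
        static fn (mixed $task): Closure => $task instanceof Closure
            ? $task
            : static fn (): mixed => $task,
        $tasks
    );
}

$tasks = wrapTasks(array_chunk([1, 2, 3, 4], 2));

// Stand-in for a driver: invoke each task in order and collect the results.
[$a, $b] = array_map(static fn (Closure $task): mixed => $task(), $tasks);

var_dump($a === [1, 2], $b === [3, 4]);
```

With the values wrapped this way, every element handed to a driver is a Closure, which is the type both error messages in this report ask for.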

Steps To Reproduce

  1. Set the default concurrency driver in config/concurrency.php to process (or to sync when testing the ‘Direct Values’ case):
return [
    'default' => 'process', // or 'sync' for the 'Direct Values' case
];
  2. Test Case 1: Using Closures
$tasks = array_map(
    fn($chunk) => fn() => $chunk,
    array_chunk([1, 2, 3, 4], 2)
);

[$a, $b] = app(ConcurrencyManager::class)->run($tasks);

Result :

at vendor/laravel/framework/src/Illuminate/Concurrency/ProcessDriver.php:45
   41▕         return $results->collect()->map(function ($result) {
   42▕             $result = json_decode($result->output(), true);
   43▕
   44▕             if (! $result['successful']) {
➜  45▕                 throw new $result['exception'](
   46▕                     $result['message']
   47▕                 );
   48▕             }
   49▕
  3. Test Case 2: Using Direct Values
$tasks = array_map(
    fn($chunk) => $chunk,
    array_chunk([1, 2, 3, 4], 2)
);

[$a, $b] = app(ConcurrencyManager::class)->run($tasks);

Result :

Using process driver :

Laravel\SerializableClosure\SerializableClosure::__construct(): Argument #1 ($closure) must be of type Closure, array given, called in /vendor/laravel/framework/src/Illuminate/Concurrency/ProcessDriver.php on line 36

Using sync driver :

  First array member is not a valid class name or object

  at vendor/laravel/framework/src/Illuminate/Concurrency/SyncDriver.php:20
     16▕      */
     17▕     public function run(Closure|array $tasks): array
     18▕     {
     19▕         return collect(Arr::wrap($tasks))->map(
  ➜  20▕             fn ($task) => $task()
     21▕         )->all();
     22▕     }
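
The sync driver message appears to come from PHP itself rather than from Laravel: `$task()` on line 20 treats a two-element array as a `[class, method]` callable, and the chunk `[1, 2]` does not start with a class name or object. A minimal sketch in plain PHP, assuming nothing beyond the chunk values used above:

```php
<?php

declare(strict_types=1);

// One element of array_chunk([1, 2, 3, 4], 2).
$chunk = [1, 2];

$message = null;

try {
    // Same operation as `$task()` in the trace: PHP tries to interpret
    // the two-element array as a [class-or-object, method] callable.
    $chunk();
} catch (Error $e) {
    $message = $e->getMessage();
}

echo $message, PHP_EOL;
```

This should print the same "First array member is not a valid class name or object" message as the trace, which suggests the sync driver fails only because it invokes a value that is not callable.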