-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
f2972ec
commit a6098e5
Showing
1 changed file
with
125 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
<?php | ||
|
||
namespace GuzzleHttp; | ||
|
||
use GuzzleHttp\Promise as P; | ||
use GuzzleHttp\Promise\EachPromise; | ||
use GuzzleHttp\Promise\PromiseInterface; | ||
use GuzzleHttp\Promise\PromisorInterface; | ||
use Psr\Http\Message\RequestInterface; | ||
|
||
/** | ||
* Sends an iterator of requests concurrently using a capped pool size. | ||
* | ||
* The pool will read from an iterator until it is cancelled or until the | ||
* iterator is consumed. When a request is yielded, the request is sent after | ||
* applying the "request_options" request options (if provided in the ctor). | ||
* | ||
* When a function is yielded by the iterator, the function is provided the | ||
* "request_options" array that should be merged on top of any existing | ||
* options, and the function MUST then return a wait-able promise. | ||
* | ||
* @final | ||
*/ | ||
class Pool implements PromisorInterface | ||
{ | ||
/** | ||
* @var EachPromise | ||
*/ | ||
private $each; | ||
|
||
/** | ||
* @param ClientInterface $client Client used to send the requests. | ||
* @param array|\Iterator $requests Requests or functions that return | ||
* requests to send concurrently. | ||
* @param array $config Associative array of options | ||
* - concurrency: (int) Maximum number of requests to send concurrently | ||
* - options: Array of request options to apply to each request. | ||
* - fulfilled: (callable) Function to invoke when a request completes. | ||
* - rejected: (callable) Function to invoke when a request is rejected. | ||
*/ | ||
public function __construct(ClientInterface $client, $requests, array $config = []) | ||
{ | ||
if (!isset($config['concurrency'])) { | ||
$config['concurrency'] = 25; | ||
} | ||
|
||
if (isset($config['options'])) { | ||
$opts = $config['options']; | ||
unset($config['options']); | ||
} else { | ||
$opts = []; | ||
} | ||
|
||
$iterable = P\Create::iterFor($requests); | ||
$requests = static function () use ($iterable, $client, $opts) { | ||
foreach ($iterable as $key => $rfn) { | ||
if ($rfn instanceof RequestInterface) { | ||
yield $key => $client->sendAsync($rfn, $opts); | ||
} elseif (\is_callable($rfn)) { | ||
yield $key => $rfn($opts); | ||
} else { | ||
throw new \InvalidArgumentException('Each value yielded by the iterator must be a Psr7\Http\Message\RequestInterface or a callable that returns a promise that fulfills with a Psr7\Message\Http\ResponseInterface object.'); | ||
} | ||
} | ||
}; | ||
|
||
$this->each = new EachPromise($requests(), $config); | ||
} | ||
|
||
/** | ||
* Get promise | ||
*/ | ||
public function promise(): PromiseInterface | ||
{ | ||
return $this->each->promise(); | ||
} | ||
|
||
/** | ||
* Sends multiple requests concurrently and returns an array of responses | ||
* and exceptions that uses the same ordering as the provided requests. | ||
* | ||
* IMPORTANT: This method keeps every request and response in memory, and | ||
* as such, is NOT recommended when sending a large number or an | ||
* indeterminate number of requests concurrently. | ||
* | ||
* @param ClientInterface $client Client used to send the requests | ||
* @param array|\Iterator $requests Requests to send concurrently. | ||
* @param array $options Passes through the options available in | ||
* {@see \GuzzleHttp\Pool::__construct} | ||
* | ||
* @return array Returns an array containing the response or an exception | ||
* in the same order that the requests were sent. | ||
* | ||
* @throws \InvalidArgumentException if the event format is incorrect. | ||
*/ | ||
public static function batch(ClientInterface $client, $requests, array $options = []): array | ||
{ | ||
$res = []; | ||
self::cmpCallback($options, 'fulfilled', $res); | ||
self::cmpCallback($options, 'rejected', $res); | ||
$pool = new static($client, $requests, $options); | ||
$pool->promise()->wait(); | ||
\ksort($res); | ||
|
||
return $res; | ||
} | ||
|
||
/** | ||
* Execute callback(s) | ||
*/ | ||
private static function cmpCallback(array &$options, string $name, array &$results): void | ||
{ | ||
if (!isset($options[$name])) { | ||
$options[$name] = static function ($v, $k) use (&$results) { | ||
$results[$k] = $v; | ||
}; | ||
} else { | ||
$currentFn = $options[$name]; | ||
$options[$name] = static function ($v, $k) use (&$results, $currentFn) { | ||
$currentFn($v, $k); | ||
$results[$k] = $v; | ||
}; | ||
} | ||
} | ||
} |