Skip to content

Commit

Permalink
* Better concurrency system and memory management by unstacking notif…
Browse files Browse the repository at this point in the history
…ications. Based on work of ECOMDevelopment:bugfix/connection-limit
  • Loading branch information
ahfeel committed Nov 7, 2017
1 parent c559f85 commit 0bf44ff
Showing 1 changed file with 50 additions and 28 deletions.
78 changes: 50 additions & 28 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,24 @@ class Client
*/
private $isProductionEnv;

/**
* Number of concurrent requests to multiplex in the same connection
*
* @var bool
*/
private $nbConcurrentRequests;

/**
* Client constructor.
*
* @param AuthProviderInterface $authProvider
* @param bool $isProductionEnv
*/
public function __construct(AuthProviderInterface $authProvider, bool $isProductionEnv = false)
public function __construct(AuthProviderInterface $authProvider, bool $isProductionEnv = false, int $nbConcurrentRequests = 10)
{
$this->authProvider = $authProvider;
$this->isProductionEnv = $isProductionEnv;
$this->nbConcurrentRequests = $nbConcurrentRequests;
}

/**
Expand All @@ -65,46 +73,60 @@ public function push(): array

curl_multi_setopt($mh, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);

$handles = [];
foreach ($this->notifications as $k => $notification) {
$request = new Request($notification, $this->isProductionEnv);
$handles[] = $ch = curl_init();

$this->authProvider->authenticateClient($request);

curl_setopt_array($ch, $request->getOptions());
curl_setopt($ch, CURLOPT_HTTPHEADER, $request->getDecoratedHeaders());
$i = 0;
while (!empty($this->notifications) && $i++ < $this->nbConcurrentRequests) {
$notification = array_pop($this->notifications);
curl_multi_add_handle($mh, $this->prepareHandle($notification));
}

$handleChunks = array_chunk($handles, 10);
foreach ($handleChunks as $handleChunk) {
foreach ($handleChunk as $handle) {
curl_multi_add_handle($mh, $handle);
do {
while (($execrun = curl_multi_exec($mh, $running)) == CURLM_CALL_MULTI_PERFORM);

if ($execrun != CURLM_OK) {
break;
}

$running = null;
do {
while(($execrun = curl_multi_exec($mh, $running)) == CURLM_CALL_MULTI_PERFORM);
while ($done = curl_multi_info_read($mh)) {
$handle = $done['handle'];

$result = curl_multi_getcontent($handle);

while($done = curl_multi_info_read($mh)) {
$handle = $done['handle'];
// find out which token the response is about
$token = curl_getinfo($handle, CURLINFO_PRIVATE);

$result = curl_multi_getcontent($handle);
$token = curl_getinfo($handle, CURLINFO_PRIVATE);
list($headers, $body) = explode("\r\n\r\n", $result, 2);
$statusCode = curl_getinfo($handle, CURLINFO_HTTP_CODE);
$responseCollection[] = new Response($statusCode, $headers, $body, $token);
curl_multi_remove_handle($mh, $handle);

list($headers, $body) = explode("\r\n\r\n", $result, 2);
$statusCode = curl_getinfo($handle, CURLINFO_HTTP_CODE);
$responseCollection[] = new Response($statusCode, $headers, $body, $token);
curl_multi_remove_handle($mh, $handle);
if (!empty($this->notifications)) {
$notification = array_pop($this->notifications);
curl_multi_add_handle($mh, $this->prepareHandle($notification));
}
} while ($running);
}
}
} while ($running);

curl_multi_close($mh);

return $responseCollection;
}

private function prepareHandle(Notification $notification)
{
$request = new Request($notification, $this->isProductionEnv);
$ch = curl_init();

$this->authProvider->authenticateClient($request);

curl_setopt_array($ch, $request->getOptions());
curl_setopt($ch, CURLOPT_HTTPHEADER, $request->getDecoratedHeaders());

// store device token to identify response
curl_setopt($ch, CURLOPT_PRIVATE, $notification->getDeviceToken());

return $ch;
}

/**
* Add notification in queue for sending.
*
Expand Down Expand Up @@ -136,7 +158,7 @@ public function addNotifications(array $notifications)
*
* @return Notification[]
*/
public function getNotifications()
public function getNotifications(): array
{
return $this->notifications;
}
Expand Down

0 comments on commit 0bf44ff

Please sign in to comment.