Skip to content

Commit

Permalink
Merge pull request #29 from ahfeel/feature/better-concurrency
Browse files Browse the repository at this point in the history
Better concurrency system and memory management
  • Loading branch information
edamov authored Nov 26, 2017
2 parents c53713c + ccc47b3 commit e660421
Showing 1 changed file with 131 additions and 32 deletions.
163 changes: 131 additions & 32 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,34 @@ class Client
*/
private $isProductionEnv;

/**
* Number of concurrent requests to multiplex in the same connection.
*
* @var int
*/
private $nbConcurrentRequests = 20;

/**
* Number of maximum concurrent connections established to the APNS servers.
*
* @var int
*/
private $maxConcurrentConnections = 1;

/**
* Flag to know if we should automatically close connections to the APNS servers or keep them alive.
*
* @var bool
*/
private $autoCloseConnections = true;

/**
* Current curl_multi handle instance.
*
* @var Object
*/
private $curlMultiHandle;

/**
* Client constructor.
*
Expand All @@ -57,54 +85,82 @@ public function __construct(AuthProviderInterface $authProvider, bool $isProduct
*/
public function push(): array
{
$mh = curl_multi_init();

if (!defined('CURLPIPE_MULTIPLEX')) {
define('CURLPIPE_MULTIPLEX', 2);
}
if (!$this->curlMultiHandle) {
$this->curlMultiHandle = curl_multi_init();

curl_multi_setopt($mh, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
if (!defined('CURLPIPE_MULTIPLEX')) {
define('CURLPIPE_MULTIPLEX', 2);
}

$handles = [];
foreach ($this->notifications as $k => $notification) {
$request = new Request($notification, $this->isProductionEnv);
$handles[] = $ch = curl_init();
curl_multi_setopt($this->curlMultiHandle, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
curl_multi_setopt($this->curlMultiHandle, CURLMOPT_MAX_HOST_CONNECTIONS, $this->maxConcurrentConnections);
}

$this->authProvider->authenticateClient($request);
$mh = $this->curlMultiHandle;

curl_setopt_array($ch, $request->getOptions());
curl_setopt($ch, CURLOPT_HTTPHEADER, $request->getDecoratedHeaders());
$i = 0;
while (!empty($this->notifications) && $i++ < $this->nbConcurrentRequests) {
$notification = array_pop($this->notifications);
curl_multi_add_handle($mh, $this->prepareHandle($notification));
}

$handleChunks = array_chunk($handles, 10);
foreach ($handleChunks as $handleChunk) {
foreach ($handleChunk as $handle) {
curl_multi_add_handle($mh, $handle);
do {
while (($execrun = curl_multi_exec($mh, $running)) == CURLM_CALL_MULTI_PERFORM);

if ($execrun != CURLM_OK) {
break;
}

$running = null;
do {
while(($execrun = curl_multi_exec($mh, $running)) == CURLM_CALL_MULTI_PERFORM);
while ($done = curl_multi_info_read($mh)) {
$handle = $done['handle'];

while($done = curl_multi_info_read($mh)) {
$handle = $done['handle'];
$result = curl_multi_getcontent($handle);

$result = curl_multi_getcontent($handle);
$token = curl_getinfo($handle, CURLINFO_PRIVATE);
// find out which token the response is about
$token = curl_getinfo($handle, CURLINFO_PRIVATE);

list($headers, $body) = explode("\r\n\r\n", $result, 2);
$statusCode = curl_getinfo($handle, CURLINFO_HTTP_CODE);
$responseCollection[] = new Response($statusCode, $headers, $body, $token);
curl_multi_remove_handle($mh, $handle);
list($headers, $body) = explode("\r\n\r\n", $result, 2);
$statusCode = curl_getinfo($handle, CURLINFO_HTTP_CODE);
$responseCollection[] = new Response($statusCode, $headers, $body, $token);
curl_multi_remove_handle($mh, $handle);
curl_close($handle);

if (!empty($this->notifications)) {
$notification = array_pop($this->notifications);
curl_multi_add_handle($mh, $this->prepareHandle($notification));
}
} while ($running);
}
}
} while ($running);

curl_multi_close($mh);
if ($this->autoCloseConnections) {
curl_multi_close($mh);
$this->curlMultiHandle = null;
}

return $responseCollection;
}

/**
* Prepares a curl handle from a Notification object.
*
* @param Notification $notification
*/
private function prepareHandle(Notification $notification)
{
$request = new Request($notification, $this->isProductionEnv);
$ch = curl_init();

$this->authProvider->authenticateClient($request);

curl_setopt_array($ch, $request->getOptions());
curl_setopt($ch, CURLOPT_HTTPHEADER, $request->getDecoratedHeaders());

// store device token to identify response
curl_setopt($ch, CURLOPT_PRIVATE, $notification->getDeviceToken());

return $ch;
}

/**
* Add notification in queue for sending.
*
Expand Down Expand Up @@ -136,8 +192,51 @@ public function addNotifications(array $notifications)
*
* @return Notification[]
*/
public function getNotifications()
public function getNotifications(): array
{
return $this->notifications;
}

/**
* Close the current curl multi handle.
*/
public function close()
{
if ($this->curlMultiHandle) {
curl_multi_close($this->curlMultiHandle);
$this->curlMultiHandle = null;
}
}

/**
* Set the number of concurrent requests sent through the multiplexed connections.
*
* @param int $nbConcurrentRequests
*/
public function setNbConcurrentRequests($nbConcurrentRequests)
{
$this->nbConcurrentRequests = $nbConcurrentRequests;
}


/**
* Set the number of maximum concurrent connections established to the APNS servers.
*
* @param int $nbConcurrentRequests
*/
public function setMaxConcurrentConnections($maxConcurrentConnections)
{
$this->maxConcurrentConnections = $maxConcurrentConnections;
}

/**
* Set wether or not the client should automatically close the connections. Apple recommends keeping
* connections open if you send more than a few notification per minutes.
*
* @param bool $nbConcurrentRequests
*/
public function setAutoCloseConnections($autoCloseConnections)
{
$this->autoCloseConnections = $autoCloseConnections;
}
}

0 comments on commit e660421

Please sign in to comment.