Skip to content

Commit

Permalink
Merge pull request #26 from utopia-php/worker-id-inject
Browse files Browse the repository at this point in the history
Inject workerId as resource, record received timestamp
  • Loading branch information
basert authored Nov 11, 2024
2 parents 94c240d + 869f997 commit 40fdd97
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 2 deletions.
1 change: 0 additions & 1 deletion src/Queue/Connection/RedisCluster.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

class RedisCluster implements Connection
{

protected array $seeds;
protected ?\RedisCluster $redis = null;

Expand Down
13 changes: 13 additions & 0 deletions src/Queue/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class Message
protected string $queue;
protected int $timestamp;
protected array $payload;
protected float $receivedTimestamp;

public function __construct(array $array = [])
{
Expand All @@ -19,6 +20,7 @@ public function __construct(array $array = [])
$this->queue = $array['queue'];
$this->timestamp = $array['timestamp'];
$this->payload = $array['payload'] ?? [];
$this->receivedTimestamp = (float)$array['receivedTimestamp'] ?? 0.;
}

public function setPid(string $pid): self
Expand Down Expand Up @@ -69,6 +71,17 @@ public function getPayload(): array
return $this->payload;
}

/**
* Timestamp recorded when this message was received by the worker.
* The delta between `$receivedTimestamp` and `$timestamp` is the amount of time the message was waiting in the queue.
*
* @return float
*/
public function getReceivedTimestamp(): float
{
return $this->receivedTimestamp;
}

public function asArray(): array
{
return [
Expand Down
2 changes: 2 additions & 0 deletions src/Queue/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ public function start(): self
try {
$this->adapter->workerStart(function (string $workerId) {
Console::success("[Worker] Worker {$workerId} is ready!");
self::setResource('workerId', fn () => $workerId);
if (!is_null($this->workerStartHook)) {
call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook));
}
Expand All @@ -203,6 +204,7 @@ public function start(): self
}

$nextMessage['timestamp'] = (int)$nextMessage['timestamp'];
$nextMessage['receivedTimestamp'] = microtime(true);

$message = new Message($nextMessage);

Expand Down
1 change: 0 additions & 1 deletion tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use Tests\E2E\Adapter\Base;
use Utopia\Queue\Client;
use Utopia\Queue\Connection\Redis;
use Utopia\Queue\Connection\RedisCluster;

class SwooleRedisClusterTest extends Base
Expand Down

0 comments on commit 40fdd97

Please sign in to comment.