Skip to content

Commit

Permalink
⚡ move webhooks to own queue (#2250)
Browse files Browse the repository at this point in the history
Co-authored-by: Kris <[email protected]>
  • Loading branch information
jeyemwey and MrKrisKrisu authored Dec 31, 2023
1 parent 058db74 commit 5ce7db2
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 36 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ php artisan passport:install
Use your webserver of choice or the in php included dev server (`php artisan serve`) to boot the application.
You should see the Träwelling homepage at http://localhost:8000.

Additionally, for continuous functionality:

- Create a cron job to run `php artisan schedule:run` every minute.
- Set up a service initiating with `php artisan queue:work` to handle essential background tasks.
Consider creating separate services for the default and webhooks queue if this is a larger installation.

### Option 3: Local Development using [Nix](https://nixos.org/)

Nix is a cross-platform package manager for Linux and macOS systems.
Expand Down
1 change: 0 additions & 1 deletion app/Http/Controllers/Backend/WebhookController.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
use App\Models\WebhookCreationRequest;
use App\Models\WebhookEvent;
use Carbon\Carbon;
use Illuminate\Database\Eloquent\Builder;
use Illuminate\Notifications\DatabaseNotification;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Gate;
Expand Down
11 changes: 11 additions & 0 deletions app/Jobs/MonitoredCallWebhookJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

namespace App\Jobs;

use romanzipp\QueueMonitor\Traits\IsMonitored;
use Spatie\WebhookServer\CallWebhookJob;

class MonitoredCallWebhookJob extends CallWebhookJob
{
use IsMonitored;
}
36 changes: 26 additions & 10 deletions app/Providers/PrometheusServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use romanzipp\QueueMonitor\Enums\MonitorStatus;
use Spatie\Prometheus\Facades\Prometheus;

const PROM_JOB_SCRAPER_SEPARATOR = "-PROM-JOB-SCRAPER-SEPARATOR-";
class PrometheusServiceProvider extends ServiceProvider
{
public function register() {
Expand Down Expand Up @@ -56,31 +57,31 @@ public function register() {

Prometheus::addGauge("queue_size")
->helpText("How many items are currently in the job queue?")
->label("job_name")
->labels(["job_name", "queue"])
->value(function() {
if (config("queue.default") === "database") {
return $this->getJobsByDisplayName("jobs");
}

return [Queue::size(), ["all"]];
return [Queue::size(), ["all", "all"]];
});

Prometheus::addGauge("failed_jobs_count")
->helpText("How many jobs have failed?")
->label("job_name")
->labels(["job_name", "queue"])
->value(function() {
return $this->getJobsByDisplayName("failed_jobs");
});

Prometheus::addGauge("completed_jobs_count")
->helpText("How many jobs are done? Old items from queue monitor table are deleted after 7 days.")
->labels(["job_name", "status"])
->labels(["job_name", "status", "queue"])
->value(function() {
return DB::table("queue_monitor")
->groupBy("name", "status")
->selectRaw("count(*) AS total, name, status")
->groupBy("name", "status", "queue")
->selectRaw("count(*) AS total, name, status, queue")
->get()
->map(fn($item) => [$item->total, [$item->name, MonitorStatus::toNamedArray()[$item->status]]])
->map(fn($item) => [$item->total, [$item->name, MonitorStatus::toNamedArray()[$item->status], $item->queue]])
->toArray();
});

Expand Down Expand Up @@ -146,9 +147,24 @@ public function register() {

public static function getJobsByDisplayName($table_name): array {
$counts = DB::table($table_name)
->get("payload")
->map(fn($row) => json_decode($row->payload))
->countBy(fn($payload) => $payload->displayName)
->get(["queue", "payload"])
->map(fn($row) => [
'queue' => $row->queue,
'displayName' => json_decode($row->payload)->displayName])
->countBy(fn($job) => $job['displayName'] . PROM_JOB_SCRAPER_SEPARATOR . $job['queue'])
->toArray();

return array_map(
fn($job_properties, $count) => [$count, explode(PROM_JOB_SCRAPER_SEPARATOR, $job_properties)],
array_keys($counts),
array_values($counts)
);
}

public static function getJobsByQueue($table_name): array {
$counts = DB::table($table_name)
->get("queue")
->countBy(fn($job) => $job->queue)
->toArray();

return array_map(
Expand Down
8 changes: 4 additions & 4 deletions config/webhook-server.php
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
<?php

use App\Jobs\MonitoredCallWebhookJob;
use Spatie\WebhookServer\BackoffStrategy\ExponentialBackoffStrategy;
use Spatie\WebhookServer\CallWebhookJob;
use Spatie\WebhookServer\Signer\DefaultSigner;

return [

/*
* The default queue that should be used to send webhook requests.
*/
'queue' => 'default',
'queue' => 'webhook',

/*
* The default queue connection that should be used to send webhook requests.
Expand Down Expand Up @@ -54,7 +54,7 @@
* If a call to a webhook takes longer that this amount of seconds
* the attempt will be considered failed.
*/
'timeout_in_seconds' => 5,
'timeout_in_seconds' => 3,

/*
* The amount of times the webhook should be called before we give up.
Expand All @@ -69,7 +69,7 @@
/*
* This class is used to dispatch webhooks on to the queue.
*/
'webhook_job' => CallWebhookJob::class,
'webhook_job' => MonitoredCallWebhookJob::class,

/*
* By default we will verify that the ssl certificate of the destination
Expand Down
6 changes: 3 additions & 3 deletions docker-entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ fi

if [ "$role" = "launch-all-at-once" ]; then

echo "Running one item of every role";
echo "Running one item of every role";
set -m # make job control work

CONTAINER_ROLE=app ./docker-entrypoint.sh &
Expand All @@ -33,7 +33,7 @@ else
elif [ "$role" = "queue" ]; then

echo "Running the queue..."
runuser -u www-data -- php artisan queue:work
runuser -u www-data -- php artisan queue:work --queue=default,webhook

elif [ "$role" = "scheduler" ]; then

Expand All @@ -47,4 +47,4 @@ else
echo "Could not match the container role \"$role\""
exit 1
fi
fi
fi
4 changes: 2 additions & 2 deletions tests/Feature/Webhooks/WebhookNotificationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
namespace Tests\Feature\Webhooks;

use App\Enum\WebhookEvent;
use App\Jobs\MonitoredCallWebhookJob;
use App\Models\User;
use Illuminate\Foundation\Testing\RefreshDatabase;
use Illuminate\Support\Facades\Bus;
use Spatie\WebhookServer\CallWebhookJob;
use Tests\TestCase;

use function PHPUnit\Framework\assertEquals;
Expand All @@ -27,7 +27,7 @@ public function testWebhookSendingOnNotification() {
$follow = $this->actingAs($alice)->post(route('follow.create'), ['follow_id' => $bob->id]);
$follow->assertStatus(201);

Bus::assertDispatched(function (CallWebhookJob $job) {
Bus::assertDispatched(function (MonitoredCallWebhookJob $job) {
assertEquals(WebhookEvent::NOTIFICATION->value, $job->payload['event']);
return true;
});
Expand Down
16 changes: 8 additions & 8 deletions tests/Feature/Webhooks/WebhookStatusTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
use App\Http\Controllers\HafasController;
use App\Http\Controllers\StatusController;
use App\Http\Resources\StatusResource;
use App\Jobs\MonitoredCallWebhookJob;
use App\Models\User;
use Carbon\Carbon;
use Illuminate\Foundation\Testing\RefreshDatabase;
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Http;
use Spatie\WebhookServer\CallWebhookJob;
use Tests\TestCase;
use function PHPUnit\Framework\assertEquals;

Expand All @@ -30,7 +30,7 @@ public function testWebhookSendingOnStatusCreation() {
$this->createWebhook($user, $client, [WebhookEvent::CHECKIN_CREATE]);
$status = $this->createStatus($user);

Bus::assertDispatched(function(CallWebhookJob $job) use ($status) {
Bus::assertDispatched(function(MonitoredCallWebhookJob $job) use ($status) {
assertEquals([
'event' => WebhookEvent::CHECKIN_CREATE->value,
'status' => new StatusResource($status),
Expand All @@ -54,7 +54,7 @@ public function testWebhookSendingOnStatusBodyChange() {
'checkinVisibility' => $status['visibility']->value
]);

Bus::assertDispatched(function(CallWebhookJob $job) use ($status) {
Bus::assertDispatched(function(MonitoredCallWebhookJob $job) use ($status) {
assertEquals(
WebhookEvent::CHECKIN_UPDATE->value,
$job->payload['event']
Expand All @@ -75,7 +75,7 @@ public function testWebhookSendingOnLike() {
StatusController::createLike($user, $status);

// For self-likes, a CHECKIN_UPDATE is sent, but no notification.
Bus::assertDispatched(function(CallWebhookJob $job) use ($status) {
Bus::assertDispatched(function(MonitoredCallWebhookJob $job) use ($status) {
assertEquals(
WebhookEvent::CHECKIN_UPDATE->value,
$job->payload['event']
Expand All @@ -102,7 +102,7 @@ public function testWebhookSendingOnDestinationChange() {
$aachen = $trip->stopovers->where('station.ibnr', self::AACHEN_HBF['id'])->first();
TrainCheckinController::changeDestination($checkin, $aachen);

Bus::assertDispatched(function(CallWebhookJob $job) use ($status) {
Bus::assertDispatched(function(MonitoredCallWebhookJob $job) use ($status) {
assertEquals(
WebhookEvent::CHECKIN_UPDATE->value,
$job->payload['event']
Expand Down Expand Up @@ -130,7 +130,7 @@ public function testWebhookSendingOnBusinessChange() {
'checkinVisibility' => $status['visibility']->value
]);

Bus::assertDispatched(function(CallWebhookJob $job) use ($status) {
Bus::assertDispatched(function(MonitoredCallWebhookJob $job) use ($status) {
assertEquals(
WebhookEvent::CHECKIN_UPDATE->value,
$job->payload['event']
Expand All @@ -156,7 +156,7 @@ public function testWebhookSendingOnVisibilityChange() {
'checkinVisibility' => StatusVisibility::UNLISTED->value,
]);

Bus::assertDispatched(function(CallWebhookJob $job) use ($status) {
Bus::assertDispatched(function(MonitoredCallWebhookJob $job) use ($status) {
assertEquals(
WebhookEvent::CHECKIN_UPDATE->value,
$job->payload['event']
Expand All @@ -176,7 +176,7 @@ public function testWebhookSendingOnStatusDeletion() {
$status = $this->createStatus($user);
StatusController::DeleteStatus($user, $status['id']);

Bus::assertDispatched(function(CallWebhookJob $job) use ($status) {
Bus::assertDispatched(function(MonitoredCallWebhookJob $job) use ($status) {
assertEquals(
WebhookEvent::CHECKIN_DELETE->value,
$job->payload['event']
Expand Down
18 changes: 10 additions & 8 deletions tests/Unit/Providers/PrometheusServiceProviderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,24 @@ public function testGetJobsByDisplayName() {
->once()
->andReturnSelf();

DB::shouldReceive("get")
->with("payload")
DB::shouldReceive('get')
->with(["queue", "payload"])
->andReturn(
Collection::make(
array_merge([
...array_fill(0, 4, (object) ["payload" => json_encode(["displayName" => "JobA"])]),
...array_fill(0, 7, (object) ["payload" => json_encode(["displayName" => "JobB"])]),
...array_fill(0, 2, (object) ["payload" => json_encode(["displayName" => "JobC"])]),
...array_fill(0, 4, (object) ["queue" => "default", "payload" => json_encode(["displayName" => "JobA"])]),
...array_fill(0, 7, (object) ["queue" => "webhook", "payload" => json_encode(["displayName" => "JobB"])]),
...array_fill(0, 2, (object) ["queue" => "default", "payload" => json_encode(["displayName" => "JobC"])]),
...array_fill(0, 5, (object) ["queue" => "webhook", "payload" => json_encode(["displayName" => "JobC"])]),
])));

$actual = PrometheusServiceProvider::getJobsByDisplayName(self::TABLENAME);

assertEquals([
[4, ["JobA"]],
[7, ["JobB"]],
[2, ["JobC"]]
[4, ["JobA", "default"]],
[7, ["JobB", "webhook"]],
[2, ["JobC", "default"]],
[5, ["JobC", "webhook"]],
], $actual);
}
}

0 comments on commit 5ce7db2

Please sign in to comment.