Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Webhooks GA #1358

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
8fb0e78
Webhook delivery failure notification
devkiran Oct 10, 2024
f9afddc
fix the build
devkiran Oct 10, 2024
891b92a
enable and disable the webhook
devkiran Oct 10, 2024
8d815ea
send only active webhooks
devkiran Oct 10, 2024
b823c6b
create secret if not passed
devkiran Oct 10, 2024
0db3efc
add webhooks.* to scopes
devkiran Oct 10, 2024
82c417a
hide the zapier webhooks from the UI
devkiran Oct 10, 2024
e578fa1
add webhooks.read and write to scopes
devkiran Oct 11, 2024
5515d41
identify webhook receiver from url
devkiran Oct 11, 2024
883daea
add Slack webhook (wip)
devkiran Oct 11, 2024
26caf8f
Merge branch 'main' into webhooks-ga
devkiran Oct 11, 2024
9762ddf
slack template
devkiran Oct 11, 2024
2e0b119
Merge branch 'main' into webhooks-ga
devkiran Oct 13, 2024
eb8778f
Merge branch 'main' into webhooks-ga
devkiran Oct 14, 2024
d54de01
disabled -> disabledAt
devkiran Oct 14, 2024
fdba44b
improve handling disabled webhooks
devkiran Oct 14, 2024
e681d53
if a workspace exceeds the number of clicks, we should stop sending c…
devkiran Oct 14, 2024
fe5676f
add a link to webhook settings
devkiran Oct 14, 2024
40fcffe
Merge branch 'main' into webhooks-ga
devkiran Oct 14, 2024
d65e9eb
remove the slack webhook when uninstall
devkiran Oct 14, 2024
0020cf9
Merge branch 'main' into webhooks-ga
devkiran Oct 14, 2024
d04d7e3
Merge branch 'main' into webhooks-ga
devkiran Oct 14, 2024
c5c7619
Merge branch 'main' into webhooks-ga
devkiran Oct 15, 2024
42e8015
improve the test sending webhooks
devkiran Oct 15, 2024
c66ad45
update slack template
devkiran Oct 15, 2024
e70926c
add context
devkiran Oct 15, 2024
c75c5e0
update
devkiran Oct 15, 2024
6bc08a3
Merge branch 'main' into webhooks-ga
devkiran Oct 17, 2024
bd0246b
Merge branch 'main' into webhooks-ga
steven-tey Oct 20, 2024
e9e4da9
Merge branch 'main' into webhooks-ga
devkiran Nov 12, 2024
d947fb0
a small fix
devkiran Nov 12, 2024
35fc2ad
Merge branch 'main' into webhooks-ga
devkiran Nov 13, 2024
93316ec
Merge branch 'main' into webhooks-ga
devkiran Nov 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion apps/web/app/api/slack/callback/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { getSlackEnv } from "@/lib/integrations/slack/env";
import { SlackCredential } from "@/lib/integrations/slack/type";
import { prisma } from "@/lib/prisma";
import { redis } from "@/lib/upstash";
import { addWebhook } from "@/lib/webhook/api";
import z from "@/lib/zod";
import {
APP_DOMAIN_WITH_NGROK,
Expand All @@ -24,7 +25,7 @@ const oAuthCallbackSchema = z.object({
export const GET = async (req: Request) => {
const env = getSlackEnv();

let workspace: Pick<Project, "slug"> | null = null;
let workspace: Pick<Project, "id" | "slug"> | null = null;

try {
const session = await getSession();
Expand Down Expand Up @@ -53,6 +54,7 @@ export const GET = async (req: Request) => {
id: workspaceId,
},
select: {
id: true,
slug: true,
},
});
Expand All @@ -73,6 +75,13 @@ export const GET = async (req: Request) => {

const data = await response.json();

const webhook = await addWebhook({
name: "Slack",
url: data.incoming_webhook.url,
triggers: ["link.created"],
workspace,
});

const credentials: SlackCredential = {
appId: data.app_id,
botUserId: data.bot_user_id,
Expand All @@ -81,6 +90,11 @@ export const GET = async (req: Request) => {
tokenType: data.token_type,
authUser: data.authed_user,
team: data.team,
incomingWebhook: {
channel: data.incoming_webhook.channel,
channelId: data.incoming_webhook.channel_id,
webhookId: webhook.id,
},
};

await installIntegration({
Expand Down
19 changes: 19 additions & 0 deletions apps/web/app/api/stripe/webhook/customer-subscription-deleted.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,5 +146,24 @@ export async function customerSubscriptionDeleted(event: Stripe.Event) {
sendCancellationFeedback({
owners: workspaceUsers,
}),

// Disable the webhooks
prisma.webhook.updateMany({
where: {
projectId: workspace.id,
},
data: {
disabledAt: new Date(),
},
}),

prisma.project.update({
where: {
id: workspace.id,
},
data: {
webhookEnabled: false,
},
}),
]);
}
21 changes: 21 additions & 0 deletions apps/web/app/api/stripe/webhook/customer-subscription-updated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export async function customerSubscriptionUpdated(event: Stripe.Event) {
}

const newPlan = plan.name.toLowerCase();
const shouldDisableWebhooks = newPlan === "free" || newPlan === "pro";

// If a workspace upgrades/downgrades their subscription, update their usage limit in the database.
if (workspace.plan !== newPlan) {
Expand All @@ -78,6 +79,7 @@ export async function customerSubscriptionUpdated(event: Stripe.Event) {
paymentFailedAt: null,
},
}),

prisma.restrictedToken.updateMany({
where: {
projectId: workspace.id,
Expand All @@ -86,6 +88,25 @@ export async function customerSubscriptionUpdated(event: Stripe.Event) {
rateLimit: plan.limits.api,
},
}),

// Disable the webhooks if the new plan does not support webhooks
...(shouldDisableWebhooks
? [
prisma.webhook.updateMany({
where: { projectId: workspace.id },
data: { disabledAt: new Date() },
}),

prisma.project.update({
where: {
id: workspace.id,
},
data: {
webhookEnabled: false,
},
}),
]
: []),
]);
} else if (workspace.paymentFailedAt) {
await prisma.project.update({
Expand Down
1 change: 1 addition & 0 deletions apps/web/app/api/track/click/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export const POST = async (req: Request) => {
linkId: link.id,
url: link.url,
skipRatelimit: true,
workspaceId: workspace.id,
}),
);
}
Expand Down
50 changes: 19 additions & 31 deletions apps/web/app/api/webhooks/[webhookId]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { linkCache } from "@/lib/api/links/cache";
import { parseRequestBody } from "@/lib/api/utils";
import { withWorkspace } from "@/lib/auth";
import { prisma } from "@/lib/prisma";
import { updateWebhookStatusForWorkspace } from "@/lib/webhook/api";
import { webhookCache } from "@/lib/webhook/cache";
import { transformWebhook } from "@/lib/webhook/transform";
import { isLinkLevelWebhook } from "@/lib/webhook/utils";
Expand All @@ -26,6 +27,7 @@ export const GET = withWorkspace(
url: true,
secret: true,
triggers: true,
disabledAt: true,
links: true,
},
});
Expand Down Expand Up @@ -54,21 +56,23 @@ export const PATCH = withWorkspace(
await parseRequestBody(req),
);

const webhookUrlExists = await prisma.webhook.findFirst({
where: {
projectId: workspace.id,
url,
id: {
not: webhookId,
if (url) {
const webhookUrlExists = await prisma.webhook.findFirst({
where: {
projectId: workspace.id,
url,
id: {
not: webhookId,
},
},
},
});

if (webhookUrlExists) {
throw new DubApiError({
code: "conflict",
message: "A Webhook with this URL already exists.",
});

if (webhookUrlExists) {
throw new DubApiError({
code: "conflict",
message: "A Webhook with this URL already exists.",
});
}
}

if (linkIds && linkIds.length > 0) {
Expand Down Expand Up @@ -124,6 +128,7 @@ export const PATCH = withWorkspace(
url: true,
secret: true,
triggers: true,
disabledAt: true,
links: {
select: {
linkId: true,
Expand Down Expand Up @@ -258,24 +263,6 @@ export const DELETE = withWorkspace(
},
});

const webhooksCount = await prisma.webhook.count({
where: {
projectId: workspace.id,
},
});

// Disable webhooks for the workspace if there are no more webhooks
if (webhooksCount === 0) {
await prisma.project.update({
where: {
id: workspace.id,
},
data: {
webhookEnabled: false,
},
});
}

waitUntil(
(async () => {
const links = await prisma.link.findMany({
Expand All @@ -299,6 +286,7 @@ export const DELETE = withWorkspace(
});

await Promise.all([
updateWebhookStatusForWorkspace({ workspace }),
linkCache.mset(formatedLinks),
webhookCache.delete(webhookId),
]);
Expand Down
63 changes: 47 additions & 16 deletions apps/web/app/api/webhooks/callback/route.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
import { verifyQstashSignature } from "@/lib/cron/verify-qstash";
import { prisma } from "@/lib/prisma";
import { recordWebhookEvent } from "@/lib/tinybird/record-webhook-event";
import { webhookPayloadSchema } from "@/lib/webhook/schemas";
import { WEBHOOK_TRIGGERS } from "@/lib/webhook/constants";
import {
handleWebhookFailure,
resetWebhookFailureCount,
} from "@/lib/webhook/failure";
import { webhookCallbackSchema } from "@/lib/zod/schemas/webhooks";
import { getSearchParams } from "@dub/utils";
import { z } from "zod";

const searchParamsSchema = z.object({
webhookId: z.string(),
eventId: z.string(),
event: z.enum(WEBHOOK_TRIGGERS),
});

// POST /api/webhooks/callback – listen to webhooks status from QStash
export const POST = async (req: Request) => {
Expand All @@ -13,25 +25,44 @@ export const POST = async (req: Request) => {
const { url, status, body, sourceBody, sourceMessageId } =
webhookCallbackSchema.parse(rawBody);

const { webhookId, eventId, event } = searchParamsSchema.parse(
getSearchParams(req.url),
);

const webhook = await prisma.webhook.findUnique({
where: { id: webhookId },
});

if (!webhook) {
console.error("Webhook not found", { webhookId });
return new Response("Webhook not found");
}

const request = Buffer.from(sourceBody, "base64").toString("utf-8");
const response = Buffer.from(body, "base64").toString("utf-8");
const isFailed = status >= 400;

const { id: eventId, event } = webhookPayloadSchema.parse(
JSON.parse(request),
);
await Promise.all([
// Record the webhook event
recordWebhookEvent({
url,
event,
event_id: eventId,
http_status: status,
webhook_id: webhookId,
request_body: request,
response_body: response,
message_id: sourceMessageId,
}),

const { webhookId } = getSearchParams(req.url);

await recordWebhookEvent({
url,
event,
event_id: eventId,
http_status: status,
webhook_id: webhookId,
request_body: request,
response_body: response,
message_id: sourceMessageId,
});
// Handle the webhook delivery failure if it's the last retry
...(isFailed ? [handleWebhookFailure(webhookId)] : []),

// Only reset if there were previous failures
...(webhook.consecutiveFailures > 0 && !isFailed
? [resetWebhookFailureCount(webhookId)]
: []),
]);

return new Response("OK");
};
Loading
Loading