Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: SystemWebhook/UserWebhookの配信処理呼び出し部分の改善 #15035

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions packages/backend/src/core/AbuseReportNotificationService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,22 +160,22 @@ export class AbuseReportNotificationService implements OnApplicationShutdown {
};
});

const recipientWebhookIds = await this.fetchWebhookRecipients()
.then(it => it
.filter(it => it.isActive && it.systemWebhookId && it.method === 'webhook')
.map(it => it.systemWebhookId)
.filter(x => x != null));
for (const webhookId of recipientWebhookIds) {
await Promise.all(
convertedReports.map(it => {
return this.systemWebhookService.enqueueSystemWebhook(
webhookId,
type,
it,
);
}),
);
}
const inactiveRecipients = await this.fetchWebhookRecipients()
.then(it => it.filter(it => !it.isActive));
const withoutWebhookIds = inactiveRecipients
.map(it => it.systemWebhookId)
.filter(x => x != null);
return Promise.all(
convertedReports.map(it => {
return this.systemWebhookService.enqueueSystemWebhook(
type,
it,
{
excludes: withoutWebhookIds,
},
);
}),
);
}

/**
Expand Down
33 changes: 4 additions & 29 deletions packages/backend/src/core/NoteCreateService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -614,14 +614,7 @@ export class NoteCreateService implements OnApplicationShutdown {

this.roleService.addNoteToRoleTimeline(noteObj);

this.webhookService.getActiveWebhooks().then(webhooks => {
webhooks = webhooks.filter(x => x.userId === user.id && x.on.includes('note'));
for (const webhook of webhooks) {
this.queueService.userWebhookDeliver(webhook, 'note', {
note: noteObj,
});
}
});
this.webhookService.enqueueUserWebhook(user.id, 'note', { note: noteObj });

const nm = new NotificationManager(this.mutingsRepository, this.notificationService, user, note);

Expand All @@ -641,13 +634,7 @@ export class NoteCreateService implements OnApplicationShutdown {
if (!isThreadMuted) {
nm.push(data.reply.userId, 'reply');
this.globalEventService.publishMainStream(data.reply.userId, 'reply', noteObj);

const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === data.reply!.userId && x.on.includes('reply'));
for (const webhook of webhooks) {
this.queueService.userWebhookDeliver(webhook, 'reply', {
note: noteObj,
});
}
this.webhookService.enqueueUserWebhook(data.reply.userId, 'reply', { note: noteObj });
}
}
}
Expand All @@ -664,13 +651,7 @@ export class NoteCreateService implements OnApplicationShutdown {
// Publish event
if ((user.id !== data.renote.userId) && data.renote.userHost === null) {
this.globalEventService.publishMainStream(data.renote.userId, 'renote', noteObj);

const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === data.renote!.userId && x.on.includes('renote'));
for (const webhook of webhooks) {
this.queueService.userWebhookDeliver(webhook, 'renote', {
note: noteObj,
});
}
this.webhookService.enqueueUserWebhook(data.renote.userId, 'renote', { note: noteObj });
}
}

Expand Down Expand Up @@ -796,13 +777,7 @@ export class NoteCreateService implements OnApplicationShutdown {
});

this.globalEventService.publishMainStream(u.id, 'mention', detailPackedNote);

const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === u.id && x.on.includes('mention'));
for (const webhook of webhooks) {
this.queueService.userWebhookDeliver(webhook, 'mention', {
note: detailPackedNote,
});
}
this.webhookService.enqueueUserWebhook(u.id, 'mention', { note: detailPackedNote });

// Create notification
nm.push(u.id, 'mention');
Expand Down
31 changes: 12 additions & 19 deletions packages/backend/src/core/SystemWebhookService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ export type SystemWebhookPayload<T extends SystemWebhookEventType> =

@Injectable()
export class SystemWebhookService implements OnApplicationShutdown {
private logger: Logger;
private activeSystemWebhooksFetched = false;
private activeSystemWebhooks: MiSystemWebhook[] = [];

Expand All @@ -62,11 +61,9 @@ export class SystemWebhookService implements OnApplicationShutdown {
private idService: IdService,
private queueService: QueueService,
private moderationLogService: ModerationLogService,
private loggerService: LoggerService,
private globalEventService: GlobalEventService,
) {
this.redisForSub.on('message', this.onMessage);
this.logger = this.loggerService.getLogger('webhook');
}

@bindThis
Expand Down Expand Up @@ -193,28 +190,24 @@ export class SystemWebhookService implements OnApplicationShutdown {
/**
* SystemWebhook をWebhook配送キューに追加する
* @see QueueService.systemWebhookDeliver
* // TODO: contentの型を厳格化する
*/
@bindThis
public async enqueueSystemWebhook<T extends SystemWebhookEventType>(
webhook: MiSystemWebhook | MiSystemWebhook['id'],
type: T,
content: SystemWebhookPayload<T>,
opts?: {
excludes?: MiSystemWebhook['id'][];
},
) {
const webhookEntity = typeof webhook === 'string'
? (await this.fetchActiveSystemWebhooks()).find(a => a.id === webhook)
: webhook;
if (!webhookEntity || !webhookEntity.isActive) {
this.logger.info(`SystemWebhook is not active or not found : ${webhook}`);
return;
}

if (!webhookEntity.on.includes(type)) {
this.logger.info(`SystemWebhook ${webhookEntity.id} is not listening to ${type}`);
return;
}

return this.queueService.systemWebhookDeliver(webhookEntity, type, content);
const webhooks = await this.fetchActiveSystemWebhooks()
.then(webhooks => {
return webhooks.filter(webhook => !opts?.excludes?.includes(webhook.id) && webhook.on.includes(type));
});
return Promise.all(
webhooks.map(webhook => {
return this.queueService.systemWebhookDeliver(webhook, type, content);
}),
);
}

@bindThis
Expand Down
8 changes: 1 addition & 7 deletions packages/backend/src/core/UserBlockingService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,7 @@ export class UserBlockingService implements OnModuleInit {
schema: 'UserDetailedNotMe',
}).then(async packed => {
this.globalEventService.publishMainStream(follower.id, 'unfollow', packed);

const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === follower.id && x.on.includes('unfollow'));
for (const webhook of webhooks) {
this.queueService.userWebhookDeliver(webhook, 'unfollow', {
user: packed,
});
}
this.webhookService.enqueueUserWebhook(follower.id, 'unfollow', { user: packed });
});
}

Expand Down
32 changes: 4 additions & 28 deletions packages/backend/src/core/UserFollowingService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,27 +333,15 @@ export class UserFollowingService implements OnModuleInit {
schema: 'UserDetailedNotMe',
}).then(async packed => {
this.globalEventService.publishMainStream(follower.id, 'follow', packed);

const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === follower.id && x.on.includes('follow'));
for (const webhook of webhooks) {
this.queueService.userWebhookDeliver(webhook, 'follow', {
user: packed,
});
}
this.webhookService.enqueueUserWebhook(follower.id, 'follow', { user: packed });
});
}

// Publish followed event
if (this.userEntityService.isLocalUser(followee)) {
this.userEntityService.pack(follower.id, followee).then(async packed => {
this.globalEventService.publishMainStream(followee.id, 'followed', packed);

const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === followee.id && x.on.includes('followed'));
for (const webhook of webhooks) {
this.queueService.userWebhookDeliver(webhook, 'followed', {
user: packed,
});
}
this.webhookService.enqueueUserWebhook(followee.id, 'followed', { user: packed });
});

// 通知を作成
Expand Down Expand Up @@ -400,13 +388,7 @@ export class UserFollowingService implements OnModuleInit {
schema: 'UserDetailedNotMe',
}).then(async packed => {
this.globalEventService.publishMainStream(follower.id, 'unfollow', packed);

const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === follower.id && x.on.includes('unfollow'));
for (const webhook of webhooks) {
this.queueService.userWebhookDeliver(webhook, 'unfollow', {
user: packed,
});
}
this.webhookService.enqueueUserWebhook(follower.id, 'unfollow', { user: packed });
});
}

Expand Down Expand Up @@ -744,13 +726,7 @@ export class UserFollowingService implements OnModuleInit {
});

this.globalEventService.publishMainStream(follower.id, 'unfollow', packedFollowee);

const webhooks = (await this.webhookService.getActiveWebhooks()).filter(x => x.userId === follower.id && x.on.includes('unfollow'));
for (const webhook of webhooks) {
this.queueService.userWebhookDeliver(webhook, 'unfollow', {
user: packedFollowee,
});
}
this.webhookService.enqueueUserWebhook(follower.id, 'unfollow', { user: packedFollowee });
}

@bindThis
Expand Down
9 changes: 1 addition & 8 deletions packages/backend/src/core/UserService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,6 @@ export class UserService {
@bindThis
public async notifySystemWebhook(user: MiUser, type: 'userCreated') {
const packedUser = await this.userEntityService.pack(user, null, { schema: 'UserLite' });
const recipientWebhookIds = await this.systemWebhookService.fetchSystemWebhooks({ isActive: true, on: [type] });
for (const webhookId of recipientWebhookIds) {
await this.systemWebhookService.enqueueSystemWebhook(
webhookId,
type,
packedUser,
);
}
return this.systemWebhookService.enqueueSystemWebhook(type, packedUser);
}
}
25 changes: 23 additions & 2 deletions packages/backend/src/core/UserWebhookService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@

import { Inject, Injectable } from '@nestjs/common';
import * as Redis from 'ioredis';
import { type WebhooksRepository } from '@/models/_.js';
import { MiUser, type WebhooksRepository } from '@/models/_.js';
import { MiWebhook, WebhookEventTypes } from '@/models/Webhook.js';
import { DI } from '@/di-symbols.js';
import { bindThis } from '@/decorators.js';
import { GlobalEvents } from '@/core/GlobalEventService.js';
import type { OnApplicationShutdown } from '@nestjs/common';
import type { Packed } from '@/misc/json-schema.js';
import { QueueService } from '@/core/QueueService.js';
import type { OnApplicationShutdown } from '@nestjs/common';

export type UserWebhookPayload<T extends WebhookEventTypes> =
T extends 'note' | 'reply' | 'renote' |'mention' ? {
Expand All @@ -34,6 +35,7 @@ export class UserWebhookService implements OnApplicationShutdown {
private redisForSub: Redis.Redis,
@Inject(DI.webhooksRepository)
private webhooksRepository: WebhooksRepository,
private queueService: QueueService,
) {
this.redisForSub.on('message', this.onMessage);
}
Expand Down Expand Up @@ -75,6 +77,25 @@ export class UserWebhookService implements OnApplicationShutdown {
return query.getMany();
}

/**
* UserWebhook をWebhook配送キューに追加する
* @see QueueService.userWebhookDeliver
*/
@bindThis
public async enqueueUserWebhook<T extends WebhookEventTypes>(
userId: MiUser['id'],
type: T,
content: UserWebhookPayload<T>,
) {
const webhooks = await this.getActiveWebhooks()
.then(webhooks => webhooks.filter(webhook => webhook.userId === userId && webhook.on.includes(type)));
return Promise.all(
webhooks.map(webhook => {
return this.queueService.userWebhookDeliver(webhook, type, content);
}),
);
}

@bindThis
private async onMessage(_: string, data: string): Promise<void> {
const obj = JSON.parse(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,15 +231,10 @@ export class CheckModeratorsActivityProcessorService {

// -- SystemWebhook

const systemWebhooks = await this.systemWebhookService.fetchActiveSystemWebhooks()
.then(it => it.filter(it => it.on.includes('inactiveModeratorsWarning')));
for (const systemWebhook of systemWebhooks) {
this.systemWebhookService.enqueueSystemWebhook(
systemWebhook,
'inactiveModeratorsWarning',
{ remainingTime: remainingTime },
);
}
return this.systemWebhookService.enqueueSystemWebhook(
'inactiveModeratorsWarning',
{ remainingTime: remainingTime },
);
}

@bindThis
Expand Down Expand Up @@ -269,15 +264,10 @@ export class CheckModeratorsActivityProcessorService {

// -- SystemWebhook

const systemWebhooks = await this.systemWebhookService.fetchActiveSystemWebhooks()
.then(it => it.filter(it => it.on.includes('inactiveModeratorsInvitationOnlyChanged')));
for (const systemWebhook of systemWebhooks) {
this.systemWebhookService.enqueueSystemWebhook(
systemWebhook,
'inactiveModeratorsInvitationOnlyChanged',
{},
);
}
return this.systemWebhookService.enqueueSystemWebhook(
'inactiveModeratorsInvitationOnlyChanged',
{},
);
}

@bindThis
Expand Down
Loading
Loading