From ffd2bf3a6f61ed243cc56eb585b015915d23386f Mon Sep 17 00:00:00 2001 From: Claudemir Todo Bom Date: Tue, 9 Jul 2024 20:54:35 -0300 Subject: [PATCH] Throttle media download - avoid lockups receiving large files --- backend/package.json | 1 + .../WbotServices/wbotMessageListener.ts | 82 +++++++++++-------- 2 files changed, 50 insertions(+), 33 deletions(-) diff --git a/backend/package.json b/backend/package.json index 5661bca1..76e4d490 100644 --- a/backend/package.json +++ b/backend/package.json @@ -54,6 +54,7 @@ "sequelize-cli": "^5.5.1", "sequelize-typescript": "^2.1.6", "socket.io": "^4.7.5", + "stream-throttle": "^0.1.3", "uuid": "^8.3.2", "xlsx": "https://cdn.sheetjs.com/xlsx-0.20.2/xlsx-0.20.2.tgz", "yup": "^0.32.11" diff --git a/backend/src/services/WbotServices/wbotMessageListener.ts b/backend/src/services/WbotServices/wbotMessageListener.ts index ca9566d4..02ffc14d 100644 --- a/backend/src/services/WbotServices/wbotMessageListener.ts +++ b/backend/src/services/WbotServices/wbotMessageListener.ts @@ -22,6 +22,7 @@ import { Mutex } from "async-mutex"; import { Op } from "sequelize"; import moment from "moment"; import { Transform } from "stream"; +import { Throttle } from "stream-throttle"; import Contact from "../../models/Contact"; import Ticket from "../../models/Ticket"; import Message from "../../models/Message"; @@ -55,7 +56,6 @@ import { makeRandomId } from "../../helpers/MakeRandomId"; import { GetCompanySetting } from "../../helpers/CheckSettings"; import Whatsapp from "../../models/Whatsapp"; - type Session = WASocket & { id?: number; store?: Store; @@ -264,72 +264,89 @@ const downloadMedia = async (msg: proto.IWebMessageInfo) => { .replace("application", "document") as MediaType) : (mineType.mimetype.split("/")[0] as MediaType); - let stream: Transform; + let stream: Transform | undefined; let contDownload = 0; while (contDownload < 10 && !stream) { try { const message = - msg.message.audioMessage || - msg.message.videoMessage || - msg.message.documentMessage || - msg.message.documentWithCaptionMessage?.message?.documentMessage || - msg.message.imageMessage || - msg.message.stickerMessage || - msg.message.extendedTextMessage?.contextInfo.quotedMessage.imageMessage || + msg.message?.audioMessage || + msg.message?.videoMessage || + msg.message?.documentMessage || + msg.message?.documentWithCaptionMessage?.message?.documentMessage || + msg.message?.imageMessage || + msg.message?.stickerMessage || + msg.message?.extendedTextMessage?.contextInfo?.quotedMessage?.imageMessage || msg.message?.buttonsMessage?.imageMessage || msg.message?.templateMessage?.fourRowTemplate?.imageMessage || msg.message?.templateMessage?.hydratedTemplate?.imageMessage || msg.message?.templateMessage?.hydratedFourRowTemplate?.imageMessage || msg.message?.interactiveMessage?.header?.imageMessage; - if (message.directPath) { + if (message?.directPath) { message.url = ""; } // eslint-disable-next-line no-await-in-loop - stream = await downloadContentFromMessage( - message, - messageType - ); + stream = await downloadContentFromMessage(message, messageType); } catch (error) { - contDownload+=1; + contDownload += 1; // eslint-disable-next-line no-await-in-loop, no-loop-func - await new Promise(resolve => - {setTimeout(resolve, 1000 * contDownload * 2)} - ); - logger.warn( - `>>>> erro ${contDownload} de baixar o arquivo ${msg?.key.id}` - ); + await new Promise(resolve => {setTimeout(resolve, 1000 * contDownload * 2)}); + logger.warn(`>>>> erro ${contDownload} de baixar o arquivo ${msg?.key?.id}`); } } + if (!stream) { + throw new Error("Failed to get stream"); + } + + let filename = + msg.message?.documentMessage?.fileName || + msg.message?.documentWithCaptionMessage?.message?.documentMessage?.fileName || + ""; + + if (!filename) { + const ext = mineType.mimetype.split("/")[1].split(";")[0]; + filename = `${makeRandomId(5)}-${new Date().getTime()}.${ext}`; + } else { + filename = `${filename.split(".").slice(0, -1).join(".")}.${makeRandomId(5)}.${filename.split(".").slice(-1)}`; + } + + const MAX_SPEED = 5 * 1024 * 1024 / 8; // 5Mbps + const THROTTLE_SPEED = 1024 * 1024 / 8; // 1Mbps + const LARGE_FILE_SIZE = 1024 * 1024; // 1 MiB + + const throttle = new Throttle({ rate: MAX_SPEED }); let buffer = Buffer.from([]); + let totalSize = 0; + const startTime = Date.now(); + try { // eslint-disable-next-line no-restricted-syntax - for await (const chunk of stream) { + for await (const chunk of stream.pipe(throttle)) { buffer = Buffer.concat([buffer, chunk]); + totalSize += chunk.length; + + if (totalSize > LARGE_FILE_SIZE) { + throttle.rate = THROTTLE_SPEED; + } } } catch (error) { return { data: "error", mimetype: "", filename: "" }; } + const endTime = Date.now(); + const durationInSeconds = (endTime - startTime) / 1000; + const effectiveSpeed = totalSize / durationInSeconds; // bytes per second + logger.debug(`${filename} Download completed in ${durationInSeconds.toFixed(2)} seconds with an effective speed of ${(effectiveSpeed / 1024 / 1024).toFixed(2)} MBps`); + if (!buffer) { Sentry.setExtra("ERR_WAPP_DOWNLOAD_MEDIA", { msg }); Sentry.captureException(new Error("ERR_WAPP_DOWNLOAD_MEDIA")); throw new Error("ERR_WAPP_DOWNLOAD_MEDIA"); } - let filename = - msg.message?.documentMessage?.fileName - || msg.message?.documentWithCaptionMessage?.message?.documentMessage?.fileName - || ""; - if (!filename) { - const ext = mineType.mimetype.split("/")[1].split(";")[0]; - filename = `${makeRandomId(5)}-${new Date().getTime()}.${ext}`; - } else { - filename = `${filename.split(".").slice(0,-1).join(".")}.${makeRandomId(5)}.${filename.split(".").slice(-1)}`; - } const media = { data: buffer, mimetype: mineType.mimetype, @@ -338,7 +355,6 @@ const downloadMedia = async (msg: proto.IWebMessageInfo) => { return media; }; - const verifyContact = async ( msgContact: IMe, wbot: Session,