Skip to content

Commit

Permalink
Throttle media download - avoid lockups receiving large files
Browse files Browse the repository at this point in the history
  • Loading branch information
allgood committed Jul 9, 2024
1 parent eb9766f commit ffd2bf3
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 33 deletions.
1 change: 1 addition & 0 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
"sequelize-cli": "^5.5.1",
"sequelize-typescript": "^2.1.6",
"socket.io": "^4.7.5",
"stream-throttle": "^0.1.3",
"uuid": "^8.3.2",
"xlsx": "https://cdn.sheetjs.com/xlsx-0.20.2/xlsx-0.20.2.tgz",
"yup": "^0.32.11"
Expand Down
82 changes: 49 additions & 33 deletions backend/src/services/WbotServices/wbotMessageListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { Mutex } from "async-mutex";
import { Op } from "sequelize";
import moment from "moment";
import { Transform } from "stream";
import { Throttle } from "stream-throttle";
import Contact from "../../models/Contact";
import Ticket from "../../models/Ticket";
import Message from "../../models/Message";
Expand Down Expand Up @@ -55,7 +56,6 @@ import { makeRandomId } from "../../helpers/MakeRandomId";
import { GetCompanySetting } from "../../helpers/CheckSettings";
import Whatsapp from "../../models/Whatsapp";


type Session = WASocket & {
id?: number;
store?: Store;
Expand Down Expand Up @@ -264,72 +264,89 @@ const downloadMedia = async (msg: proto.IWebMessageInfo) => {
.replace("application", "document") as MediaType)
: (mineType.mimetype.split("/")[0] as MediaType);

let stream: Transform;
let stream: Transform | undefined;
let contDownload = 0;

while (contDownload < 10 && !stream) {
try {
const message =
msg.message.audioMessage ||
msg.message.videoMessage ||
msg.message.documentMessage ||
msg.message.documentWithCaptionMessage?.message?.documentMessage ||
msg.message.imageMessage ||
msg.message.stickerMessage ||
msg.message.extendedTextMessage?.contextInfo.quotedMessage.imageMessage ||
msg.message?.audioMessage ||
msg.message?.videoMessage ||
msg.message?.documentMessage ||
msg.message?.documentWithCaptionMessage?.message?.documentMessage ||
msg.message?.imageMessage ||
msg.message?.stickerMessage ||
msg.message?.extendedTextMessage?.contextInfo?.quotedMessage?.imageMessage ||
msg.message?.buttonsMessage?.imageMessage ||
msg.message?.templateMessage?.fourRowTemplate?.imageMessage ||
msg.message?.templateMessage?.hydratedTemplate?.imageMessage ||
msg.message?.templateMessage?.hydratedFourRowTemplate?.imageMessage ||
msg.message?.interactiveMessage?.header?.imageMessage;

if (message.directPath) {
if (message?.directPath) {
message.url = "";
}

// eslint-disable-next-line no-await-in-loop
stream = await downloadContentFromMessage(
message,
messageType
);
stream = await downloadContentFromMessage(message, messageType);
} catch (error) {
contDownload+=1;
contDownload += 1;
// eslint-disable-next-line no-await-in-loop, no-loop-func
await new Promise(resolve =>
{setTimeout(resolve, 1000 * contDownload * 2)}
);
logger.warn(
`>>>> erro ${contDownload} de baixar o arquivo ${msg?.key.id}`
);
await new Promise(resolve => {setTimeout(resolve, 1000 * contDownload * 2)});
logger.warn(`>>>> erro ${contDownload} de baixar o arquivo ${msg?.key?.id}`);
}
}

if (!stream) {
throw new Error("Failed to get stream");
}

let filename =
msg.message?.documentMessage?.fileName ||
msg.message?.documentWithCaptionMessage?.message?.documentMessage?.fileName ||
"";

if (!filename) {
const ext = mineType.mimetype.split("/")[1].split(";")[0];
filename = `${makeRandomId(5)}-${new Date().getTime()}.${ext}`;
} else {
filename = `${filename.split(".").slice(0, -1).join(".")}.${makeRandomId(5)}.${filename.split(".").slice(-1)}`;
}

const MAX_SPEED = 5 * 1024 * 1024 / 8; // 5Mbps
const THROTTLE_SPEED = 1024 * 1024 / 8; // 1Mbps
const LARGE_FILE_SIZE = 1024 * 1024; // 1 MiB

const throttle = new Throttle({ rate: MAX_SPEED });
let buffer = Buffer.from([]);
let totalSize = 0;
const startTime = Date.now();

try {
// eslint-disable-next-line no-restricted-syntax
for await (const chunk of stream) {
for await (const chunk of stream.pipe(throttle)) {
buffer = Buffer.concat([buffer, chunk]);
totalSize += chunk.length;

if (totalSize > LARGE_FILE_SIZE) {
throttle.rate = THROTTLE_SPEED;
}
}
} catch (error) {
return { data: "error", mimetype: "", filename: "" };
}

const endTime = Date.now();
const durationInSeconds = (endTime - startTime) / 1000;
const effectiveSpeed = totalSize / durationInSeconds; // bytes per second
logger.debug(`${filename} Download completed in ${durationInSeconds.toFixed(2)} seconds with an effective speed of ${(effectiveSpeed / 1024 / 1024).toFixed(2)} MBps`);

if (!buffer) {
Sentry.setExtra("ERR_WAPP_DOWNLOAD_MEDIA", { msg });
Sentry.captureException(new Error("ERR_WAPP_DOWNLOAD_MEDIA"));
throw new Error("ERR_WAPP_DOWNLOAD_MEDIA");
}
let filename =
msg.message?.documentMessage?.fileName
|| msg.message?.documentWithCaptionMessage?.message?.documentMessage?.fileName
|| "";

if (!filename) {
const ext = mineType.mimetype.split("/")[1].split(";")[0];
filename = `${makeRandomId(5)}-${new Date().getTime()}.${ext}`;
} else {
filename = `${filename.split(".").slice(0,-1).join(".")}.${makeRandomId(5)}.${filename.split(".").slice(-1)}`;
}
const media = {
data: buffer,
mimetype: mineType.mimetype,
Expand All @@ -338,7 +355,6 @@ const downloadMedia = async (msg: proto.IWebMessageInfo) => {
return media;
};


const verifyContact = async (
msgContact: IMe,
wbot: Session,
Expand Down

0 comments on commit ffd2bf3

Please sign in to comment.