Error handling with customSource not hooked up correctly? #309
In the end, I think I found something that works-ish by handling all errors in the customSource and trying never to bubble the error up through the stream. I am still not sure this is the best story around error handling: if I can't retry enough and I destroy the passthrough stream, it will terminate the node process.

import * as unzipper from "unzipper";
import { PassThrough } from 'node:stream';
import { Readable } from 'node:stream';
import pRetry from "p-retry";
const TEST_ERROR = process.env.TEST_ERROR || false;
const URL = 'http://127.0.0.1:8080/modules.zip'
const customSource = {
  stream: function (offset, length) {
    const pass = new PassThrough();
    pass.on("error", (err) => console.log('pass error'))
    let bytesWritten = 0;
    pRetry(async () => {
      // on retry, resume from the bytes already written by shifting the range start
      const res = await fetch(URL, { headers: { Range: `bytes=${offset + bytesWritten}-${length ? offset + length - 1 : ''}` } })
      if (!res.ok) throw new Error(`Failed to fetch, status ${res.status}`);
      if (TEST_ERROR && Math.random() > 0.99) throw new Error(`Random fetch failure`);
      const body = await res.body;
      const stream = Readable.fromWeb(body);
      await new Promise((resolve, reject) => {
        stream.on("error", (err) => reject(err))
        stream.on("end", () => {
          pass.end();
          resolve();
        })
        stream.on("data", (data) => {
          if (TEST_ERROR && Math.random() > 0.99) stream.destroy(new Error('Random streaming failure'));
          pass.write(data);
          bytesWritten += data.length;
        })
      })
    }, { retries: 5, onFailedAttempt: (err) => console.log(err) })
    return pass
  },
  size: async function () {
    return pRetry(async () => {
      const res = await fetch(URL, { method: 'HEAD' });
      return parseInt(res.headers.get('content-length'));
    }, { retries: 5, onFailedAttempt: (err) => console.log(err) })
  }
}
const directory = await unzipper.Open.custom(customSource);
const decompressedFiles = []
for await (const file of directory.files.filter(f => f.type === 'File')) {
  try {
    // console.log(file.path)
    const fileStream = await file.stream();
    // just consume the stream to decompress the file
    await new Promise((resolve, reject) => {
      fileStream.on('data', (chunk) => { /* noop */ });
      fileStream.on('end', () => resolve());
      fileStream.on('error', (err) => {
        console.log('Stream error:', err); // don't see that
        reject(err)
      });
    });
    decompressedFiles.push(file.path);
  } catch (error) {
    console.log('for await try/catch'); // don't see that
  }
}
console.log(decompressedFiles.length)
// zip -r modules.zip node_modules/
// npx http-server
// TEST_ERROR=true node index.mjs
I don't know if the retry mechanism should be baked into unzipper. Some projects may want to retry, and the way they retry could be very custom (i.e., number of retries, exponential backoff, etc.). Anything "can break," not just unzipper, so it seems to me that the retry methodology is outside the scope of this library. However, if you want to create your own custom adapter with retry, the approach above makes sense, but here are some comments:
- stream objects are not promises, so awaiting them is unnecessary
- also, FYI: adding […]
- a better way to introduce mocked errors in the stream is to use a transform, which you pipe directly into the passthrough
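Something along these lines, for example (an untested sketch; randomFailure is just a name made up here, and the 1% rate mirrors the Math.random() > 0.99 checks above):

import { Transform } from 'node:stream';

// Randomly fail the stream to simulate a network hiccup, instead of
// sprinkling the test-error logic through the data handlers.
const randomFailure = (rate = 0.01) => new Transform({
  transform(chunk, encoding, callback) {
    if (Math.random() < rate) return callback(new Error('Random streaming failure'));
    callback(null, chunk);
  }
});

// e.g. Readable.fromWeb(res.body).pipe(randomFailure()).pipe(pass);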
Thanks for the very helpful and generous answer, I really appreciate it. Your comments are absolutely on point. My challenge is that I will be streaming from S3 from large-ish zips (~500 MB), each with thousands of files (~300 KB), tens of thousands of zips spanning 100+ terabytes. On the plus side, with this library I can do it at 1 Gbps from my laptop at home; once deployed on EC2 it should reach 3-4 Gbps on a moderately sized VM. This is pretty much amazing. The downside is that I make thousands of requests per second and inevitably some will fail. In my testing I get at least one hiccup every 30 seconds. When that happens I want to retry only one entry, not the whole zip file. Without the trick above, I can't find a way to catch the error and it takes down the whole node process.
I realize now that my proposal puts the stream in flowing mode, which is suboptimal. However, this made it easy to keep using the same returned passthrough stream: on a streaming error where I have already pushed some data, I can resume where I left off by adjusting the start position of the retried byte range request. I am struggling to see how I would do that in paused mode. Would I unpipe the broken stream and re-pipe a new one later on retry? Wouldn't the error have already propagated, leaving me in the same situation?
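For what it's worth, the closest I can picture for a pipe-based version is something like this (rough sketch only; fetchFrom(start) stands for the ranged fetch plus Readable.fromWeb, and the retry limit and backoff are left out):

// Re-pipe a fresh ranged stream into the same passthrough on error,
// resuming from the bytes already written. `pass` and `offset` come from
// the surrounding stream(offset, length) function; fetchFrom(start) is a
// hypothetical helper returning a Node readable for bytes starting at `start`.
let bytesWritten = 0;

async function attach(start) {
  const source = await fetchFrom(start);
  source.on('data', (chunk) => { bytesWritten += chunk.length; });
  source.on('end', () => pass.end());
  source.on('error', () => {
    source.unpipe(pass);               // detach the broken stream
    attach(offset + bytesWritten);     // resume where it left off
  });
  source.pipe(pass, { end: false });   // keep pass open across retries
}

attach(offset);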
There is a good chance I am doing something wrong, but I feel like I have tried everything. customSource needs to be able to retry on stream errors. There are two different scenarios: 1) at stream creation, 2) later while streaming.
For 1), some APIs only make the stream available after a callback or promise. This means we need to return a passthrough immediately and pipe into it later. I could retry before that.
For 2), I may have already streamed some bytes, and it's simpler to throw the stream away and have the other side call file.stream() again.
In this example I am simulating 1), but regardless I try to error the passthrough stream to get the other side to call file.stream() again. Unfortunately, I just can't figure out how to connect it.
If it happens during the read of the initial directory I get the exception;
when it happens after that I get no exception, node just stops.
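A stripped-down version of scenario 1, to make it concrete (illustrative only; same URL constant and imports as above, retry logic omitted):

// Return the passthrough immediately, pipe into it once the fetch resolves,
// and error the passthrough if the setup fails, hoping the consumer of
// file.stream() gets an 'error' event it can retry on.
const customSource = {
  stream(offset, length) {
    const pass = new PassThrough();
    fetch(URL, { headers: { Range: `bytes=${offset}-${length ? offset + length - 1 : ''}` } })
      .then((res) => {
        if (!res.ok) throw new Error(`Failed to fetch, status ${res.status}`);
        Readable.fromWeb(res.body).pipe(pass);
      })
      // surfaces for the initial directory read, but not for later file.stream() calls
      .catch((err) => pass.destroy(err));
    return pass;
  },
  size: async () => parseInt((await fetch(URL, { method: 'HEAD' })).headers.get('content-length'), 10)
};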