From 01843fb6f8227bc7b604510b287d14cd19725038 Mon Sep 17 00:00:00 2001 From: Alvin Natawiguna Date: Wed, 10 May 2023 14:09:39 +0700 Subject: [PATCH 1/2] feat: add rudimentary support for @aws-sdk/client-s3 refactor: use promise to resolve stream --- lib/Open/directory.js | 84 ++++++++++++++++++++++++++----------------- lib/Open/index.js | 34 +++++++++++------- package.json | 5 ++- 3 files changed, 76 insertions(+), 47 deletions(-) diff --git a/lib/Open/directory.js b/lib/Open/directory.js index d74dba7..606446d 100644 --- a/lib/Open/directory.js +++ b/lib/Open/directory.js @@ -13,28 +13,29 @@ var signature = Buffer.alloc(4); signature.writeUInt32LE(0x06054b50,0); function getCrxHeader(source) { - var sourceStream = source.stream(0).pipe(PullStream()); - - return sourceStream.pull(4).then(function(data) { - var signature = data.readUInt32LE(0); - if (signature === 0x34327243) { - var crxHeader; - return sourceStream.pull(12).then(function(data) { - crxHeader = binary.parse(data) - .word32lu('version') - .word32lu('pubKeyLength') - .word32lu('signatureLength') - .vars; - }).then(function() { - return sourceStream.pull(crxHeader.pubKeyLength +crxHeader.signatureLength); - }).then(function(data) { - crxHeader.publicKey = data.slice(0,crxHeader.pubKeyLength); - crxHeader.signature = data.slice(crxHeader.pubKeyLength); - crxHeader.size = 16 + crxHeader.pubKeyLength +crxHeader.signatureLength; - return crxHeader; - }); - } - }); + return source.stream(0).then(function(sourceStream) { + sourceStream.pipe(PullStream()); + return sourceStream.pull(4).then(function(data) { + var signature = data.readUInt32LE(0); + if (signature === 0x34327243) { + var crxHeader; + return sourceStream.pull(12).then(function(data) { + crxHeader = binary.parse(data) + .word32lu('version') + .word32lu('pubKeyLength') + .word32lu('signatureLength') + .vars; + }).then(function() { + return sourceStream.pull(crxHeader.pubKeyLength +crxHeader.signatureLength); + }).then(function(data) { + crxHeader.publicKey = data.slice(0,crxHeader.pubKeyLength); + crxHeader.signature = data.slice(crxHeader.pubKeyLength); + crxHeader.size = 16 + crxHeader.pubKeyLength +crxHeader.signatureLength; + return crxHeader; + }); + } + }); + }) } // Zip64 File Format Notes: https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT @@ -51,9 +52,14 @@ function getZip64CentralDirectory(source, zip64CDL) { } var dir64 = PullStream(); - source.stream(d64loc.offsetToStartOfCentralDirectory).pipe(dir64); - - return dir64.pull(56) + return source.stream(d64loc.offsetToStartOfCentralDirectory) + .then(function(stream) { + stream.pipe(dir64); + return dir64.pull(56) + }) + .catch(function(error) { + return dir64.destroy(error); + }) } // Zip64 File Format Notes: https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT @@ -94,12 +100,15 @@ module.exports = function centralDirectory(source, options) { .then(function(size) { sourceSize = size; - source.stream(Math.max(0,size-tailSize)) - .on('error', function (error) { endDir.emit('error', error) }) - .pipe(endDir); - + return source.stream(Math.max(0,size-tailSize)) + }) + .then(function(stream) { + stream.pipe(endDir); return endDir.pull(signature); }) + .catch(function(error) { + return endDir.destroy(error); + }) .then(function() { return Promise.props({directory: endDir.pull(22), crxHeader: crxHeader}); }) @@ -130,9 +139,15 @@ module.exports = function centralDirectory(source, options) { const zip64CDLOffset = sourceSize - (tailSize - endDir.match + zip64CDLSize) const zip64CDLStream = PullStream(); - source.stream(zip64CDLOffset).pipe(zip64CDLStream); - - return zip64CDLStream.pull(zip64CDLSize) + return source.stream(zip64CDLOffset) + .then(function(stream) { + stream.pipe(zip64CDLStream); + return zip64CDLStream.pull(zip64CDLSize); + }) + .catch(function(error) { + zip64CDLStream.destroy(error); + throw error; + }) .then(function (d) { return getZip64CentralDirectory(source, d) }) .then(function (dir64record) { vars = parseZip64DirRecord(dir64record) @@ -147,7 +162,10 @@ module.exports = function centralDirectory(source, options) { }); }) .then(function() { - source.stream(vars.offsetToStartOfCentralDirectory).pipe(records); + return source.stream(vars.offsetToStartOfCentralDirectory); + }) + .then(function(stream) { + stream.pipe(records); vars.extract = function(opts) { if (!opts || !opts.path) throw new Error('PATH_MISSING'); diff --git a/lib/Open/index.js b/lib/Open/index.js index fd89746..1a1cbc5 100644 --- a/lib/Open/index.js +++ b/lib/Open/index.js @@ -13,7 +13,7 @@ module.exports = { stream: function(offset, length) { var stream = Stream.PassThrough(); stream.end(buffer.slice(offset, length)); - return stream; + return Promise.resolve(stream); }, size: function() { return Promise.resolve(buffer.length); @@ -24,7 +24,7 @@ module.exports = { file: function(filename, options) { var source = { stream: function(offset,length) { - return fs.createReadStream(filename,{start: offset, end: length && offset+length}); + return Promise.resolve(fs.createReadStream(filename,{start: offset, end: length && offset+length})); }, size: function() { return new Promise(function(resolve,reject) { @@ -48,11 +48,11 @@ module.exports = { params.headers = params.headers || {}; var source = { - stream : function(offset,length) { + stream: function(offset,length) { var options = Object.create(params); options.headers = Object.create(params.headers); options.headers.range = 'bytes='+offset+'-' + (length ? length : ''); - return request(options); + return Promise.resolve(request(options)); }, size: function() { return new Promise(function(resolve,reject) { @@ -71,24 +71,32 @@ module.exports = { return directory(source, options); }, - s3 : function(client,params, options) { + s3: function(client, params, options) { + // detect aws-sdk v2 or aws-sdk v3 + const isAwsSdkV3 = Object.prototype.hasOwnProperty.call(client, 'send'); + var source = { size: function() { - return new Promise(function(resolve,reject) { - client.headObject(params, function(err,d) { - if (err) - reject(err); - else - resolve(d.ContentLength); + if (isAwsSdkV3) { + return client.headObject(params).then(function (res) { + return res.ContentLength; }); + } + return client.headObject(params).promise().then(function (res) { + return res.ContentLength; }); }, - stream: function(offset,length) { + stream: function(offset, length) { var d = {}; for (var key in params) d[key] = params[key]; d.Range = 'bytes='+offset+'-' + (length ? length : ''); - return client.getObject(d).createReadStream(); + if (isAwsSdkV3) { + return client.getObject(d).then(function (res) { + return res.Body; + }) + } + return Promise.resolve(client.getObject(d).createReadStream()); } }; diff --git a/package.json b/package.json index 9f2dffc..cf615ee 100644 --- a/package.json +++ b/package.json @@ -35,7 +35,6 @@ "setimmediate": "~1.0.4" }, "devDependencies": { - "aws-sdk": "^2.77.0", "dirdiff": ">= 0.0.1 < 1", "iconv-lite": "^0.4.24", "request": "^2.88.0", @@ -43,6 +42,10 @@ "tap": ">= 0.3.0 < 1", "temp": ">= 0.4.0 < 1" }, + "optionalDependencies": { + "aws-sdk": "^2.77.0", + "@aws-sdk/client-s3": "^3.304.0" + }, "directories": { "example": "examples", "test": "test" From 65c67b3a4c58f15d044c1ca9e98d70a9d7ad6f69 Mon Sep 17 00:00:00 2001 From: Alvin Natawiguna Date: Wed, 10 May 2023 15:00:31 +0700 Subject: [PATCH 2/2] fix: error handling --- lib/Open/directory.js | 58 ++++++++++++++++++++++--------------------- lib/Open/unzip.js | 41 +++++++++++++++++------------- test/openCustom.js | 2 +- 3 files changed, 55 insertions(+), 46 deletions(-) diff --git a/lib/Open/directory.js b/lib/Open/directory.js index 606446d..b6a71ea 100644 --- a/lib/Open/directory.js +++ b/lib/Open/directory.js @@ -13,29 +13,35 @@ var signature = Buffer.alloc(4); signature.writeUInt32LE(0x06054b50,0); function getCrxHeader(source) { - return source.stream(0).then(function(sourceStream) { - sourceStream.pipe(PullStream()); - return sourceStream.pull(4).then(function(data) { - var signature = data.readUInt32LE(0); - if (signature === 0x34327243) { - var crxHeader; - return sourceStream.pull(12).then(function(data) { - crxHeader = binary.parse(data) - .word32lu('version') - .word32lu('pubKeyLength') - .word32lu('signatureLength') - .vars; - }).then(function() { - return sourceStream.pull(crxHeader.pubKeyLength +crxHeader.signatureLength); - }).then(function(data) { - crxHeader.publicKey = data.slice(0,crxHeader.pubKeyLength); - crxHeader.signature = data.slice(crxHeader.pubKeyLength); - crxHeader.size = 16 + crxHeader.pubKeyLength +crxHeader.signatureLength; - return crxHeader; - }); - } - }); - }) + const res = PullStream(); + + return source.stream(0) + .catch(function(e) { + res.emit('error', e); + }) + .then(function(stream) { + stream.pipe(res); + return res.pull(4).then(function(data) { + var signature = data.readUInt32LE(0); + if (signature === 0x34327243) { + var crxHeader; + return res.pull(12).then(function(data) { + crxHeader = binary.parse(data) + .word32lu('version') + .word32lu('pubKeyLength') + .word32lu('signatureLength') + .vars; + }).then(function() { + return res.pull(crxHeader.pubKeyLength + crxHeader.signatureLength); + }).then(function(data) { + crxHeader.publicKey = data.slice(0,crxHeader.pubKeyLength); + crxHeader.signature = data.slice(crxHeader.pubKeyLength); + crxHeader.size = 16 + crxHeader.pubKeyLength + crxHeader.signatureLength; + return crxHeader; + }); + } + }); + }) } // Zip64 File Format Notes: https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT @@ -107,7 +113,7 @@ module.exports = function centralDirectory(source, options) { return endDir.pull(signature); }) .catch(function(error) { - return endDir.destroy(error); + endDir.emit('error', error); }) .then(function() { return Promise.props({directory: endDir.pull(22), crxHeader: crxHeader}); @@ -144,10 +150,6 @@ module.exports = function centralDirectory(source, options) { stream.pipe(zip64CDLStream); return zip64CDLStream.pull(zip64CDLSize); }) - .catch(function(error) { - zip64CDLStream.destroy(error); - throw error; - }) .then(function (d) { return getZip64CentralDirectory(source, d) }) .then(function (dir64record) { vars = parseZip64DirRecord(dir64record) diff --git a/lib/Open/unzip.js b/lib/Open/unzip.js index 099bf10..b78868b 100644 --- a/lib/Open/unzip.js +++ b/lib/Open/unzip.js @@ -14,14 +14,19 @@ if (!Stream.Writable || !Stream.Writable.prototype.destroy) module.exports = function unzip(source,offset,_password, directoryVars) { var file = PullStream(), - entry = Stream.PassThrough(); + entry = Stream.PassThrough(), + req; - var req = source.stream(offset); - req.pipe(file).on('error', function(e) { - entry.emit('error', e); - }); - - entry.vars = file.pull(30) + entry.vars = source.stream(offset) + .catch(function(e) { + entry.emit('error', e); + }) + .then(function(srcStream) { + req = srcStream.pipe(file).on('error', function(e) { + entry.emit('error', e); + }); + return file.pull(30) + }) .then(function(data) { var vars = binary.parse(data) .word32lu('signature') @@ -106,16 +111,18 @@ module.exports = function unzip(source,offset,_password, directoryVars) { .on('error',function(err) { entry.emit('error',err);}) .pipe(entry) .on('finish', function() { - if(req.destroy) - req.destroy() - else if (req.abort) - req.abort(); - else if (req.close) - req.close(); - else if (req.push) - req.push(); - else - console.log('warning - unable to close stream'); + if (req) { + if(req.destroy) + req.destroy() + else if (req.abort) + req.abort(); + else if (req.close) + req.close(); + else if (req.push) + req.push(); + else + console.log('warning - unable to close stream'); + } }); }) .catch(function(e) { diff --git a/test/openCustom.js b/test/openCustom.js index 5cd0f78..22a4c66 100644 --- a/test/openCustom.js +++ b/test/openCustom.js @@ -11,7 +11,7 @@ test("get content of a single file entry out of a zip", function (t) { var customSource = { stream: function(offset,length) { - return fs.createReadStream(archive, {start: offset, end: length && offset+length}); + return Promise.resolve(fs.createReadStream(archive, {start: offset, end: length && offset+length})); }, size: function() { return new Promise(function(resolve, reject) {