From bccbd1403aab73e1aff00403c568b72d48515cdc Mon Sep 17 00:00:00 2001 From: "Dmitriy A. Golev" Date: Sat, 20 Aug 2016 04:58:48 +0300 Subject: [PATCH] v1.7.0 - Fix memory leak caused by unsatisfied queue in `writeStream` class - New authentication model for `http` uploads based on more secure and frequently changed `connectionId` see [#159](https://github.com/VeliovGroup/Meteor-Files/issues/159) - Fixed overwhelming requests to `Meteor.users` on every received chunk - Minimised amount of data transferred between WebWorker and MainThread - Use minified version of WebWorker - `http` upload uses HTTP requests for all tasks `start`, `continue`, `EOF`, previously Meteor methods was involved - Overall code enhancements, DRY --- files.coffee | 204 +++++++++++++++++++++++++++++++++------------------ package.js | 6 +- 2 files changed, 134 insertions(+), 76 deletions(-) diff --git a/files.coffee b/files.coffee index 83e9e75b..f1cf9c7b 100755 --- a/files.coffee +++ b/files.coffee @@ -30,6 +30,7 @@ if Meteor.isServer self = @ @stream = fs.createWriteStream @path, {flags: 'a', mode: self.permissions, highWaterMark: 0} @drained = true + @aborted = false @writtenChunks = 0 @stream.on 'drain', -> bound -> @@ -39,6 +40,7 @@ if Meteor.isServer @stream.on 'error', (error) -> bound -> console.error "[FilesCollection] [writeStream] [ERROR:]", error + self.abort() return ### @@ -51,7 +53,7 @@ if Meteor.isServer @returns {Boolean} - True if chunk is sent to stream, false if chunk is set into queue ### write: (num, chunk, callback) -> - if not @stream._writableState.ended and num > @writtenChunks + if not @aborted and (not @stream._writableState.ended and num > @writtenChunks) if @drained and num is (@writtenChunks + 1) @drained = false @stream.write chunk, callback @@ -72,7 +74,7 @@ if Meteor.isServer @returns {Boolean} - True if stream is fulfilled, false if queue is in progress ### end: (callback) -> - unless @stream._writableState.ended + if not @aborted and not @stream._writableState.ended if @writtenChunks is @maxLength @stream.end callback return true @@ -83,6 +85,16 @@ if Meteor.isServer return , 25 return false + + ### + @memberOf writeStream + @name abort + @summary Aborts writing to writableStream, prevent memory leaks caused by unsatisfied queue + @returns {Boolean} - True + ### + abort: -> + @aborted = true + return true else `import { EventEmitter } from './event-emitter.jsx'` @@ -494,29 +506,32 @@ class FilesCollection delete @continueUploadTTL delete @responseHeaders - if _.has(Package, 'accounts-base') - Accounts = Package['accounts-base'].Accounts unless Accounts - setTokenCookie = -> - if (not cookie.has('meteor_login_token') and Accounts._lastLoginTokenWhenPolled) or (cookie.has('meteor_login_token') and (cookie.get('meteor_login_token') isnt Accounts._lastLoginTokenWhenPolled)) - cookie.set 'meteor_login_token', Accounts._lastLoginTokenWhenPolled, null, '/' - cookie.send() - - unsetTokenCookie = -> - if cookie.has 'meteor_login_token' - cookie.remove 'meteor_login_token' + setTokenCookie = -> + Meteor.setTimeout -> + if (not cookie.has('x_mtok') and Meteor.connection._lastSessionId) or (cookie.has('x_mtok') and (cookie.get('x_mtok') isnt Meteor.connection._lastSessionId)) + cookie.set 'x_mtok', Meteor.connection._lastSessionId, null, '/' cookie.send() - - Accounts.onLogin -> - setTokenCookie() - return - Accounts.onLogout -> - unsetTokenCookie() return + , 25 + return - if Accounts._lastLoginTokenWhenPolled + unsetTokenCookie = -> + if cookie.has 'x_mtok' + cookie.remove 'x_mtok' + cookie.send() + return + + Tracker.autorun -> + if Meteor.status().connected setTokenCookie() else unsetTokenCookie() + return + + if Meteor.connection._lastSessionId + setTokenCookie() + else + unsetTokenCookie() check @onbeforeunloadMessage, Match.OneOf String, Function else @@ -598,6 +613,7 @@ class FilesCollection console.info "[FilesCollection] [_preCollectionCursor.observeChanges] [removed]: #{_id}" if self.debug if self._currentUploads?[_id] self._currentUploads[_id].end() + self._currentUploads[_id].abort() delete self._currentUploads[_id] return @@ -724,31 +740,56 @@ class FilesCollection request.on 'end', -> bound -> try - opts = fileId: request.headers['x-fileid'] - - if request.headers['x-eof'] is '1' - opts.eof = true + if request.headers['x-mtok'] and Meteor.server.sessions?[request.headers['x-mtok']] + user = userId: Meteor.server.sessions[request.headers['x-mtok']]?.userId else - opts.binData = new Buffer body, 'base64' - opts.chunkId = parseInt request.headers['x-chunkid'] + user = self._getUser {request, response} - user = self._getUser {request, response} - _continueUpload = self._continueUpload opts.fileId - unless _continueUpload - throw new Meteor.Error 408, 'Can\'t continue upload, session expired. Start upload again.' + unless request.headers['x-start'] is '1' + opts = fileId: request.headers['x-fileid'] + if request.headers['x-eof'] is '1' + opts.eof = true + else + opts.binData = new Buffer body, 'base64' + opts.chunkId = parseInt request.headers['x-chunkid'] - {result, opts} = self._prepareUpload _.extend(opts, _continueUpload), user.userId, 'HTTP' + _continueUpload = self._continueUpload opts.fileId + unless _continueUpload + throw new Meteor.Error 408, 'Can\'t continue upload, session expired. Start upload again.' + + {result, opts} = self._prepareUpload _.extend(opts, _continueUpload), user.userId, 'HTTP' + + if opts.eof + Meteor.wrapAsync(self._handleUpload.bind(self, result, opts))() + response.writeHead 200 + result.file.meta = fixJSONStringify result.file.meta if result?.file?.meta + response.end JSON.stringify result + else + self.emit '_handleUpload', result, opts, NOOP + + response.writeHead 204 + response.end() - if opts.eof - Meteor.wrapAsync(self._handleUpload.bind(self, result, opts))() - response.writeHead 200 - result.file.meta = fixJSONStringify result.file.meta if result?.file?.meta - response.end JSON.stringify result else - self.emit '_handleUpload', result, opts, NOOP - response.writeHead 200 - response.end JSON.stringify {success: true} + console.info "[FilesCollection] [File Start HTTP] #{opts.file.name} - #{opts.fileId}" if self.debug + opts = JSON.parse body + opts.file.meta = fixJSONParse opts.file.meta if opts?.file?.meta + {result} = self._prepareUpload _.clone(opts), @userId, 'Start Method' + opts._id = opts.fileId + opts.createdAt = new Date() + self._preCollection.insert opts + self._createStream result._id, result.path, opts + + if opts.returnMeta + response.writeHead 200 + response.end JSON.stringify { + uploadRoute: "#{self.downloadRoute}/#{self.collectionName}/__upload" + file: result + } + else + response.writeHead 204 + response.end() catch error handleError error return @@ -1046,8 +1087,7 @@ class FilesCollection _getFileName: (fileData) -> fileName = fileData.name or fileData.fileName if _.isString(fileName) and fileName.length > 0 - cleanName = (str) -> str.replace(/\.\./g, '').replace /\//g, '' - return cleanName(fileData.name or fileData.fileName) + return (fileData.name or fileData.fileName).replace(/\.\./g, '').replace /\//g, '' else return '' @@ -1065,15 +1105,21 @@ class FilesCollection if Meteor.isServer if http - cookie = http.request.Cookies - if _.has(Package, 'accounts-base') and cookie.has 'meteor_login_token' - Accounts = Package['accounts-base'].Accounts unless Accounts - user = Meteor.users.findOne 'services.resume.loginTokens.hashedToken': Accounts._hashLoginToken cookie.get 'meteor_login_token' - if user - result.user = () -> return user - result.userId = user._id + mtok = null + if http.request.headers['x-mtok'] + mtok = http.request.headers['x-mtok'] + else + cookie = http.request.Cookies + if cookie.has 'x_mtok' + mtok = cookie.get 'x_mtok' + + if mtok + userId = Meteor.server.sessions?[mtok]?.userId + if userId + result.user = -> return Meteor.users.findOne userId + result.userId = userId else - if _.has(Package, 'accounts-base') and Meteor.userId() + if Meteor.userId?() result.user = -> return Meteor.user() result.userId = Meteor.userId() @@ -1478,10 +1524,11 @@ class FilesCollection console.time('loadFile ' + @config.file.name) if Worker and @config.allowWebWorkers - @worker = new Worker Meteor.absoluteUrl 'packages/ostrio_files/worker.js' + @worker = new Worker Meteor.absoluteUrl 'packages/ostrio_files/worker.min.js' else @worker = null + @startTime = {} @config.debug = @collection.debug @currentChunk = 0 @transferTime = 0 @@ -1577,14 +1624,14 @@ class FilesCollection if opts.binData if @config.transport is 'ddp' Meteor.call @collection._methodNames._Write, opts, (error) -> - self.transferTime += (+new Date) - evt.data.start + self.transferTime += (+new Date) - self.startTime[opts.chunkId] if error if self.result.state.get() isnt 'aborted' self.emitEvent 'end', [error] else ++self.sentChunks if self.sentChunks >= self.fileLength - self.emitEvent 'sendEOF', [opts] + self.emitEvent 'sendEOF' else if self.currentChunk < self.fileLength self.emitEvent 'upload' self.emitEvent 'calculateStats' @@ -1593,11 +1640,12 @@ class FilesCollection HTTP.call 'POST', "#{@collection.downloadRoute}/#{@collection.collectionName}/__upload", { content: opts.binData headers: + 'x-mtok': Meteor.connection?._lastSessionId or null 'x-fileid': opts.fileId 'x-chunkid': opts.chunkId 'content-type': 'text/plain' - }, (error, result) -> - self.transferTime += (+new Date) - evt.data.start + }, (error) -> + self.transferTime += (+new Date) - self.startTime[opts.chunkId] if error if "#{error}" is "Error: network" self.result.pause() @@ -1607,14 +1655,14 @@ class FilesCollection else ++self.sentChunks if self.sentChunks >= self.fileLength - self.emitEvent 'sendEOF', [opts] + self.emitEvent 'sendEOF' else if self.currentChunk < self.fileLength self.emitEvent 'upload' self.emitEvent 'calculateStats' return return - sendEOF: (opts) -> + sendEOF: -> unless @EOFsent @EOFsent = true self = @ @@ -1630,19 +1678,20 @@ class FilesCollection HTTP.call 'POST', "#{@collection.downloadRoute}/#{@collection.collectionName}/__upload", { content: '' headers: - 'x-eof': 1 + 'x-mtok': Meteor.connection?._lastSessionId or null + 'x-eof': '1' 'x-fileId': opts.fileId 'content-type': 'text/plain' }, (error, result) -> - res = JSON.parse result?.content or {} - res.meta = fixJSONParse res.meta if res?.meta - self.emitEvent 'end', [error, res] + result = JSON.parse result?.content or {} + result.meta = fixJSONParse result.meta if result?.meta + self.emitEvent 'end', [error, result] return return - proceedChunk: (chunkId, start) -> - self = @ - chunk = @config.file.slice (@config.chunkSize * (chunkId - 1)), (@config.chunkSize * chunkId) + proceedChunk: (chunkId) -> + self = @ + chunk = @config.file.slice (@config.chunkSize * (chunkId - 1)), (@config.chunkSize * chunkId) if FileReader fileReader = new FileReader @@ -1652,7 +1701,6 @@ class FilesCollection data: { bin: (fileReader?.result or evt.srcElement?.result or evt.target?.result).split(',')[1] chunkId: chunkId - start: start } }] return @@ -1670,15 +1718,13 @@ class FilesCollection data: { bin: fileReader.readAsDataURL(chunk).split(',')[1] chunkId: chunkId - start: start } }] else - self.emitEvent 'end', ['FileReader and FileReaderSync is not supported in this Browser!'] + self.emitEvent 'end', ['File API is not supported in this Browser!'] return upload: -> - start = +new Date if @result.onPause.get() return @@ -1688,9 +1734,10 @@ class FilesCollection if @currentChunk <= @fileLength ++@currentChunk if @worker - @worker.postMessage({@sentChunks, start, @currentChunk, chunkSize: @config.chunkSize, file: @config.file}) + @worker.postMessage({sc: @sentChunks, cc: @currentChunk, cs: @config.chunkSize, f: @config.file}) else - @emitEvent 'proceedChunk', [@currentChunk, start] + @emitEvent 'proceedChunk', [@currentChunk] + @startTime[@currentChunk] = +new Date return createStreams: -> @@ -1737,9 +1784,9 @@ class FilesCollection fileLength: @fileLength opts.FSName = @FSName if @FSName isnt @fileId - Meteor.call @collection._methodNames._Start, opts, (error) -> + handleStart = (error) -> if error - console.error '[FilesCollection] [.call(_Start)] Error:', error if self.collection.debug + console.error '[FilesCollection] [_Start] Error:', error if self.collection.debug self.emitEvent 'end', [error] else self.result.continueFunc = -> @@ -1748,6 +1795,17 @@ class FilesCollection return self.emitEvent 'createStreams' return + + if @config.transport is 'ddp' + Meteor.call @collection._methodNames._Start, opts, handleStart + else + opts.file.meta = fixJSONStringify opts.file.meta if opts.file?.meta + HTTP.call 'POST', "#{@collection.downloadRoute}/#{@collection.collectionName}/__upload", { + data: opts + headers: + 'x-start': '1' + 'x-mtok': Meteor.connection?._lastSessionId or null + }, handleStart return pipe: (func) -> @@ -1784,13 +1842,13 @@ class FilesCollection if @worker @worker.onmessage = (evt) -> if evt.data.error - console.warn evt.data.error if self.collection.debug - self.emitEvent 'proceedChunk', [evt.data.chunkId, evt.data.start] + console.warn '[FilesCollection] [insert] [worker] [onmessage] [ERROR:]', evt.data.error if self.collection.debug + self.emitEvent 'proceedChunk', [evt.data.chunkId] else self.emitEvent 'sendChunk', [evt] return @worker.onerror = (e) -> - console.error '[FilesCollection] [insert] [worker] [ERROR:]', e if self.collection.debug + console.error '[FilesCollection] [insert] [worker] [onerror] [ERROR:]', e if self.collection.debug self.emitEvent 'end', [e.message] return diff --git a/package.js b/package.js index d50cab9f..269547f0 100755 --- a/package.js +++ b/package.js @@ -1,7 +1,7 @@ Package.describe({ name: 'ostrio:files', - version: '1.6.10', - summary: 'Fast and robust file upload package, with support of FS, AWS, GridFS, DropBox or Google Drive', + version: '1.7.0', + summary: 'Fast and robust file upload package, with support of DDP, HTTP and WebRTC/DC uploads. Support of FS, AWS, GridFS, DropBox or Google Drive', git: 'https://github.com/VeliovGroup/Meteor-Files', documentation: 'README.md' }); @@ -12,7 +12,7 @@ Package.onUse(function(api) { api.use(['templating', 'reactive-var', 'tracker', 'http'], 'client'); api.use(['mongo','underscore', 'check', 'random', 'coffeescript', 'ecmascript', 'ostrio:cookies@2.0.5'], ['client', 'server']); api.addFiles('event-emitter.jsx', 'client'); - api.addAssets('worker.js', 'client'); + api.addAssets('worker.min.js', 'client'); api.mainModule('files.coffee', ['server', 'client']); api.export('FilesCollection'); });