Skip to content

Commit

Permalink
v1.7.0
Browse files Browse the repository at this point in the history
 - Fix memory leak caused by unsatisfied queue in `writeStream` class
 - New authentication model for `http` uploads based on more secure and
frequently changed `connectionId` see
[#159](#159)
 - Fixed overwhelming requests to `Meteor.users` on every received chunk
 - Minimised amount of data transferred between WebWorker and MainThread
 - Use minified version of WebWorker
 - `http` upload uses HTTP requests for all tasks `start`, `continue`,
`EOF`, previously Meteor methods was involved
 - Overall code enhancements, DRY
  • Loading branch information
dr-dimitru committed Aug 20, 2016
1 parent 7aff5b6 commit bccbd14
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 76 deletions.
204 changes: 131 additions & 73 deletions files.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ if Meteor.isServer
self = @
@stream = fs.createWriteStream @path, {flags: 'a', mode: self.permissions, highWaterMark: 0}
@drained = true
@aborted = false
@writtenChunks = 0

@stream.on 'drain', -> bound ->
Expand All @@ -39,6 +40,7 @@ if Meteor.isServer

@stream.on 'error', (error) -> bound ->
console.error "[FilesCollection] [writeStream] [ERROR:]", error
self.abort()
return

###
Expand All @@ -51,7 +53,7 @@ if Meteor.isServer
@returns {Boolean} - True if chunk is sent to stream, false if chunk is set into queue
###
write: (num, chunk, callback) ->
if not @stream._writableState.ended and num > @writtenChunks
if not @aborted and (not @stream._writableState.ended and num > @writtenChunks)
if @drained and num is (@writtenChunks + 1)
@drained = false
@stream.write chunk, callback
Expand All @@ -72,7 +74,7 @@ if Meteor.isServer
@returns {Boolean} - True if stream is fulfilled, false if queue is in progress
###
end: (callback) ->
unless @stream._writableState.ended
if not @aborted and not @stream._writableState.ended
if @writtenChunks is @maxLength
@stream.end callback
return true
Expand All @@ -83,6 +85,16 @@ if Meteor.isServer
return
, 25
return false

###
@memberOf writeStream
@name abort
@summary Aborts writing to writableStream, prevent memory leaks caused by unsatisfied queue
@returns {Boolean} - True
###
abort: ->
@aborted = true
return true
else
`import { EventEmitter } from './event-emitter.jsx'`

Expand Down Expand Up @@ -494,29 +506,32 @@ class FilesCollection
delete @continueUploadTTL
delete @responseHeaders

if _.has(Package, 'accounts-base')
Accounts = Package['accounts-base'].Accounts unless Accounts
setTokenCookie = ->
if (not cookie.has('meteor_login_token') and Accounts._lastLoginTokenWhenPolled) or (cookie.has('meteor_login_token') and (cookie.get('meteor_login_token') isnt Accounts._lastLoginTokenWhenPolled))
cookie.set 'meteor_login_token', Accounts._lastLoginTokenWhenPolled, null, '/'
cookie.send()

unsetTokenCookie = ->
if cookie.has 'meteor_login_token'
cookie.remove 'meteor_login_token'
setTokenCookie = ->
Meteor.setTimeout ->
if (not cookie.has('x_mtok') and Meteor.connection._lastSessionId) or (cookie.has('x_mtok') and (cookie.get('x_mtok') isnt Meteor.connection._lastSessionId))
cookie.set 'x_mtok', Meteor.connection._lastSessionId, null, '/'
cookie.send()

Accounts.onLogin ->
setTokenCookie()
return
Accounts.onLogout ->
unsetTokenCookie()
return
, 25
return

if Accounts._lastLoginTokenWhenPolled
unsetTokenCookie = ->
if cookie.has 'x_mtok'
cookie.remove 'x_mtok'
cookie.send()
return

Tracker.autorun ->
if Meteor.status().connected
setTokenCookie()
else
unsetTokenCookie()
return

if Meteor.connection._lastSessionId
setTokenCookie()
else
unsetTokenCookie()

check @onbeforeunloadMessage, Match.OneOf String, Function
else
Expand Down Expand Up @@ -598,6 +613,7 @@ class FilesCollection
console.info "[FilesCollection] [_preCollectionCursor.observeChanges] [removed]: #{_id}" if self.debug
if self._currentUploads?[_id]
self._currentUploads[_id].end()
self._currentUploads[_id].abort()
delete self._currentUploads[_id]
return

Expand Down Expand Up @@ -724,31 +740,56 @@ class FilesCollection

request.on 'end', -> bound ->
try
opts = fileId: request.headers['x-fileid']

if request.headers['x-eof'] is '1'
opts.eof = true
if request.headers['x-mtok'] and Meteor.server.sessions?[request.headers['x-mtok']]
user = userId: Meteor.server.sessions[request.headers['x-mtok']]?.userId
else
opts.binData = new Buffer body, 'base64'
opts.chunkId = parseInt request.headers['x-chunkid']
user = self._getUser {request, response}

user = self._getUser {request, response}
_continueUpload = self._continueUpload opts.fileId
unless _continueUpload
throw new Meteor.Error 408, 'Can\'t continue upload, session expired. Start upload again.'
unless request.headers['x-start'] is '1'
opts = fileId: request.headers['x-fileid']
if request.headers['x-eof'] is '1'
opts.eof = true
else
opts.binData = new Buffer body, 'base64'
opts.chunkId = parseInt request.headers['x-chunkid']

{result, opts} = self._prepareUpload _.extend(opts, _continueUpload), user.userId, 'HTTP'
_continueUpload = self._continueUpload opts.fileId
unless _continueUpload
throw new Meteor.Error 408, 'Can\'t continue upload, session expired. Start upload again.'

{result, opts} = self._prepareUpload _.extend(opts, _continueUpload), user.userId, 'HTTP'

if opts.eof
Meteor.wrapAsync(self._handleUpload.bind(self, result, opts))()
response.writeHead 200
result.file.meta = fixJSONStringify result.file.meta if result?.file?.meta
response.end JSON.stringify result
else
self.emit '_handleUpload', result, opts, NOOP

response.writeHead 204
response.end()

if opts.eof
Meteor.wrapAsync(self._handleUpload.bind(self, result, opts))()
response.writeHead 200
result.file.meta = fixJSONStringify result.file.meta if result?.file?.meta
response.end JSON.stringify result
else
self.emit '_handleUpload', result, opts, NOOP

response.writeHead 200
response.end JSON.stringify {success: true}
console.info "[FilesCollection] [File Start HTTP] #{opts.file.name} - #{opts.fileId}" if self.debug
opts = JSON.parse body
opts.file.meta = fixJSONParse opts.file.meta if opts?.file?.meta
{result} = self._prepareUpload _.clone(opts), @userId, 'Start Method'
opts._id = opts.fileId
opts.createdAt = new Date()
self._preCollection.insert opts
self._createStream result._id, result.path, opts

if opts.returnMeta
response.writeHead 200
response.end JSON.stringify {
uploadRoute: "#{self.downloadRoute}/#{self.collectionName}/__upload"
file: result
}
else
response.writeHead 204
response.end()
catch error
handleError error
return
Expand Down Expand Up @@ -1046,8 +1087,7 @@ class FilesCollection
_getFileName: (fileData) ->
fileName = fileData.name or fileData.fileName
if _.isString(fileName) and fileName.length > 0
cleanName = (str) -> str.replace(/\.\./g, '').replace /\//g, ''
return cleanName(fileData.name or fileData.fileName)
return (fileData.name or fileData.fileName).replace(/\.\./g, '').replace /\//g, ''
else
return ''

Expand All @@ -1065,15 +1105,21 @@ class FilesCollection

if Meteor.isServer
if http
cookie = http.request.Cookies
if _.has(Package, 'accounts-base') and cookie.has 'meteor_login_token'
Accounts = Package['accounts-base'].Accounts unless Accounts
user = Meteor.users.findOne 'services.resume.loginTokens.hashedToken': Accounts._hashLoginToken cookie.get 'meteor_login_token'
if user
result.user = () -> return user
result.userId = user._id
mtok = null
if http.request.headers['x-mtok']
mtok = http.request.headers['x-mtok']
else
cookie = http.request.Cookies
if cookie.has 'x_mtok'
mtok = cookie.get 'x_mtok'

if mtok
userId = Meteor.server.sessions?[mtok]?.userId
if userId
result.user = -> return Meteor.users.findOne userId
result.userId = userId
else
if _.has(Package, 'accounts-base') and Meteor.userId()
if Meteor.userId?()
result.user = -> return Meteor.user()
result.userId = Meteor.userId()

Expand Down Expand Up @@ -1478,10 +1524,11 @@ class FilesCollection
console.time('loadFile ' + @config.file.name)

if Worker and @config.allowWebWorkers
@worker = new Worker Meteor.absoluteUrl 'packages/ostrio_files/worker.js'
@worker = new Worker Meteor.absoluteUrl 'packages/ostrio_files/worker.min.js'
else
@worker = null

@startTime = {}
@config.debug = @collection.debug
@currentChunk = 0
@transferTime = 0
Expand Down Expand Up @@ -1577,14 +1624,14 @@ class FilesCollection
if opts.binData
if @config.transport is 'ddp'
Meteor.call @collection._methodNames._Write, opts, (error) ->
self.transferTime += (+new Date) - evt.data.start
self.transferTime += (+new Date) - self.startTime[opts.chunkId]
if error
if self.result.state.get() isnt 'aborted'
self.emitEvent 'end', [error]
else
++self.sentChunks
if self.sentChunks >= self.fileLength
self.emitEvent 'sendEOF', [opts]
self.emitEvent 'sendEOF'
else if self.currentChunk < self.fileLength
self.emitEvent 'upload'
self.emitEvent 'calculateStats'
Expand All @@ -1593,11 +1640,12 @@ class FilesCollection
HTTP.call 'POST', "#{@collection.downloadRoute}/#{@collection.collectionName}/__upload", {
content: opts.binData
headers:
'x-mtok': Meteor.connection?._lastSessionId or null
'x-fileid': opts.fileId
'x-chunkid': opts.chunkId
'content-type': 'text/plain'
}, (error, result) ->
self.transferTime += (+new Date) - evt.data.start
}, (error) ->
self.transferTime += (+new Date) - self.startTime[opts.chunkId]
if error
if "#{error}" is "Error: network"
self.result.pause()
Expand All @@ -1607,14 +1655,14 @@ class FilesCollection
else
++self.sentChunks
if self.sentChunks >= self.fileLength
self.emitEvent 'sendEOF', [opts]
self.emitEvent 'sendEOF'
else if self.currentChunk < self.fileLength
self.emitEvent 'upload'
self.emitEvent 'calculateStats'
return
return

sendEOF: (opts) ->
sendEOF: ->
unless @EOFsent
@EOFsent = true
self = @
Expand All @@ -1630,19 +1678,20 @@ class FilesCollection
HTTP.call 'POST', "#{@collection.downloadRoute}/#{@collection.collectionName}/__upload", {
content: ''
headers:
'x-eof': 1
'x-mtok': Meteor.connection?._lastSessionId or null
'x-eof': '1'
'x-fileId': opts.fileId
'content-type': 'text/plain'
}, (error, result) ->
res = JSON.parse result?.content or {}
res.meta = fixJSONParse res.meta if res?.meta
self.emitEvent 'end', [error, res]
result = JSON.parse result?.content or {}
result.meta = fixJSONParse result.meta if result?.meta
self.emitEvent 'end', [error, result]
return
return

proceedChunk: (chunkId, start) ->
self = @
chunk = @config.file.slice (@config.chunkSize * (chunkId - 1)), (@config.chunkSize * chunkId)
proceedChunk: (chunkId) ->
self = @
chunk = @config.file.slice (@config.chunkSize * (chunkId - 1)), (@config.chunkSize * chunkId)

if FileReader
fileReader = new FileReader
Expand All @@ -1652,7 +1701,6 @@ class FilesCollection
data: {
bin: (fileReader?.result or evt.srcElement?.result or evt.target?.result).split(',')[1]
chunkId: chunkId
start: start
}
}]
return
Expand All @@ -1670,15 +1718,13 @@ class FilesCollection
data: {
bin: fileReader.readAsDataURL(chunk).split(',')[1]
chunkId: chunkId
start: start
}
}]
else
self.emitEvent 'end', ['FileReader and FileReaderSync is not supported in this Browser!']
self.emitEvent 'end', ['File API is not supported in this Browser!']
return

upload: ->
start = +new Date
if @result.onPause.get()
return

Expand All @@ -1688,9 +1734,10 @@ class FilesCollection
if @currentChunk <= @fileLength
++@currentChunk
if @worker
@worker.postMessage({@sentChunks, start, @currentChunk, chunkSize: @config.chunkSize, file: @config.file})
@worker.postMessage({sc: @sentChunks, cc: @currentChunk, cs: @config.chunkSize, f: @config.file})
else
@emitEvent 'proceedChunk', [@currentChunk, start]
@emitEvent 'proceedChunk', [@currentChunk]
@startTime[@currentChunk] = +new Date
return

createStreams: ->
Expand Down Expand Up @@ -1737,9 +1784,9 @@ class FilesCollection
fileLength: @fileLength
opts.FSName = @FSName if @FSName isnt @fileId

Meteor.call @collection._methodNames._Start, opts, (error) ->
handleStart = (error) ->
if error
console.error '[FilesCollection] [.call(_Start)] Error:', error if self.collection.debug
console.error '[FilesCollection] [_Start] Error:', error if self.collection.debug
self.emitEvent 'end', [error]
else
self.result.continueFunc = ->
Expand All @@ -1748,6 +1795,17 @@ class FilesCollection
return
self.emitEvent 'createStreams'
return

if @config.transport is 'ddp'
Meteor.call @collection._methodNames._Start, opts, handleStart
else
opts.file.meta = fixJSONStringify opts.file.meta if opts.file?.meta
HTTP.call 'POST', "#{@collection.downloadRoute}/#{@collection.collectionName}/__upload", {
data: opts
headers:
'x-start': '1'
'x-mtok': Meteor.connection?._lastSessionId or null
}, handleStart
return

pipe: (func) ->
Expand Down Expand Up @@ -1784,13 +1842,13 @@ class FilesCollection
if @worker
@worker.onmessage = (evt) ->
if evt.data.error
console.warn evt.data.error if self.collection.debug
self.emitEvent 'proceedChunk', [evt.data.chunkId, evt.data.start]
console.warn '[FilesCollection] [insert] [worker] [onmessage] [ERROR:]', evt.data.error if self.collection.debug
self.emitEvent 'proceedChunk', [evt.data.chunkId]
else
self.emitEvent 'sendChunk', [evt]
return
@worker.onerror = (e) ->
console.error '[FilesCollection] [insert] [worker] [ERROR:]', e if self.collection.debug
console.error '[FilesCollection] [insert] [worker] [onerror] [ERROR:]', e if self.collection.debug
self.emitEvent 'end', [e.message]
return

Expand Down
Loading

0 comments on commit bccbd14

Please sign in to comment.