From 19fafa46ccb2aa12bc1a83644a5f924e51e36803 Mon Sep 17 00:00:00 2001 From: Julian bilcke Date: Mon, 18 Jun 2012 13:22:28 +0200 Subject: [PATCH] bumped to 0.05 --- .gitignore | 1 + .npmignore | 1 + README.md | 28 ++++- examples/playback.coffee | 12 +++ examples/test.sample | Bin 413 -> 0 bytes examples/twitter.coffee | 10 +- lib/{ => api}/simple.js | 89 +++++++++++++--- lib/{ => api}/stream.js | 86 +++++++++++---- lib/cursor.js | 4 +- lib/record.js | 24 ++++- lib/sampler.js | 4 +- lib/store/file.js | 106 ++++++++++++++----- lib/store/memory.js | 41 +++++++- lib/store/yaml.js | 84 --------------- package.json | 2 +- src/{ => api}/simple.coffee | 61 ++++++++--- src/{ => api}/stream.coffee | 65 +++++++++--- src/cursor.coffee | 8 +- src/record.coffee | 16 ++- src/sampler.coffee | 4 +- src/store/file.coffee | 127 +++++++++++++++++----- src/store/file.js | 204 ------------------------------------ src/store/memory.coffee | 38 +++++-- test/main.coffee | 107 +++++++++++-------- test/test.json | 1 + 25 files changed, 636 insertions(+), 487 deletions(-) create mode 100644 examples/playback.coffee delete mode 100644 examples/test.sample rename lib/{ => api}/simple.js (65%) rename lib/{ => api}/stream.js (63%) delete mode 100644 lib/store/yaml.js rename src/{ => api}/simple.coffee (76%) rename src/{ => api}/stream.coffee (73%) delete mode 100644 src/store/file.js create mode 100644 test/test.json diff --git a/.gitignore b/.gitignore index fc471df..90028f0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +test/tmp.* examples/test.json examples/test.yaml examples/test.yml diff --git a/.npmignore b/.npmignore index fc471df..90028f0 100644 --- a/.npmignore +++ b/.npmignore @@ -1,3 +1,4 @@ +test/tmp.* examples/test.json examples/test.yaml examples/test.yml diff --git a/README.md b/README.md index 8a615d0..9d7e6bd 100644 --- a/README.md +++ b/README.md @@ -117,9 +117,16 @@ recorder.write "hello" recorder.write foo: "hello", bar: "world" recorder.write new Buffer() -# in the future, you will be able to add an event at a specific time +# not yet implemented, but soon you will be able to add an event at a specific time # recorder.writeAt moment(1982,1,1), "cold wave" +# also, don't forget to close the recorder when you don't use it anymore +# the reason is that a recorder start some background processes +# (eg. async synchronization of database) that need to be stopped manually +# if there is not more data to record. +recorder.stop() + + ``` Playback @@ -152,6 +159,8 @@ recorder = new StreamRecorder record myInputStream.pipe(recorder) # that's all folks! +# you don't need to close explicitely the StreamRecorder (unlike SimpleRecorder) +# since it can detect automatically 'close' events from input stream ``` @@ -192,8 +201,13 @@ Piping ### Playing with Twitter Stream - Here I am using some environment variables to define the Twitter tokens (à la Heroku), - so don't forget to change this to fit your own environment. + NOTE 1: you need to install ntwitter manually before running the example: + + $ npm install -g ntwitter + + I didn't include it as a dependency to keep dependencies light. + + NOTE 2: you need to have some some environment variables containing your Twitter tokens ``` coffeescript @@ -231,6 +245,7 @@ twit.stream 'statuses/sample', (stream) -> stream.on 'data', (data) -> timeline.write moment(data.created_at), data.text delay duration*1000, -> + recorder.close() log "playing tweets back" new sampler.SimplePlayer timeline, speed: 2.0 @@ -246,6 +261,13 @@ twit.stream 'statuses/sample', (stream) -> ## Changelog +### 0.0.5 + + * now we can load a json file! and it's tested! + * more bugfixes + * more tests + * addd a recorder.close() function + ### 0.0.4 * Fixed broken YAML dependency diff --git a/examples/playback.coffee b/examples/playback.coffee new file mode 100644 index 0000000..551043a --- /dev/null +++ b/examples/playback.coffee @@ -0,0 +1,12 @@ +#!/usr/bin/env coffee + +# standard node library +{log,inspect} = require 'util' + +# sampler modules +sampler = require '../lib/sampler' +delay = (t, f) -> setTimeout f, t + +new sampler.SimplePlayer "file://twitter.smp", + speed: 2.0 + onData: (data) -> log "TWEET: #{data}" diff --git a/examples/test.sample b/examples/test.sample deleted file mode 100644 index 6c6addadca97f1ab2dc4387e338d3c578648e117..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 413 zcmWlV-AW@t5QS$@A&=n2E?d(I-bBg#wI)bFmjnVX33#K1D4j_;-A<-EPS<1zAy49s zkKvo_D|oZ8k6?ROKi@gUsegN)TZU`YWLa-I4db}m4ZA$)L}}9Avdok#p*Iqjs|DJE z^_Wy3O#_+GFarPZ2W(8ucC+@+%a6Eo?j+A5!?`ivY-5@LNGc1BupnTrazIg$e62-+ z{l4d=e!oL1E3jg{(>RI~{^WfmoeSsFpdg8rB|*w)*4r||a^+mlIcu|8D{}Ra?RL?d z*GhPJ@V8ZK@*nq;h)=!!H1vP9RF(OY_e=_{ap~}*;dt!CrOjP)ITuymg#js)vzfK0 zU4ESM$laz!(ZSRIJN8mr0xKFQH?Tml0F5e_b5prQ%R=cKl>XjXsu1Z6Ms(uBqN)~W zj!*=nlM-^HwSY`wR)8CsniyYuEgq)P+LI6CKi8uHTusKmuWs%qw}T)EgTTFXw)+A{ CXMhy| diff --git a/examples/twitter.coffee b/examples/twitter.coffee index 5ad4b99..0574d0d 100644 --- a/examples/twitter.coffee +++ b/examples/twitter.coffee @@ -12,7 +12,7 @@ sampler = require '../lib/sampler' delay = (t, f) -> setTimeout f, t # PARAMETERS -duration = 15 +duration = 8 timeline = new sampler.Record "file://twitter.smp" twit = new Twitter consumer_key: process.env.TWITTER_CONSUMER_KEY @@ -26,12 +26,16 @@ twit.stream 'statuses/sample', (stream) -> recorder = new sampler.SimpleRecorder timeline stream.on 'error', (err) -> log "twitter error: #{inspect err}" + if err.code? + log "this is a serious error. exiting" + process.exit() + if err.text? timeline.write moment(err.created_at), err.text stream.on 'data', (data) -> timeline.write moment(data.created_at), data.text delay duration*1000, -> - log "recording terminated" + log "recording terminated, will soon exit.." stream.destroy() # clean exit? - #process.exit() # problem; is we exit during a write(), everything in the file is lost + delay 5000, -> process.exit() log "listening for #{duration} seconds" \ No newline at end of file diff --git a/lib/simple.js b/lib/api/simple.js similarity index 65% rename from lib/simple.js rename to lib/api/simple.js index 3b4c43c..051eab8 100644 --- a/lib/simple.js +++ b/lib/api/simple.js @@ -9,25 +9,42 @@ moment = require('moment'); - _ref1 = require('./misc/toolbox'), delay = _ref1.delay, contains = _ref1.contains, simpleFactory = _ref1.simpleFactory; + _ref1 = require('../misc/toolbox'), delay = _ref1.delay, contains = _ref1.contains, simpleFactory = _ref1.simpleFactory; - Record = require('./record'); + Record = require('../record'); - Cursor = require('./cursor'); + Cursor = require('../cursor'); exports.Recorder = (function() { - function Recorder(url) { - var _this = this; + function Recorder(url, options) { + var k, v, + _this = this; if (url == null) { - url = false; + url = ""; } + if (options == null) { + options = {}; + } + this.close = __bind(this.close, this); + this.writeAt = __bind(this.writeAt, this); this.write = __bind(this.write, this); + this.autosave = __bind(this.autosave, this); + + this.config = { + autosave: 500 + }; + for (k in options) { + v = options[k]; + this.config[k] = v; + } this.callbacks = []; this.sync = 0; + this.synced = false; + this.closed = false; this.record = simpleFactory(Record, url); this.record.on('error', function(data) { var cb, _i, _len, _ref2; @@ -45,8 +62,9 @@ return 1; } }); - this.record.on('flushed', function(version) { + this.record.on('synced', function(version) { var cb, _i, _len, _ref2; + _this.synced = version > 0; if (version > _this.sync) { _this.sync = version; _ref2 = _this.callbacks; @@ -61,16 +79,41 @@ return 1; } }); + if (this.config.autosave >= 0) { + if (this.record.ready()) { + delay(this.config.autosave, function() { + return _this.autosave(); + }); + } else { + this.record.on('ready', function() { + return _this.autosave(); + }); + } + } } + Recorder.prototype.autosave = function() { + var _this = this; + if (this.synced) { + if (this.closed) { + this.config.autosave = -1; + return; + } + } else { + this.record.sync(); + } + if (this.config.autosave >= 0) { + return delay(this.config.autosave, function() { + return _this.autosave(); + }); + } + }; + Recorder.prototype.write = function(data, cb) { if (cb == null) { cb = false; } - if (cb) { - this.callbacks.push(cb); - } - return this.record.write(moment(), data); + return this.writeAt(moment(), data, cb); }; Recorder.prototype.writeAt = function(timestamp, data, cb) { @@ -80,7 +123,12 @@ if (cb) { this.callbacks.push(cb); } - return this.record.write(timestamp, data); + this.synced = this.record.write(timestamp, data); + return this.synced; + }; + + Recorder.prototype.close = function() { + return this.closed = true; }; return Recorder; @@ -90,14 +138,17 @@ exports.Player = (function() { function Player(url, options) { + var k, v, + _this = this; + if (options == null) { + options = {}; + } this.pause = __bind(this.pause, this); this.resume = __bind(this.resume, this); this.start = __bind(this.start, this); - var k, v, - _this = this; this.config = { speed: 1.0, autoplay: true, @@ -139,7 +190,15 @@ return _this.config.onError(err); }); if (this.config.autoplay) { - this.resume(); + if (this.record.ready()) { + delay(0, function() { + return _this.resume(); + }); + } else { + this.record.on('ready', function() { + return _this.resume(); + }); + } } } diff --git a/lib/stream.js b/lib/api/stream.js similarity index 63% rename from lib/stream.js rename to lib/api/stream.js index 210baf8..955ac9e 100644 --- a/lib/stream.js +++ b/lib/api/stream.js @@ -11,49 +11,86 @@ moment = require('moment'); - _ref1 = require('./misc/toolbox'), delay = _ref1.delay, contains = _ref1.contains, simpleFactory = _ref1.simpleFactory; + _ref1 = require('../misc/toolbox'), delay = _ref1.delay, contains = _ref1.contains, simpleFactory = _ref1.simpleFactory; - Record = require('./record'); + Record = require('../record'); - Cursor = require('./cursor'); + Cursor = require('../cursor'); exports.Recorder = (function(_super) { __extends(Recorder, _super); - function Recorder(url) { - var _this = this; + function Recorder(url, options) { + var k, v, + _this = this; if (url == null) { url = ""; } - this.writeAt = __bind(this.writeAt, this); - + if (options == null) { + options = {}; + } this.write = __bind(this.write, this); this.end = __bind(this.end, this); + this.autosave = __bind(this.autosave, this); + + this.config = { + autosave: 500 + }; + for (k in options) { + v = options[k]; + this.config[k] = v; + } + this.synced = false; + this.closed = false; + this.writable = true; this.record = simpleFactory(Record, url); this.record.on('error', function(version, err) { log("StreamRecorder: got error: " + err); _this.emit('error', err); }); - this.record.on('flushed', function(version) { - _this.emit('drain'); + this.record.on('synced', function(version) { + _this.synced = version > 0; }); - this.writable = true; + if (this.config.autosave >= 0) { + if (this.record.ready()) { + delay(this.config.autosave, function() { + return _this.autosave(); + }); + } else { + this.record.on('ready', function() { + return _this.autosave(); + }); + } + } } + Recorder.prototype.autosave = function() { + var _this = this; + if (this.synced) { + if (this.closed) { + this.config.autosave = -1; + return; + } + } else { + this.record.sync(); + } + if (this.config.autosave >= 0) { + return delay(this.config.autosave, function() { + return _this.autosave(); + }); + } + }; + Recorder.prototype.end = function(data) { + this.closed = true; return this.emit('close'); }; Recorder.prototype.write = function(data) { - this.record.write(moment(), data); - return true; - }; - - Recorder.prototype.writeAt = function(timestamp, data) { - this.record.write(timestamp, data); + this.synced = this.record.write(moment(), data); return true; }; @@ -66,12 +103,15 @@ __extends(Player, _super); function Player(url, options) { + var k, v, + _this = this; + if (options == null) { + options = {}; + } this.pause = __bind(this.pause, this); this.resume = __bind(this.resume, this); - var k, v, - _this = this; this.config = { speed: 1.0, autoplay: true, @@ -106,7 +146,15 @@ return _this.emit('error', err); }); if (this.config.autoplay) { - this.resume(); + if (this.record.ready()) { + delay(0, function() { + return _this.resume(); + }); + } else { + this.record.on('ready', function() { + return _this.resume(); + }); + } } } diff --git a/lib/cursor.js b/lib/cursor.js index 625a8fb..2c2b48a 100644 --- a/lib/cursor.js +++ b/lib/cursor.js @@ -56,7 +56,7 @@ exports.prototype.checkBuffer = function() { if (this.buffer.length < this.bufferMax) { - return log("we have some room to bufferize"); + return 1; } }; @@ -68,7 +68,6 @@ return; } if (this.paused) { - log("cannot fire: paused"); return; } evt = this.next; @@ -80,7 +79,6 @@ return this.store.next(evt, function(next) { var dbLatency, realDelay, theoricDelay; if (!next) { - log("error, no more next in the DB.."); _this.emit('error', "store.next gave us nothing"); return; } diff --git a/lib/record.js b/lib/record.js index 3ea72d9..54646cc 100644 --- a/lib/record.js +++ b/lib/record.js @@ -25,13 +25,18 @@ if (url == null) { url = ""; } + if (options == null) { + options = {}; + } + this.sync = __bind(this.sync, this); + + this.ready = __bind(this.ready, this); + this.write = __bind(this.write, this); this.length = __bind(this.length, this); - this.config = { - autosave: 1000 - }; + this.config = {}; for (k in options) { v = options[k]; this.config[k] = v; @@ -42,12 +47,15 @@ this.store = new stores.File(path, this.config); } this.store.on('error', function(version, err) { - error("Record: @store sent us an error: " + err); + error("Record: " + err); return _this.emit('error', { version: version, err: err }); }); + this.store.on('ready', function() { + return _this.emit('ready'); + }); this.store.on('flushed', function(version) { return _this.emit('flushed'); }); @@ -61,6 +69,14 @@ return this.store.write(timestamp, data); }; + exports.prototype.ready = function() { + return this.store.initialized; + }; + + exports.prototype.sync = function() { + return this.store.sync(); + }; + return exports; })(events.EventEmitter); diff --git a/lib/sampler.js b/lib/sampler.js index 5182daa..5a77456 100644 --- a/lib/sampler.js +++ b/lib/sampler.js @@ -2,9 +2,9 @@ (function() { var Record, simple, stream; - stream = require('./stream'); + stream = require('./api/stream'); - simple = require('./simple'); + simple = require('./api/simple'); Record = require('./record'); diff --git a/lib/store/file.js b/lib/store/file.js index ab7f32b..00f835c 100644 --- a/lib/store/file.js +++ b/lib/store/file.js @@ -84,24 +84,28 @@ var k, v, _this = this; this.path = path; + if (options == null) { + options = {}; + } this._writeEvent = __bind(this._writeEvent, this); - this.save = __bind(this.save, this); - - this.autosave = __bind(this.autosave, this); + this.sync = __bind(this.sync, this); - exports.__super__.constructor.call(this); this.config = { - autosave: 500, filename: function() {} }; for (k in options) { v = options[k]; this.config[k] = v; } + this.events = []; + this.first = false; + this.last = false; + this._length = 0; this.buff = []; this.buffMax = 1; this.isWriting = false; + this.initialized = false; this.flushing = { version: 1, saved: 0 @@ -109,11 +113,10 @@ this.format = getFormat(this.path); switch (this.format) { case "YAML": - log("using YAML"); this.saveSnapshot = YAML.writeFile; + this.loadSnapshot = YAML.readFile; break; case "JSON": - log("using JSON"); this.saveSnapshot = function(path, data, cb) { var dumpString; dumpString = JSON.stringify(data); @@ -121,45 +124,90 @@ return cb(err); }); }; + this.loadSnapshot = function(path, cb) { + return fs.readFile(path, function(err, data) { + var obj; + obj = {}; + if (!err) { + try { + obj = JSON.parse(data); + } catch (exc) { + err = "could not load json: " + exc; + } + } + return cb(err, obj); + }); + }; break; case "SAMPLER": this.saveSnapshot = function(path, data, cb) { - var compressed, dumpString; - dumpString = JSON.stringify(data); - compressed = snappy.compressSync(dumpString); + var compressed; + compressed = snappy.compressSync(data); return fs.writeFile(path, compressed, function(err) { return cb(err); }); }; + this.loadSnapshot = function(path, cb) { + return fs.readFile(path, function(err, raw) { + var data, obj; + obj = {}; + if (!err) { + data = snappy.decompressSync(raw, snappy.parsers.string); + if (data) { + try { + obj = JSON.parse(data); + } catch (exc) { + err = "invalid json file: " + exc; + } + } + } + return cb(err, obj); + }); + }; break; default: log("unknow format: " + this.format); throw "unknow format: " + this.format; return; } - delay(0, function() { - return _this.autosave(); + delay(1, function() { + return _this.load(); }); } - exports.prototype._load = function(path, cb) { - if (cb == null) { - cb = function() {}; - } - }; - - exports.prototype.autosave = function() { + exports.prototype.load = function() { var _this = this; - this.save(); - return delay(this.config.autosave, function() { - return _this.autosave(); + return this.loadSnapshot(this.path, function(err, data) { + var event, _i, _len, _ref2; + if (err) { + 1; + + } + if (data != null) { + if (data.events != null) { + if (data.events.length > 0) { + _this.events = []; + _ref2 = data.events; + for (_i = 0, _len = _ref2.length; _i < _len; _i++) { + event = _ref2[_i]; + _this.write(moment(event[0]), event[1]); + } + } + } + } + return _this.ready(); }); }; - exports.prototype.save = function() { + exports.prototype.sync = function() { var event, snapshot, version, _i, _len, _ref2, _this = this; + if (!this.initialized) { + this.emit('synced', -1); + return; + } if (this.isWriting) { + this.emit('synced', -1); return; } this.isWriting = true; @@ -172,24 +220,24 @@ event = _ref2[_i]; snapshot.events.push([0 + event.timestamp, event.data]); } - version = 0 + this.flushing.version; - this.flushing.version++; - this.saveSnapshot(this.path, snapshot, function(err) { + version = this.count; + return this.saveSnapshot(this.path, snapshot, function(err) { _this.flushing.saved = version; _this.isWriting = false; if (err) { + _this.emit('synced', -1); return _this.emit('error', err); } else { - return _this.emit('flushed', version); + return _this.emit('synced', version); } }); - return false; }; exports.prototype._writeEvent = function(event) { this.events.push(event); this.buff.push(event); - return true; + this.count(); + return false; }; return exports; diff --git a/lib/store/memory.js b/lib/store/memory.js index 42eeb0f..f9b2f95 100644 --- a/lib/store/memory.js +++ b/lib/store/memory.js @@ -23,14 +23,30 @@ __extends(exports, _super); - function exports(config) { - this.config = config; + function exports(options) { + var k, v, + _this = this; + if (options == null) { + options = {}; + } + this.sync = __bind(this.sync, this); + + this.count = __bind(this.count, this); + this.length = __bind(this.length, this); this._writeEvent = __bind(this._writeEvent, this); this.write = __bind(this.write, this); + this.ready = __bind(this.ready, this); + + this.config = {}; + for (k in options) { + v = options[k]; + this.config[k] = v; + } + this.initialized = false; this.events = []; this.first = false; this.last = false; @@ -38,8 +54,17 @@ this.flushing = { version: 1 }; + this.events = []; + delay(0, function() { + return _this.ready(); + }); } + exports.prototype.ready = function() { + this.initialized = true; + return this.emit('ready'); + }; + exports.prototype.write = function(timestamp, data) { var event; event = { @@ -63,6 +88,7 @@ exports.prototype._writeEvent = function(event) { this.events.push(event); + this.count(); return true; }; @@ -82,6 +108,17 @@ return this._length; }; + exports.prototype.count = function() { + var version; + version = 0 + this.flushing.version; + this.flushing.version++; + return version; + }; + + exports.prototype.sync = function() { + return this.emit('flushed', this.count()); + }; + return exports; })(events.EventEmitter); diff --git a/lib/store/yaml.js b/lib/store/yaml.js deleted file mode 100644 index 704f548..0000000 --- a/lib/store/yaml.js +++ /dev/null @@ -1,84 +0,0 @@ -// Generated by CoffeeScript 1.3.3 -(function() { - var BinaryTree, InMemory, Stream, YAML, contains, delay, error, events, inspect, log, moment, _ref, _ref1, - __bind = function(fn, me){ return function(){ return fn.apply(me, arguments); }; }, - __hasProp = {}.hasOwnProperty, - __extends = function(child, parent) { for (var key in parent) { if (__hasProp.call(parent, key)) child[key] = parent[key]; } function ctor() { this.constructor = child; } ctor.prototype = parent.prototype; child.prototype = new ctor(); child.__super__ = parent.prototype; return child; }; - - _ref = require('util'), log = _ref.log, error = _ref.error, inspect = _ref.inspect; - - Stream = require('stream').Stream; - - events = require('events'); - - moment = require('moment'); - - YAML = require('libyaml'); - - _ref1 = require('../misc/toolbox'), delay = _ref1.delay, contains = _ref1.contains; - - BinaryTree = require('../misc/btree'); - - InMemory = require('./memory'); - - module.exports = (function(_super) { - - __extends(exports, _super); - - function exports(url) { - var buff, buffMax; - this.url = url; - this._writeEvent = __bind(this._writeEvent, this); - - exports.__super__.constructor.call(this); - buff = []; - buffMax = 3; - } - - exports.prototype._load = function(url, cb) { - var _this = this; - if (cb == null) { - cb = function() {}; - } - return YAML.readFile(path, function(err, obj) { - if (err != null) { - return cb(err, {}); - } else { - return cb(false, obj[0]); - } - }); - }; - - exports.prototype._writeEvent = function(event, cb) { - var snapshot, - _this = this; - this.events.push(event); - this.buff.push(event); - if (this.buff.length >= this.buffMax) { - this.buff = []; - snapshot = { - first: this.first, - last: this.last, - records: this.records - }; - YAML.writeFile(this.url, snapshot, function(err) { - if (err) { - log("store.YAML: _writeEvent: could not write events to disk.."); - cb(err); - return _this.emit('error', err); - } else { - log("store.YAML: _writeEvent: events wrote to disk! sending drain.."); - return _this.emit('drain'); - } - }); - return false; - } else { - return true; - } - }; - - return exports; - - })(InMemory); - -}).call(this); diff --git a/package.json b/package.json index 5f04c14..80eca96 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name" : "sampler", "description" : "Record things, play them back", - "version" : "0.0.4", + "version" : "0.0.5", "repository" : { "type": "git", "url": "git://github.com/daizoru/node-sampler.git" }, "license" : "BSD", "author" : { "name" : "Julian Bilcke", "email" : "julian.bilcke@daizoru.com", "url" : "http://github.com/daizoru" }, diff --git a/src/simple.coffee b/src/api/simple.coffee similarity index 76% rename from src/simple.coffee rename to src/api/simple.coffee index 5428c93..647a79d 100644 --- a/src/simple.coffee +++ b/src/api/simple.coffee @@ -32,17 +32,24 @@ moment = require 'moment' # project modules -{delay,contains,simpleFactory} = require './misc/toolbox' -Record = require './record' -Cursor = require './cursor' +{delay,contains,simpleFactory} = require '../misc/toolbox' +Record = require '../record' +Cursor = require '../cursor' # SIMPLE API class exports.Recorder - constructor: (url=no) -> + constructor: (url="",options={}) -> + @config = + autosave: 500 + + for k,v of options + @config[k] = v # Simple API use simple callbacks @callbacks = [] @sync = 0 + @synced = no + @closed = no #log "SimpleRecorder#constructor(#{url})" @record = simpleFactory Record, url @@ -56,7 +63,8 @@ class exports.Recorder else 1 # the event arrived to late - we just ignore it - @record.on 'flushed', (version) => + @record.on 'synced', (version) => + @synced = (version > 0) if version > @sync @sync = version for cb in @callbacks @@ -65,22 +73,42 @@ class exports.Recorder else 1 # the event arrived to late - we just ignore it + if @config.autosave >= 0 + if @record.ready() + delay @config.autosave, => @autosave() + else + @record.on 'ready', => + @autosave() + + autosave: => + #log "AUTOSAVE" + if @synced + # if we are closed AND synced -> stop loop + if @closed + @config.autosave = -1 + return + else + @record.sync() + if @config.autosave >= 0 + delay @config.autosave, => + @autosave() + # SimpleRecorder API write: (data, cb=no) => - #log "SimpleRecorder#write(#{data})" - #log "Record: write()" - @callbacks.push cb if cb - @record.write moment(), data + @writeAt moment(), data, cb # SimpleRecorder API writeAt: (timestamp, data, cb=no) => - #log "SimpleRecorder#writeAt(#{timestamp},#{data})" - #log "Record: write()" @callbacks.push cb if cb - @record.write timestamp, data + @synced = @record.write timestamp, data + @synced + + close: => + @closed = yes + class exports.Player - constructor: (url, options) -> + constructor: (url, options={}) -> #log "simple.Player#constructor(#{url}, options)" @config = @@ -116,7 +144,12 @@ class exports.Player @cursor.on 'end', => @config.onEnd() @cursor.on 'error', (err) => @config.onError err - @resume() if @config.autoplay + if @config.autoplay + if @record.ready() + delay 0, => @resume() + else + @record.on 'ready', => + @resume() start: => @resume() resume: => @cursor.resume() diff --git a/src/stream.coffee b/src/api/stream.coffee similarity index 73% rename from src/stream.coffee rename to src/api/stream.coffee index 5f8f795..43672c5 100644 --- a/src/stream.coffee +++ b/src/api/stream.coffee @@ -32,13 +32,23 @@ moment = require 'moment' # project modules -{delay,contains,simpleFactory} = require './misc/toolbox' -Record = require './record' -Cursor = require './cursor' +{delay,contains,simpleFactory} = require '../misc/toolbox' +Record = require '../record' +Cursor = require '../cursor' # STREAMING API class exports.Recorder extends Stream - constructor: (url="") -> + constructor: (url="",options={}) -> + @config = + autosave: 500 + for k,v of options + @config[k] = v + + @synced = no + @closed = no + + @writable = yes + #log "StreamRecorder#constructor(#{url})" @record = simpleFactory Record, url @@ -48,32 +58,49 @@ class exports.Recorder extends Stream @emit 'error', err return - @record.on 'flushed', (version) => + @record.on 'synced', (version) => #log "StreamRecorder: disk flushed, emitting'drain'" - @emit 'drain' + @synced = (version > 0) # we are synced if + #@emit 'drain' # not yet return - @writable = yes + + if @config.autosave >= 0 + if @record.ready() + delay @config.autosave, => @autosave() + else + @record.on 'ready', => + @autosave() + + + autosave: => + #log "AUTOSAVE" + if @synced + # if we are closed AND synced -> stop loop + if @closed + @config.autosave = -1 + return + else + @record.sync() + if @config.autosave >= 0 + delay @config.autosave, => + @autosave() end: (data) => #log "StreamRecorder#end(#{inspect data})" + @closed = yes @emit 'close' # optional #@record.close() # SimpleRecorder API write: (data) => - #log "StreamRecorder#write(#{inspect data})" - @record.write moment(), data - yes + @synced = @record.write moment(), data - # SimpleRecorder API - writeAt: (timestamp, data) => - #log "StreamRecorder#writeAt(#{timestamp},#{inspect data})" - @record.write timestamp, data + # even iif we are not synced to disk, we let the input stream fill our buffer yes class exports.Player extends Stream - constructor: (url, options) -> + constructor: (url, options={}) -> #log "StreamPlayer#constructor(#{url})" @config = speed: 1.0 @@ -106,8 +133,12 @@ class exports.Player extends Stream #log "CURSOR SENT 'error': #{err}" @emit 'error', err - @resume() if @config.autoplay - + if @config.autoplay + if @record.ready() + delay 0, => @resume() + else + @record.on 'ready', => + @resume() resume: => #log "StreamPlayer#resume(): checking.." diff --git a/src/cursor.coffee b/src/cursor.coffee index ed05469..fa79a13 100644 --- a/src/cursor.coffee +++ b/src/cursor.coffee @@ -72,13 +72,15 @@ class module.exports extends events.EventEmitter # du we already have a running cursor or not? @next = @store.first unless @next + #log "EMIT BEGIN" @emit 'begin' @fire() checkBuffer: => if @buffer.length < @bufferMax # we need to fill it - log "we have some room to bufferize" + #log "we have some room to bufferize" + 1 fire: => @@ -89,7 +91,7 @@ class module.exports extends events.EventEmitter return if @paused - log "cannot fire: paused" + #log "cannot fire: paused" return evt = @next @@ -99,7 +101,7 @@ class module.exports extends events.EventEmitter @store.next evt, (next) => unless next - log "error, no more next in the DB.." + #log "error, no more next in the DB.." @emit 'error', "store.next gave us nothing" return diff --git a/src/record.coffee b/src/record.coffee index c78b861..ac92074 100644 --- a/src/record.coffee +++ b/src/record.coffee @@ -37,10 +37,8 @@ stores = require './stores' class module.exports extends events.EventEmitter - constructor: (url="", options) -> - @config = - autosave: 1000 - + constructor: (url="", options={}) -> + @config = {} for k,v of options @config[k]=v @@ -56,9 +54,12 @@ class module.exports extends events.EventEmitter # @store.on 'error', (version, err) => - error "Record: @store sent us an error: #{err}" + error "Record: #{err}" @emit 'error', {version: version, err: err} + @store.on 'ready', => + @emit 'ready' + # called whenever the store flushed a snapshot to disk # flushed version is passed in argument @store.on 'flushed', (version) => @@ -74,3 +75,8 @@ class module.exports extends events.EventEmitter write: (timestamp, data) => @store.write timestamp, data + ready: => + @store.initialized + sync: => + #log "RECORD: SYNC" + @store.sync() diff --git a/src/sampler.coffee b/src/sampler.coffee index 5da6f94..6c3abcd 100644 --- a/src/sampler.coffee +++ b/src/sampler.coffee @@ -1,5 +1,5 @@ -stream = require './stream' -simple = require './simple' +stream = require './api/stream' +simple = require './api/simple' Record = require './record' exports.Record = Record diff --git a/src/store/file.coffee b/src/store/file.coffee index 952181f..203b005 100644 --- a/src/store/file.coffee +++ b/src/store/file.coffee @@ -86,19 +86,26 @@ the file)." (from http://book.mixu.net/ch9.html) # TODO emit errors when there are.. errors class module.exports extends Memory - constructor: (@path, options) -> - super() + constructor: (@path, options={}) -> + + # copied from memory @config = - autosave: 500 filename: -> for k,v of options @config[k]=v + @events = [] + @first = no + @last = no + @_length = 0 + + # file version start here + @buff = [] - @buffMax = 1 # for the moment we auto-save every single event + @buffMax = 1 @isWriting = no - + @initialized = no @flushing = version: 1 saved: 0 @@ -107,44 +114,103 @@ class module.exports extends Memory @format = getFormat @path switch @format when "YAML" - log "using YAML" + #log "using YAML" @saveSnapshot = YAML.writeFile + @loadSnapshot = YAML.readFile when "JSON" - log "using JSON" + #log "using JSON" @saveSnapshot = (path, data, cb) => dumpString = JSON.stringify data - fs.writeFile path, dumpString, (err) -> cb err + fs.writeFile path, dumpString, (err) => + cb err + @loadSnapshot = (path, cb) => + fs.readFile path, (err, data) => + obj = {} + unless err + try + obj = JSON.parse data + catch exc + err = "could not load json: #{exc}" + + cb err, obj + when "SAMPLER" @saveSnapshot = (path, data, cb) => #log "File: saving snapshot using Snappy" - dumpString = JSON.stringify data - compressed = snappy.compressSync dumpString - fs.writeFile path, compressed, (err) -> cb err + compressed = snappy.compressSync data + fs.writeFile path, compressed, (err) => + cb err + + @loadSnapshot = (path, cb) => + fs.readFile path, (err, raw) => + obj = {} + unless err + data = snappy.decompressSync raw, snappy.parsers.string + #log "data: #{data}" + if data + try + #log "GOING TO PARSE #{data}" + obj = JSON.parse data + catch exc + err = "invalid json file: #{exc}" + #log "obj: #{obj}" + cb err, obj else log "unknow format: #{@format}" throw "unknow format: #{@format}" return - - delay 0, => @autosave() + delay 1, => @load() - # async load - the stream will resume once the file is loaded - _load: (path, cb=->) -> - # use YAML.stream.parse - - autosave: => - @save() - delay @config.autosave, => - @autosave() - save: => + # async load - the stream will resume once the file is loaded + load: -> + #log "LOADING FILE.." + @loadSnapshot @path, (err, data) => + + if err + + #msg = "could not load '#{@path}': #{err}" + #error msg + #throw msg + #log "file empty?" + 1 + + + if data? + #log "got data: #{data}" + if data.events? + #log "got events" + if data.events.length > 0 + #log "Loading events.." + #log "loading #{data.events}" + + # TODO: WARNING: we might already have some events in the memory + # for the moment we simply ignore previous entries + @events = [] + + for event in data.events + @write moment(event[0]), event[1] + #log "inspection: #{inspect @events}" + @ready() + + sync: => + #log "FILE: SYNC: POSSIBLE?" + unless @initialized + #log "cannot sync: file is not initialized. aborting" + @emit 'synced', -1 + # TODO: we should save later.. + return + if @isWriting + #log "cannot sync: file is already been synced. aborting" + @emit 'synced', -1 #log "CANNOT SAVE NOW - PLEASE TRY LATER" return - #log "SAVING" + #log "SYNC POSSIBLE" @isWriting = yes # serialize references @@ -154,6 +220,7 @@ class module.exports extends Memory #last: 0+@last.timestamp events: [] @buff = [] + for event in @events snapshot.events.push [ 0+event.timestamp @@ -163,22 +230,24 @@ class module.exports extends Memory event.data ] - version = 0+@flushing.version - @flushing.version++ #log "WRITING TO DISK VERSION #{version}: (#{@path}, #{snapshot})" - + version = @count @saveSnapshot @path, snapshot, (err) => @flushing.saved = version @isWriting = no if err #error "store.File: ERROR, COULD NOT WRITE TO FILE: #{err}" + @emit 'synced', -1 @emit 'error', err else - @emit 'flushed', version - no + #log "file synced to #{version}" + @emit 'synced', version + _writeEvent: (event) => + #log "WRITING EVENT" @events.push event @buff.push event - yes # tell the input stream not to wait for us + @count() + no diff --git a/src/store/file.js b/src/store/file.js deleted file mode 100644 index 419f512..0000000 --- a/src/store/file.js +++ /dev/null @@ -1,204 +0,0 @@ -// Generated by CoffeeScript 1.3.3 -(function() { - var BinaryTree, Memory, Stream, YAML, contains, delay, error, events, fs, getFormat, inspect, log, moment, snappy, _ref, _ref1, - __bind = function(fn, me){ return function(){ return fn.apply(me, arguments); }; }, - __hasProp = {}.hasOwnProperty, - __extends = function(child, parent) { for (var key in parent) { if (__hasProp.call(parent, key)) child[key] = parent[key]; } function ctor() { this.constructor = child; } ctor.prototype = parent.prototype; child.prototype = new ctor(); child.__super__ = parent.prototype; return child; }; - - _ref = require('util'), log = _ref.log, error = _ref.error, inspect = _ref.inspect; - - Stream = require('stream').Stream; - - events = require('events'); - - fs = require('fs'); - - moment = require('moment'); - - YAML = require('libyaml'); - - snappy = require('snappy'); - - _ref1 = require('../misc/toolbox'), delay = _ref1.delay, contains = _ref1.contains; - - BinaryTree = require('../misc/btree'); - - Memory = require('./memory'); - - getFormat = function(path) { - var _base; - if (!path) { - return "UNKNOW"; - } - switch (typeof path.split === "function" ? typeof (_base = path.split(".").slice(-1)[0]).toLowerCase === "function" ? _base.toLowerCase() : void 0 : void 0) { - case "js": - case "json": - return "JSON"; - case "yml": - case "yaml": - return "YAML"; - case "smp": - case "sample": - return "SAMPLER"; - default: - return "UNKNOW"; - } - }; - - /* TODO use async, streaming file reading: - - writeStream = fs.createWriteStream(__dirname + "/outFile.txt"); - // every time "data" is read, this event fires - readStream.on('data', function(textData) { - console.log("Found some text!"); - writeStream.write(textData); - }); - - // the reading is finished... - readStream.on('close', function () { - writeStream.end(); // ...close up the write, too! - console.log("I finished."); - }); - - - "Partially buffered access methods are different. - They do not treat data input as a discrete event, - but rather as a series of events which occur as - the data is being read or written. They allow us - to access data as it is being read from - disk/network/other I/O. - Partially buffered methods, such as readSync() - and read() allow us to specify the size of the - buffer, and read data in small chunks. They - allow for more control (e.g. reading a file in - non-linear order by skipping back and forth in - the file)." (from http://book.mixu.net/ch9.html) - */ - - - module.exports = (function(_super) { - - __extends(exports, _super); - - function exports(path, options) { - var k, v, - _this = this; - this.path = path; - this._writeEvent = __bind(this._writeEvent, this); - - this.save = __bind(this.save, this); - - this.autosave = __bind(this.autosave, this); - - exports.__super__.constructor.call(this); - this.config = { - autosave: 500, - filename: function() {} - }; - for (k in options) { - v = options[k]; - this.config[k] = v; - } - this.buff = []; - this.buffMax = 1; - this.isWriting = false; - this.flushing = { - version: 1, - saved: 0 - }; - this.format = getFormat(this.path); - switch (this.format) { - case "YAML": - log("using YAML"); - this.saveSnapshot = YAML.writeFile; - break; - case "JSON": - log("using JSON"); - this.saveSnapshot = function(path, data, cb) { - var dumpString; - dumpString = JSON.stringify(data); - return fs.writeFile(path, dumpString, function(err) { - return cb(err); - }); - }; - break; - case "SAMPLER": - this.saveSnapshot = function(path, data, cb) { - var compressed, dumpString; - dumpString = JSON.stringify(data); - compressed = snappy.compressSync(dumpString); - return fs.writeFile(path, compressed, function(err) { - return cb(err); - }); - }; - break; - default: - log("unknow format: " + this.format); - throw "unknow format: " + this.format; - return; - } - this.autosave(); - } - - exports.prototype._load = function(path, cb) { - if (cb == null) { - cb = function() {}; - } - }; - - exports.prototype.autosave = function() { - var _this = this; - log("AUTOSAVING.."); - delay(0, function() { - return _this.save(); - }); - return delay(this.config.autosave, function() { - return _this.autosave(); - }); - }; - - exports.prototype.save = function() { - var event, snapshot, version, _i, _len, _ref2, - _this = this; - if (this.isWriting) { - return; - } - this.isWriting = true; - snapshot = { - events: [] - }; - this.buff = []; - _ref2 = this.events; - for (_i = 0, _len = _ref2.length; _i < _len; _i++) { - event = _ref2[_i]; - snapshot.events.push([0 + event.timestamp, event.data]); - log("AUTOSAVED"); - } - version = 0 + this.flushing.version; - this.flushing.version++; - this.saveSnapshot(this.path, snapshot, function(err) { - _this.flushing.saved = version; - _this.isWriting = false; - if (err) { - return _this.emit('error', err); - } else { - return _this.emit('flushed', version); - } - }); - return false; - }; - - exports.prototype._writeEvent = function(event) { - var _this = this; - delay(0, function() { - _this.events.push(event); - return _this.buff.push(event); - }); - return true; - }; - - return exports; - - })(Memory); - -}).call(this); diff --git a/src/store/memory.coffee b/src/store/memory.coffee index 658fa23..bb8a7d2 100644 --- a/src/store/memory.coffee +++ b/src/store/memory.coffee @@ -39,14 +39,34 @@ BinaryTree = require '../misc/btree' # TODO emit errors when there are.. errors class module.exports extends events.EventEmitter - constructor: (@config) -> + constructor: (options={}) -> + + @config = {} + for k,v of options + @config[k] = v + + @initialized = no + @events = [] @first = no @last = no @_length = 0 + @flushing = version: 1 + @events = [] + + #log "MEMORY SCHEDULING READY" + delay 0, => + #log "READY!" + @ready() + + ready: => + #log "CALLED READY" + @initialized = yes + @emit 'ready' + write: (timestamp, data) => event = @@ -72,10 +92,7 @@ class module.exports extends events.EventEmitter _writeEvent: (event) => #log "store.Memory: _writeEvent: writing!" @events.push event - - # the memory store does NOT need to send 'flushed' events - #@emit 'flushed', @flushing.version++ - + @count() yes # Get the previous Event @@ -88,4 +105,13 @@ class module.exports extends events.EventEmitter # Compute the duration length: () => - @_length \ No newline at end of file + @_length + + count: () => + version = 0+@flushing.version + @flushing.version++ + version + + sync: => + #log "MEMORY: SYNC" + @emit 'flushed', @count() \ No newline at end of file diff --git a/test/main.coffee b/test/main.coffee index 28c4320..781c50e 100644 --- a/test/main.coffee +++ b/test/main.coffee @@ -2,6 +2,9 @@ # Standard Node Library {log,error,inspect} = require 'util' {Stream} = require 'stream' +fs = require 'fs' + +# thirs party libs moment = require 'moment' # helper functions from our app @@ -68,45 +71,61 @@ class Newsfeed extends Stream TIMEOUT = 50 # Not good, we have a latency of 50~60ms + # our tests -describe 'new Record(\'test.sample\')', -> +describe 'new Record(\'test/tmp.json\')', -> # our tests - describe 'and Simple API', -> - - record = new Record 'file://examples/test.sample' - length = 0 - it 'record some events in about 100ms', (done) -> - #@timeout 10000 - recorder = new SimpleRecorder record - feed = new Newsfeed() - - t = moment() - feed.runFirst (event) -> - e = moment() - t - - #log "runFirst (event): elapsed: #{e}" - - if event - #log "event" - recorder.write event - else - length = record.length() - #log "ENDED RECORD. length: #{length}" - done() - - it 'playback at normal speed', (done) -> - t = moment() - @timeout 3000 - new SimplePlayer record, - onBegin: => - #log "stream started. timeout set to 30 + #{TIMEOUT + (length / 1.0)}" - @timeout (30 + TIMEOUT + (length / 1.0)) - onEnd: => + describe 'using Simple API', -> + + fs.unlink 'test/tmp.json', (err) -> + unless err + #log "removed previous test file" + 0 + record = new Record 'file://test/tmp.json' + length = 0 + + it 'should record some events in about 100ms', (done) -> + #@timeout 10000 + recorder = new SimpleRecorder record + feed = new Newsfeed() + + t = moment() + feed.runFirst (event) -> e = moment() - t - #log "play expected: 30 + #{TIMEOUT + (length / 1.0)}; elapsed: #{e}" - done() - + #log "runFirst (event): elapsed: #{e}" + + if event + #log "event" + recorder.write event + else + length = record.length() + #log "ENDED RECORD. length: #{length}" + recorder.close() + done() + + it 'should playback at normal speed', (done) -> + t = moment() + new SimplePlayer record, + onBegin: => + #log "stream started. timeout set to 30 + #{TIMEOUT + (length / 1.0)}" + @timeout (30 + TIMEOUT + (length / 1.0)) + onEnd: => + e = moment() - t + #log "play expected: 30 + #{TIMEOUT + (length / 1.0)}; elapsed: #{e}" + done() + + it 'should load an existing demo file', (done) -> + t = moment() + record = new Record "file://test/test.json" + new SimplePlayer record, + onBegin: => + #log "stream started. timeout set to 30 + #{TIMEOUT + (length / 1.0)}" + @timeout (30 + TIMEOUT + (record.length() / 1.0)) + onEnd: => + e = moment() - t + #log "play expected: 30 + #{TIMEOUT + (length / 1.0)}; elapsed: #{e}" + done() # our tests describe 'new Record()', -> @@ -123,37 +142,40 @@ describe 'new Record()', -> recorder.write event else length = record.length() + recorder.close() done() it 'playback at normal speed', (done) -> - @timeout 70 + (length / 1.0) - new SimplePlayer record, onEnd: -> done() + new SimplePlayer record, + onBegin: => @timeout 70 + (length / 1.0) + onEnd: -> done() it 'playback at 2.0x speed', (done) -> - @timeout 40 + (length / 2.0) + new SimplePlayer record, speed: 2.0 - onEnd: -> done() + onBegin: => @timeout 40 + (length / 2.0) + onEnd: -> done() # looks like increasing speed reduce latency... WTF? # maybe this is bacause of the latency reduction it 'playback at 10.0x speed', (done) -> - @timeout 20 + (length / 10.0) new SimplePlayer record, speed: 10.0 + onBegin: => @timeout 20 + (length / 10.0) onEnd: -> done() it 'playback at 0.345x speed', (done) -> - @timeout 170 + (length / 0.345) #log "timeout set to 160 + #{(length / 0.345)}" t = moment() new SimplePlayer record, speed: 0.345 + onBegin: => + @timeout 170 + (length / 0.345) onEnd: -> #log "ELAPSED: #{moment() - t}" done() - # our tests describe 'Stream API', -> @@ -196,3 +218,4 @@ describe 'Stream API', -> @timeout 20 + (length / 10.0) player = new StreamPlayer record, speed: 10.0 player.on 'end', -> done() + diff --git a/test/test.json b/test/test.json new file mode 100644 index 0000000..0e86ae2 --- /dev/null +++ b/test/test.json @@ -0,0 +1 @@ +{"events":[[1340012175491,{"companyhelpdesk":"hi how can I help you"}],[1340012175542,{"facebook":"wow! this was a big earthquake"}],[1340012175595,{"ticker":-3234}],[1340012175646,{"weatherstation":{"temp":"76","unit":"F"}}],[1340012175697,{"counter":42}],[1340012175748,{"irc":{"channel":"#FOO","msg":" ho hai"}}],[1340012175799,{"twitter":"just saw my dead neighbor walking in my street. It's weird. wait I'm gonna check it out"}],[1340012175850,{"twitter":"ZOMBIE APOCALYPSE!!1!!"}]]} \ No newline at end of file