Skip to content
This repository has been archived by the owner on Sep 30, 2023. It is now read-only.

Commit

Permalink
Merge pull request #87 from vaultec81/master
Browse files Browse the repository at this point in the history
Add oplog events support
  • Loading branch information
aphelionz authored May 11, 2020
2 parents 6f0214b + f2e5bd9 commit 5686a3d
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 8 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ console.log(db.replicationStatus)
```javascript
db.events.on('write', (id, hash, entry) => ... )
```
- `log.op.${operation}` - (entry)

Emitted after an entry was added to the database regardless of whether the entry is added remotely, or locally. `${operation}` is replaced with a specified oplog operation. `none` is specified to listen for a oplog entry without an operation specified. The supported operations are diagrammed in the entry payload.
```javascript
db.events.on('log.op.ADD', (id, hash, payload) => ... )
```

#### Private methods

Expand Down
28 changes: 20 additions & 8 deletions src/Store.js
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ class Store {
} catch (e) {
console.error('Store Error:', e)
}
this.events.on("replicated.progress", (address, hash, entry, progress, have) => {
this._procEntry(entry);
});
this.events.on("write", (address, entry, heads) => {
this._procEntry(entry);
});
}

get all () {
Expand Down Expand Up @@ -188,14 +194,9 @@ class Store {
}

// Remove all event listeners
this.events.removeAllListeners('load')
this.events.removeAllListeners('load.progress')
this.events.removeAllListeners('replicate')
this.events.removeAllListeners('replicate.progress')
this.events.removeAllListeners('replicated')
this.events.removeAllListeners('ready')
this.events.removeAllListeners('write')
this.events.removeAllListeners('peer')
for(var event in this.events._events) {
this.events.removeAllListeners(event);
}

// Database is now closed
// TODO: afaik we don't use 'closed' event anymore,
Expand Down Expand Up @@ -528,6 +529,17 @@ class Store {
_addOperationBatch (data, batchOperation, lastOperation, onProgressCallback) {
throw new Error('Not implemented!')
}

_procEntry(entry) {
var { payload, hash } = entry;
var { op } = payload;
if(op) {
this.events.emit(`log.op.${op}`, this.address.toString(), hash, payload);
} else {
this.events.emit(`log.op.none`, this.address.toString(), hash, payload);
}
this.events.emit('log.op', op, this.address.toString(), hash, payload)
}

_onLoadProgress (hash, entry, progress, total) {
this._recalculateReplicationStatus(progress, total)
Expand Down
76 changes: 76 additions & 0 deletions test/events.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
'use strict'

var assert = require('assert')
const Store = require('../src/Store')

const Cache = require('orbit-db-cache')
const Keystore = require('orbit-db-keystore')
const IdentityProvider = require('orbit-db-identity-provider')
const DefaultOptions = Store.DefaultOptions

// Test utils
const {
config,
testAPIs,
startIpfs,
stopIpfs,
implementations
} = require('orbit-db-test-utils')

const properLevelModule = implementations.filter(i => i.key.indexOf('memdown') > -1).map(i => i.module)[0]
const storage = require('orbit-db-storage-adapter')(properLevelModule)

Object.keys(testAPIs).forEach((IPFS) => {
describe(`Events ${IPFS}`, function () {
let ipfs, testIdentity, identityStore, store, cacheStore

this.timeout(config.timeout)

const ipfsConfig = Object.assign({}, config.defaultIpfsConfig, {
repo: config.defaultIpfsConfig.repo + '-entry' + new Date().getTime()
})
after(async () => {
await store.close()
await stopIpfs(ipfs)
await identityStore.close()
await cacheStore.close()
})

afterEach(async () => {
await store.drop()
await cacheStore.open()
await identityStore.open()
})
before(async () => {
identityStore = await storage.createStore('identity')
const keystore = new Keystore(identityStore)

cacheStore = await storage.createStore('cache')
const cache = new Cache(cacheStore)

testIdentity = await IdentityProvider.createIdentity({ id: 'userA', keystore })
ipfs = await startIpfs(IPFS, ipfsConfig)

const address = 'test-address'
const options = Object.assign({}, DefaultOptions, { cache })
store = new Store(ipfs, testIdentity, address, options)
})
it('Specific log.op event', (done) => {
var data = {
op: "SET",
key: "transaction",
value: "data"
}
store.events.on("log.op.SET", (id, address, payload) => {
var {op, key, value} = payload;
assert.strictEqual(op, data.op);
assert.strictEqual(key, data.key);
assert.strictEqual(value, data.value);
assert.strictEqual(id, 'test-address');
done();
})
store._addOperation(data)

})
})
})

0 comments on commit 5686a3d

Please sign in to comment.