Skip to content

Commit

Permalink
Dual writing
Browse files Browse the repository at this point in the history
  • Loading branch information
sergionoviello committed Jun 5, 2024
1 parent f9e8dce commit 0973b39
Show file tree
Hide file tree
Showing 5 changed files with 2,247 additions and 926 deletions.
7 changes: 7 additions & 0 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ Connection.prototype.teardown = function (callback) {
this.subscriber = null
}

if (this.replicaClient) {
if (this.replicaClient.connected) {
this.replicaClient.quit(tracker('quit replica client :' + this.name))
}
this.replicaClient = null
}

tracker('client && subscriber checked')()
}
}
Expand Down
2 changes: 2 additions & 0 deletions lib/connection_helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ ConnectionHelper.parse = function (configuration) {
if (configuration.use_connection) {
name = configuration.use_connection
config = configuration.connection_settings && configuration.connection_settings[name]

if (!config) {
throw new Error('No connection_settings provided: ' + configuration + ' use_connection: ' + name)
}
Expand All @@ -35,6 +36,7 @@ ConnectionHelper.connection = function (configuration) {
connection = new Connection(parsed.name, parsed.config)
connections[parsed.name] = connection
}

return connection
}

Expand Down
73 changes: 73 additions & 0 deletions lib/persistence.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Persistence.connect = function (done) {
if (!connecting) {
connecting = true
connection = ConnectionHelper.connection(configuration)

connection.establish(function () {
connected = true
connecting = false
Expand All @@ -35,9 +36,20 @@ Persistence.redis = function (value) {
if (!connection.client || connection.subscriber.status !== 'ready') {
logging.error('Client: Not connected to redis')
}

return connection.client
}

Persistence.redisReplica = function (value) {
if (value) {
connection.replicaClient = value
}
if (!connection.replicaClient) {
logging.error('Replica Client: Not connected to redis')
}
return connection.replicaClient
}

Persistence.pubsub = function (value) {
if (value) {
connection.subscriber = value
Expand Down Expand Up @@ -65,13 +77,23 @@ Persistence.applyPolicy = function (multi, key, policy) {

Persistence.readOrderedWithScores = function (key, policy, callback) {
const multi = Persistence.redis().multi()
let multiReplica = null

if (process.env.RADAR_MIGRATION_ENABLED === 'true') {
multiReplica = Persistence.redisReplica().multi()
}

let replyCount = 0
switch (arguments.length) {
case 3:
if (policy) {
if (policy.maxCount) replyCount++
if (policy.maxAgeSeconds) replyCount++
Persistence.applyPolicy(multi, key, policy)

if (process.env.RADAR_MIGRATION_ENABLED === 'true') {
Persistence.applyPolicy(multiReplica, key, policy)
}
}
break
case 2:
Expand All @@ -85,6 +107,15 @@ Persistence.readOrderedWithScores = function (key, policy, callback) {
if (err) throw new Error(err)
callback(replies[replyCount][1])
})

if (process.env.RADAR_MIGRATION_ENABLED === 'true') {
multiReplica.zrange(key, -100, -1, 'WITHSCORES')

multiReplica.exec(function (err, replies) {
if (err) throw new Error(err)
callback(replies[replyCount][1])
})
}
}

Persistence.persistOrdered = function (key, value, callback) {
Expand All @@ -96,6 +127,12 @@ Persistence.persistOrdered = function (key, value, callback) {
}, (err) => {
if (callback) callback(err)
})

if (process.env.RADAR_MIGRATION_ENABLED === 'true') {
Persistence
.redisReplica()
.zadd(key, Date.now(), JSON.stringify(value))
}
}

Persistence.delWildCard = function (expr, callback) {
Expand All @@ -119,6 +156,10 @@ Persistence.delWildCard = function (expr, callback) {
Persistence.del = function (key, callback) {
logging.info('deleting', key)
Persistence.redis().del(key, callback)

if (process.env.RADAR_MIGRATION_ENABLED === 'true') {
Persistence.redisReplica().del(key, callback)
}
}

// Return the value associated with key that is stored in the hash *hash*
Expand Down Expand Up @@ -181,25 +222,53 @@ Persistence.persistKey = function (key, value, expireTTL) {
}

multi.exec()

if (process.env.RADAR_MIGRATION_ENABLED === 'true') {
const multiReplica = Persistence.redisReplica().multi()
logging.debug('persistKey replica:', key, value)
multiReplica.set(key, JSON.stringify(value))
if (expireTTL) {
multiReplica.expire(key, expireTTL)
}

multiReplica.exec()
}
}

Persistence.persistHash = function (hash, key, value) {
logging.debug('persistHash:', hash, key, value)
Persistence.redis().hset(hash, key, JSON.stringify(value), Persistence.handler)

if (process.env.RADAR_MIGRATION_ENABLED === 'true') {
logging.debug('persistHash replica:', hash, key, value)
Persistence.redisReplica().hset(hash, key, JSON.stringify(value), Persistence.handler)
}
}

Persistence.expire = function (key, seconds) {
logging.debug('expire', key, seconds)
Persistence.redis().expire(key, seconds, Persistence.handler)

if (process.env.RADAR_MIGRATION_ENABLED === 'true') {
Persistence.redisReplica().expire(key, seconds, Persistence.handler)
}
}

Persistence.ttl = function (key, callback) {
Persistence.redis().ttl(key, callback)

if (process.env.RADAR_MIGRATION_ENABLED === 'true') {
Persistence.redisReplica().ttl(key, callback)
}
}

Persistence.deleteHash = function (hash, key) {
logging.debug('deleteHash:', hash, key)
Persistence.redis().hdel(hash, key, Persistence.handler)

if (process.env.RADAR_MIGRATION_ENABLED === 'true') {
Persistence.redisReplica().hdel(hash, key, Persistence.handler)
}
}

Persistence.publish = function (key, value, callback) {
Expand Down Expand Up @@ -233,6 +302,10 @@ Persistence.handler = function (err) {

Persistence.incrby = function (key, incr) {
Persistence.redis().incrby(key, incr, Persistence.handler)

if (process.env.RADAR_MIGRATION_ENABLED === 'true') {
Persistence.redisReplica().incrby(key, incr, Persistence.handler)
}
}

Persistence.select = function (index) {
Expand Down
Loading

0 comments on commit 0973b39

Please sign in to comment.