Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/robust-healt-check #5

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
221 changes: 121 additions & 100 deletions app.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,158 +3,171 @@ import request from 'request';
import services from '/config/rules.js';
import bodyParser from 'body-parser';
import dns from 'dns';
import metrics from './lib/metrics';

// Also parse application/json as json
app.use( bodyParser.json( {
app.use(bodyParser.json({
type: function(req) {
return /^application\/json/.test( req.get('content-type') );
return /^application\/json/.test(req.get('content-type'));
},
limit: '500mb'
} ) );
}));

// Log server config if requested
if( process.env["LOG_SERVER_CONFIGURATION"] )
console.log(JSON.stringify( services ));
if (process.env['LOG_SERVER_CONFIGURATION'])
console.log(JSON.stringify(services));

app.get( '/', function( req, res ) {
app.get('/', function(req, res) {
res.status(200);
res.send("Hello, delta notification is running");
} );
res.send('Hello, delta notification is running');
});

app.post( '/', function( req, res ) {
if( process.env["LOG_REQUESTS"] ) {
console.log("Logging request body");
app.get('/metric', function(req, res) {
res.status(200).send(metrics.getReport());
});

app.post('/', function(req, res) {
if (process.env['LOG_REQUESTS']) {
console.log('Logging request body');
console.log(req.body);
}

const changeSets = req.body.changeSets;

const originalMuCallIdTrail = JSON.parse( req.get('mu-call-id-trail') || "[]" );
const originalMuCallIdTrail = JSON.parse(req.get('mu-call-id-trail') || '[]');
const originalMuCallId = req.get('mu-call-id');
const muCallIdTrail = JSON.stringify( [...originalMuCallIdTrail, originalMuCallId] );
const muCallIdTrail = JSON.stringify([...originalMuCallIdTrail, originalMuCallId]);

changeSets.forEach( (change) => {
changeSets.forEach((change) => {
change.insert = change.insert || [];
change.delete = change.delete || [];
} );
});

// inform watchers
informWatchers( changeSets, res, muCallIdTrail );
informWatchers(changeSets, res, muCallIdTrail);

// push relevant data to interested actors
res.status(204).send();
} );
});

async function informWatchers( changeSets, res, muCallIdTrail ){
services.map( async (entry) => {
// for each entity
if( process.env["DEBUG_DELTA_MATCH"] )
console.log(`Checking if we want to send to ${entry.callback.url}`);
async function informWatchers(changeSets, res, muCallIdTrail) {

const matchSpec = entry.match;
services.map(async (entry) => {
try {
// for each entity
if (process.env['DEBUG_DELTA_MATCH'])
console.log(`Checking if we want to send to ${entry.callback.url}`);

const originFilteredChangeSets = await filterMatchesForOrigin( changeSets, entry );
if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf )
console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`);
const matchSpec = entry.match;

let allInserts = [];
let allDeletes = [];
const originFilteredChangeSets = await filterMatchesForOrigin(changeSets, entry);
if (process.env['DEBUG_TRIPLE_MATCHES_SPEC'] && entry.options.ignoreFromSelf)
console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry(entry)}`);

originFilteredChangeSets.forEach( (change) => {
allInserts = [...allInserts, ...change.insert];
allDeletes = [...allDeletes, ...change.delete];
} );
let allInserts = [];
let allDeletes = [];

const changedTriples = [...allInserts, ...allDeletes];
originFilteredChangeSets.forEach((change) => {
allInserts = [...allInserts, ...change.insert];
allDeletes = [...allDeletes, ...change.delete];
});

const someTripleMatchedSpec =
changedTriples
.some( (triple) => tripleMatchesSpec( triple, matchSpec ) );
const changedTriples = [...allInserts, ...allDeletes];

if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] )
console.log(`Triple matches spec? ${someTripleMatchedSpec}`);
const someTripleMatchedSpec =
changedTriples.some((triple) => tripleMatchesSpec(triple, matchSpec));

if( someTripleMatchedSpec ) {
// inform matching entities
if( process.env["DEBUG_DELTA_SEND"] )
console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`);
if (process.env['DEBUG_TRIPLE_MATCHES_SPEC'])
console.log(`Triple matches spec? ${someTripleMatchedSpec}`);

if( entry.options && entry.options.gracePeriod ) {
setTimeout(
() => sendRequest( entry, originFilteredChangeSets, muCallIdTrail ),
entry.options.gracePeriod );
} else {
sendRequest( entry, originFilteredChangeSets, muCallIdTrail );
if (someTripleMatchedSpec) {
// inform matching entities
if (process.env['DEBUG_DELTA_SEND'])
console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`);

if (entry.options && entry.options.gracePeriod) {
setTimeout(
() => sendRequest(entry, originFilteredChangeSets, muCallIdTrail),
entry.options.gracePeriod);
} else {
sendRequest(entry, originFilteredChangeSets, muCallIdTrail);
}
}
} catch (error) {
metrics.addRequest({
url: entry.callback.url,
method: entry.callback.method,
error
});
}
} );
});
}

function tripleMatchesSpec( triple, matchSpec ) {
function tripleMatchesSpec(triple, matchSpec) {
// form of triple is {s, p, o}, same as matchSpec
if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] )
if (process.env['DEBUG_TRIPLE_MATCHES_SPEC'])
console.log(`Does ${JSON.stringify(triple)} match ${JSON.stringify(matchSpec)}?`);

for( let key in matchSpec ){
for (let key in matchSpec) {
// key is one of s, p, o
const subMatchSpec = matchSpec[key];
const subMatchValue = triple[key];

if( subMatchSpec && !subMatchValue )
if (subMatchSpec && !subMatchValue)
return false;

for( let subKey in subMatchSpec )
// we're now matching something like {type: "url", value: "http..."}
if( subMatchSpec[subKey] !== subMatchValue[subKey] )
for (let subKey in subMatchSpec)
// we're now matching something like {type: "url", value: "http..."}
if (subMatchSpec[subKey] !== subMatchValue[subKey])
return false;
}
return true; // no false matches found, let's send a response
}


function formatChangesetBody( changeSets, options ) {
if( options.resourceFormat == "v0.0.1" ) {
function formatChangesetBody(changeSets, options) {
if (options.resourceFormat == 'v0.0.1') {
return JSON.stringify(
changeSets.map( (change) => {
return {
inserts: change.insert,
deletes: change.delete
};
} ) );
changeSets.map((change) => {
return {
inserts: change.insert,
deletes: change.delete
};
}));
}
if( options.resourceFormat == "v0.0.0-genesis" ) {
if (options.resourceFormat == 'v0.0.0-genesis') {
// [{delta: {inserts, deletes}]
const newOptions = Object.assign({}, options, { resourceFormat: "v0.0.1" });
const newFormat = JSON.parse( formatChangesetBody( changeSets, newOptions ) );
const newOptions = Object.assign({}, options, {resourceFormat: 'v0.0.1'});
const newFormat = JSON.parse(formatChangesetBody(changeSets, newOptions));
return JSON.stringify({
// graph: Not available
delta: {
inserts: newFormat
.flatMap( ({inserts}) => inserts)
.map( ({subject,predicate,object}) =>
( { s: subject.value, p: predicate.value, o: object.value } ) ),
deletes: newFormat
.flatMap( ({deletes}) => deletes)
.map( ({subject,predicate,object}) =>
( { s: subject.value, p: predicate.value, o: object.value } ) )
inserts: newFormat.flatMap(({inserts}) => inserts).map(({subject, predicate, object}) =>
({s: subject.value, p: predicate.value, o: object.value})),
deletes: newFormat.flatMap(({deletes}) => deletes).map(({subject, predicate, object}) =>
({s: subject.value, p: predicate.value, o: object.value}))
}
});
} else {
throw `Unknown resource format ${options.resourceFormat}`;
}
}

async function sendRequest( entry, changeSets, muCallIdTrail ) {
async function sendRequest(entry, changeSets, muCallIdTrail) {
let requestObject; // will contain request information

// construct the requestObject
const method = entry.callback.method;
const url = entry.callback.url;
const headers = { "Content-Type": "application/json", "MU-AUTH-ALLOWED-GROUPS": changeSets[0].allowedGroups, "mu-call-id-trail": muCallIdTrail, "mu-call-id": uuid() };

if( entry.options && entry.options.resourceFormat ) {
const headers = {
'Content-Type': 'application/json',
'MU-AUTH-ALLOWED-GROUPS': changeSets[0].allowedGroups,
'mu-call-id-trail': muCallIdTrail,
'mu-call-id': uuid()
};

if (entry.options && entry.options.resourceFormat) {
// we should send contents
const body = formatChangesetBody( changeSets, entry.options );
const body = formatChangesetBody(changeSets, entry.options);

// TODO: we now assume the mu-auth-allowed-groups will be the same
// for each changeSet. that's a simplification and we should not
Expand All @@ -167,46 +180,54 @@ async function sendRequest( entry, changeSets, muCallIdTrail ) {
};
} else {
// we should only inform
requestObject = { url, method, headers };
requestObject = {url, method, headers};
}

if( process.env["DEBUG_DELTA_SEND"] )
if (process.env['DEBUG_DELTA_SEND'])
console.log(`Executing send ${method} to ${url}`);

request( requestObject, function( error, response, body ) {
if( error ) {
request(requestObject, function(error, response, body) {
if (error) {
console.log(`Could not send request ${method} ${url}`);
console.log(error);
console.log(`NOT RETRYING`); // TODO: retry a few times when delta's fail to send
metrics.addRequest({
url,
method,
error
});
}

if( response ) {
// console.log( body );
if (response) {
metrics.addRequest({
url,
method
});
}
});
}

async function filterMatchesForOrigin( changeSets, entry ) {
if( ! entry.options || !entry.options.ignoreFromSelf ) {
async function filterMatchesForOrigin(changeSets, entry) {
if (!entry.options || !entry.options.ignoreFromSelf) {
return changeSets;
} else {
const originIpAddress = await getServiceIp( entry );
return changeSets.filter( (changeSet) => changeSet.origin != originIpAddress );
const originIpAddress = await getServiceIp(entry);
return changeSets.filter((changeSet) => changeSet.origin != originIpAddress);
}
}

function hostnameForEntry( entry ) {
function hostnameForEntry(entry) {
return (new URL(entry.callback.url)).hostname;
}

async function getServiceIp(entry) {
const hostName = hostnameForEntry( entry );
return new Promise( (resolve, reject) => {
dns.lookup( hostName, { family: 4 }, ( err, address) => {
if( err )
reject( err );
const hostName = hostnameForEntry(entry);
return new Promise((resolve, reject) => {
dns.lookup(hostName, {family: 4}, (err, address) => {
if (err)
reject(err);
else
resolve( address );
} );
} );
resolve(address);
});
});
};
Loading