Skip to content

Commit

Permalink
postgres option to return columns as array
Browse files Browse the repository at this point in the history
  • Loading branch information
peterprib committed Sep 2, 2020
1 parent 39e82c5 commit c7e344d
Show file tree
Hide file tree
Showing 6 changed files with 582 additions and 270 deletions.
1 change: 1 addition & 0 deletions .github/workflows/npm-publish.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# This workflow will run tests using node and then publish a package to GitHub Packages when a release is created
# For more information see: https://help.github.com/actions/language-and-framework-guides/publishing-nodejs-packages
# update to extrenalise

name: Node.js Package

Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ _Note_ The examples will require the drivers to be installed

# Version

0.0.7 Use new feature in postgresql to return columns as array

0.0.6 fix bug with postgresql when parameters > 9.

0.0.4 fix bug on statement.
Expand Down
10 changes: 9 additions & 1 deletion connectionManager/connection-manager.html
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@
<label for="node-config-input-poolsize"><i class="icon-bookmark"></i> Pool Size</label>
<input type="number" id="node-config-input-poolsize" min="1' max ="200">
</div>

<div class="form-row form-row-http-in-columnsAsArray show">
<input type="checkbox" id="node-config-input-columnsAsArray" style="display: inline-block; width: auto; vertical-align: top;">
<label for="node-config-input-columnsAsArray" style="width: auto">Columns as Array</label>
</div>
</script>

<script type="text/javascript">
Expand All @@ -57,6 +60,7 @@
icon: "iconfinder_network_1287518.png",
defaults: {
name: {required:true},
columnsAsArray:{value:true},
driver: {required:true},
host: {value:"localhost" ,required:true},
port: {required:false},
Expand All @@ -77,6 +81,10 @@
// this.credentials.username is set to the appropriate value
// this.credentials.password is not set
// this.credentials.has_password indicates if the property is present in the runtime
$("#node-config-input-driver").change(function() {
const driver=$(this).val();
$(".form-row-http-in-columnsAsArray")[["pg"].includes(driver)?"show":"hide"]();
}).change();
}
});
</script>
113 changes: 70 additions & 43 deletions connectionManager/connection-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ function ConnectionPool(node) {
}
ConnectionPool.prototype.beginTransaction=function(c,done,error) {
if(logger.active) logger.send("ConnectionPool beginTransaction");
this.driver.beginTransaction(this.pool[c.id],done,error);
this.driver.beginTransaction(this,this.pool[c.id],done,error);
}
ConnectionPool.prototype.checkDeadConnection=function(c,errorMessage) {
if(logger.active) logger.send({label:"ConnectionPool.checkDeadConnection"});
Expand All @@ -215,7 +215,7 @@ ConnectionPool.prototype.close=function(c,done,error) {
]);
this.returnConnection(c.id);
} else{
this.driver.close(c,done,error);
this.driver.close(this,c,done,error);
}
};
ConnectionPool.prototype.closeAll=function(done) {
Expand All @@ -231,7 +231,7 @@ ConnectionPool.prototype.closeAll=function(done) {
}
ConnectionPool.prototype.commit=function(c,done,error) {
const pool=this;
this.driver.commit(this.pool[c.id],done,(err)=>{
this.driver.commit(pool,this.pool[c.id],done,(err)=>{
if(logger.active) logger.send("ConnectionPool commit "+err);
pool.checkDeadConnection(c,err);
error(err)
Expand All @@ -247,12 +247,12 @@ ConnectionPool.prototype.error=function(err,callback) {
ConnectionPool.prototype.exec=function(c,done,error,id,params) {
if(logger.active) logger.send("ConnectionPool exec connection id: "+c.id+" prepared id: "+id+" params: "+JSON.stringify(params));
if(!this.preparable) {
this.query(c,done,error,this.prepared[c.id][id],params);
this.query(c,done,error,this.prepared[c.id][id],params,id);
return;
}
this.lastUsed[c.id]=new Date();
const pool=this;
this.driver.exec(this.prepared[c.id][id],params,done,(err)=>{
this.driver.exec(pool,this.prepared[c.id][id],params,done,(err)=>{
if(logger.active) logger.send("ConnectionPool exec "+err);
pool.checkDeadConnection(c,err);
error(err)
Expand All @@ -273,7 +273,7 @@ ConnectionPool.prototype.getConnection=function(done,error) {
connectionPool.error("maximum pool size "+this.size,error);
return;
}
this.driver.getConnection(this.node,
this.driver.getConnection(connectionPool,this.node,
function (connection) {
connectionPool.node.log("new connection "+connectionPool.node.name);
let c=connectionPool.pool.find((e)=>e==null);
Expand Down Expand Up @@ -318,7 +318,7 @@ ConnectionPool.prototype.prepare=function(c,done,error,sql,id) {
return;
}
const pool=this;
this.driver.prepare(this.pool[c.id],(this.driver.translateSQL?this.driver.translateSQL(sql):sql),
this.driver.prepare(pool,this.pool[c.id],(this.driver.translateSQL?this.driver.translateSQL(sql):sql),
(prepared)=>{
pool.prepared[c.id][id]=prepared;
if(logger.active) logger.send("ConnectionPool prepared calling done");
Expand All @@ -329,15 +329,15 @@ ConnectionPool.prototype.prepare=function(c,done,error,sql,id) {
error(err)
});
};
ConnectionPool.prototype.query=function(c,done,error,sql,params) {
if(logger.active) logger.send("ConnectionPool query connection id: "+c.id+" sql: "+sql+" parms: "+JSON.stringify(params));
ConnectionPool.prototype.query=function(c,done,error,sql,params,prepareid) {
if(logger.active) logger.send({label:"ConnectionPool query connection",id:c.id,sql:sql,parms:params,prepareid:prepareid});
this.lastUsed[c.id]=new Date();
const pool=this;
this.driver.query(this.pool[c.id],(this.driver.translateSQL?this.driver.translateSQL(sql):sql),params,done, (err)=>{
this.driver.query(pool,this.pool[c.id],(this.driver.translateSQL?this.driver.translateSQL(sql):sql),params,done, (err)=>{
if(logger.active) logger.send({label:"ConnectionPool query error",error:err});
pool.checkDeadConnection(c,err);
error(err);
});
},prepareid);
};
ConnectionPool.prototype.release=function(c,done) {
if(logger.active) logger.send("ConnectionPool.release "+c.id);
Expand All @@ -356,7 +356,7 @@ ConnectionPool.prototype.returnConnection=function(c) {
ConnectionPool.prototype.rollback=function(c,done,error) {
if(logger.active) logger.send("ConnectionPool.rollback ");
const pool=this;
this.driver.rollback(this.pool[c.id],done,(err)=>{
this.driver.rollback(pool,this.pool[c.id],done,(err)=>{
if(logger.active) logger.send("ConnectionPool rollback "+err);
pool.checkDeadConnection(c,err);
error(err)
Expand All @@ -370,6 +370,7 @@ ConnectionPool.prototype.releaseStaleConnections=function() {
if(this.lastUsed[connectionID] < staleTimestamp) {
this.node.error("Releasing long running connection with rollback "+connectionID);
this.driver.rollback.apply(this.driver,[
thisObject,
thisObject.pool[connectionID],
()=>thisObject.release.apply(thisObject,[{id:connectionID},()=>{thisObject.node.log("Released connection with rollback "+connectionID);}]),
(err)=>thisObject.release.apply(thisObject,[{id:connectionID},()=>{thisObject.node.warn("Releasing connection "+connectionID+" rollback failed: "+err);}])
Expand Down Expand Up @@ -448,6 +449,7 @@ module.exports = function(RED) {
};

function Driver(a) {
if(logger.active) logger.send({label:"New Drive ",argument:a});
if(!a.optionsMapping) {
this.optionsMapping ={
host : "host",
Expand All @@ -472,15 +474,15 @@ function Driver(a) {
a
);
}
Driver.prototype.beginTransactionNoAction=function(conn,done,error) {
Driver.prototype.beginTransactionNoAction=function(pool,conn,done,error) {
if(logger.active) logger.send("Driver.beginTransactionNoAction");
done();
}
Driver.prototype.beginTransactionSql=function(conn,done,error) {
Driver.prototype.beginTransactionSql=function(pool,conn,done,error) {
if(logger.active) logger.send("Driver.beginTransactionSql");
this.query(conn,"Start Transaction",null,done,error);
this.query(pool,conn,"Start Transaction",null,done,error);
};
Driver.prototype.close=function(conn,done,error) {
Driver.prototype.close=function(pool,conn,done,error) {
if(logger.active) logger.send("close");
conn.close().then(done,(err,result)=>{
if(error) {
Expand All @@ -490,13 +492,13 @@ Driver.prototype.close=function(conn,done,error) {
done([{sql:sql,error:err}]);
});
};
Driver.prototype.commitNoAction=function(conn,done,error) {
Driver.prototype.commitNoAction=function(pool,conn,done,error) {
if(logger.active) logger.send("Driver.commitNoAction");
done();
};
Driver.prototype.commitSql=function(conn,done,error) {
Driver.prototype.commitSql=function(pool,conn,done,error) {
if(logger.active) logger.send("Driver.commit");
this.query(conn,"commit",null,done,error);
this.query(pool,conn,"commit",null,done,error);
};
Driver.prototype.getOptions=function(node) {
if(logger.active) logger.send("Driver.getOptions "+JSON.stringify(this.optionsMapping));
Expand All @@ -520,20 +522,21 @@ Driver.prototype.getOptions=function(node) {
}
return this.options;
};
Driver.prototype.getConnectionC=function(node,done,error) {
Driver.prototype.getConnectionC=function(pool,node,done,error) {
try{
const options=this.getOptions(node);
if(logger.active) logger.send("getConnectionC "+JSON.stringify(Object.assign({},options,{password:"***masked"})));
if(logger.active) logger.send("getConnectionC options "+JSON.stringify(Object.assign({},options,{password:"***masked"})));
const thisObject=this;
const c=new (this.Driver())(options);
c.connect((err)=>{
if(err) {
if(logger.active) logger.send("getConnection error "+err);
if(logger.active) logger.send("getConnectionC error "+err);
error(err);
return;
}
if(logger.active) logger.send("getConnectionC OK ");
if(thisObject.testOnConnect) {
thisObject.query(c,thisObject.testOnConnect,null,()=>done(c),error);
thisObject.query(pool,c,thisObject.testOnConnect,null,()=>done(c),error);
} else {
done(c);
}
Expand All @@ -543,21 +546,21 @@ Driver.prototype.getConnectionC=function(node,done,error) {
error(e);
}
};
Driver.prototype.getConnectionO=function(node,done,error) {
Driver.prototype.getConnectionO=function(pool,node,done,error) {
try{
const options=this.getOptions(node);
if(logger.active) logger.send("getConnectionC "+JSON.stringify(Object.assign({},options,{password:"***masked"})));
if(logger.active) logger.send("getConnectionC options "+JSON.stringify(Object.assign({},options,{password:"***masked"})));
if(!this.driverInstance) this.driverInstance= new (this.Driver());
let thisObject=this,
connectString="DATABASE="+options.database+";HOSTNAME="+options.host+";UID="+options.user+";PWD="+options.password+";PORT="+options.port+";PROTOCOL=TCPIP";
this.driverInstance.open(connectString,(err,conn)=>{
if(err) {
if(logger.active) logger.send("getConnection error "+err);
if(logger.active) logger.send("getConnectionO error "+err);
error(err);
return;
}
if(thisObject.testOnConnect) {
thisObject.query(conn,thisObject.testOnConnect,null,()=>done(conn),error);
thisObject.query(pool,conn,thisObject.testOnConnect,null,()=>done(conn),error);
} else {
done(conn);
}
Expand All @@ -567,16 +570,16 @@ Driver.prototype.getConnectionO=function(node,done,error) {
error(e);
}
};
Driver.prototype.getConnectionNeo4j=function(node,done,error) {
Driver.prototype.getConnectionNeo4j=function(pool,node,done,error) {
try{
const options=this.getOptions(node);
if(logger.active) logger.send("getConnectionNeo4j "+JSON.stringify(Object.assign({},options)));
if(logger.active) logger.send("getConnectionNeo4j options "+JSON.stringify(Object.assign({},options)));
let neo4j=new this.Driver(),
driver=neo4j.driver("bolt://"+options.host+":"+options.host, neo4j.auth.basic(options.user,options.password));
if(!driver) throw Error("driver build failed");
var session=driver.session();
if(this.testOnConnect) {
this.query(session,this.testOnConnect,null,()=>done(session),error);
this.query(pool,session,this.testOnConnect,null,()=>done(session),error);
} else {
done(session);
}
Expand All @@ -585,16 +588,16 @@ Driver.prototype.getConnectionNeo4j=function(node,done,error) {
error(e);
}
};
Driver.prototype.getConnectionQ=function(node,done,error) {
Driver.prototype.getConnectionQ=function(pool,node,done,error) {
try{
const options=this.getOptions(node);
if(logger.active) logger.send("getConnectionQ "+JSON.stringify(Object.assign({},options,{password:"***masked"})));
if(logger.active) logger.send("getConnectionQ options "+JSON.stringify(Object.assign({},options,{password:"***masked"})));
let c = new this.Driver(options),
thisObject=this;
c.connect(options).then(
()=>{
if(thisObject.testOnConnect) {
thisObject.query(c,thisObject.testOnConnect,null,()=>done(c),error);
thisObject.query(pool,c,thisObject.testOnConnect,null,()=>done(c),error);
} else {
done(c);
}
Expand All @@ -609,7 +612,7 @@ Driver.prototype.getConnectionQ=function(node,done,error) {
error(e);
}
};
Driver.prototype.execQ=function(preparedSql,params,done,error) {
Driver.prototype.execQ=function(pool,preparedSql,params,done,error) {
if(logger.active) logger.send("Driver.execQ "+JSON.stringify({params:params}));
const thisObject=this;
try{
Expand All @@ -631,8 +634,8 @@ Driver.prototype.execQ=function(preparedSql,params,done,error) {
logger.sendError("Driver.execQ error: "+e);
error(e);
}
},
Driver.prototype.prepareQ=function(conn,sql,done,error) {
};
Driver.prototype.prepareQ=function(pool,conn,sql,done,error) {
if(logger.active) logger.send("Driver.prepareQ "+JSON.stringify({sql:sql}));
const thisObject=this;
try{
Expand All @@ -650,8 +653,8 @@ Driver.prototype.prepareQ=function(conn,sql,done,error) {
logger.sendError("Driver.prepareQ error: "+e);
error(e);
}
},
Driver.prototype.queryC=function(conn,sql,params,done,error) {
};
Driver.prototype.queryC=function(pool,conn,sql,params,done,error) {
if(logger.active) logger.send("Driver.queryC "+JSON.stringify({sql:sql,params:params}));
const thisObject=this;
try{
Expand All @@ -668,8 +671,30 @@ Driver.prototype.queryC=function(conn,sql,params,done,error) {
logger.sendError("Driver.queryC error: "+e);
error(e);
}
};

Driver.prototype.queryCPG=function(pool,conn,sql,params,done,error) {
const thisObject=this,
query={text:sql,values:(params||this.paramNull)};
if(pool.node.columnsAsArray) query.rowMode='array';
if(logger.active) logger.send({label:"Driver.queryCOG ",query:query});
try{
conn.query(query,(err, result) => {
if(err) {
if(logger.active) logger.send("Driver.queryC error: "+err);
error(err);
} else {
if(logger.active) logger.send("Driver.queryC first 100 chars results"+JSON.stringify(result||"<null>").substring(1,100));
done(result);
}
});
} catch(e) {
logger.sendError("Driver.queryC error: "+e);
error(e);
}
},
Driver.prototype.queryNeo4j=function(session,cmd,params,done,error) {

Driver.prototype.queryNeo4j=function(pool,session,cmd,params,done,error) {
if(logger.active) logger.send("Driver.queryNeo4j "+JSON.stringify({cmd:cmd,params:params}));
try{
session.run(cmd,(params||this.paramNull)).then(done).catch(error);
Expand All @@ -678,7 +703,7 @@ Driver.prototype.queryNeo4j=function(session,cmd,params,done,error) {
error(e);
}
},
Driver.prototype.queryQ=function(conn,sql,params,done,error) {
Driver.prototype.queryQ=function(pool,conn,sql,params,done,error) {
if(logger.active) logger.send("Driver.queryQ "+JSON.stringify({sql:sql,params:params}));
const thisObject=this;
try{
Expand All @@ -701,9 +726,9 @@ Driver.prototype.queryQ=function(conn,sql,params,done,error) {
error(e);
}
},
Driver.prototype.rollback=function(conn,done,error) {
Driver.prototype.rollback=function(pool,conn,done,error) {
if(logger.active) logger.send("Driver.rollback");
this.query(conn,"rollback",null,done,error);
this.query(pool,conn,"rollback",null,done,error);
};
Driver.prototype.translateSQL=function(sql) {
return sql;
Expand Down Expand Up @@ -753,10 +778,12 @@ let DriverType = {
}),
'pg': new Driver({
Driver: function() {
return require('pg').Client;
return require(this.requireName).Client;
},
requireName:'pg',
autoCommit:true,
prepareIsQuery:true,
query:Driver.prototype.queryCPG,
translateSQL:function(sql) {
return sql.split('?').reduce((a,c,i)=>a+="$"+i+c);
}
Expand Down
Loading

0 comments on commit c7e344d

Please sign in to comment.