Using SYNCPOINT with the ibmmq v2 NodeJS client #169
Comments
It's taken a little while to reproduce this, partly because the behaviour seems to be different depending on whether you're connecting as a client or using local bindings (which is my default config). I wouldn't have expected it to work differently, and I think the local bindings may be "wrong", but that's something else to deal with later. Anyway, the fundamental reason for getting the MQRC_HCONN_ASYNC_ACTIVE is that the NodeJS callback function is not running on the same thread as the real C-driven callback function. So you can't call MQI functions directly from that NodeJS callback. But I was able to get a program to work with syncpoint/commit calls. For clarity, in the callback function I call
Note that it's using MQOP_SUSPEND/MQOP_RESUME, not MQOP_STOP/MQOP_START.
@ibmmqmet Thanks for posting a solution! I'll try this later today. Just looking at the code though, I have a few questions: Also, could we run into cascading failures of I think what I am asking is, why are we not nesting the subsequent mq calls within the previous call's callback function? One possible reason, I suspect, is this might put us in a different underlying C thread(s) between the various calls to
Ctl and Cmit (and Back) are effectively synchronous calls to the C libraries, so nesting in callback functions is not really necessary. The only true async calls in this package are the ones where there's also a *Sync equivalent function (e.g. Open and OpenSync). I did think about making Cmit/Back asynchronous but decided the sync approach is much more reliable here - or at least, more likely to get any errors back to the application, which is key when doing transactional work.

The MQI C libraries have a number of constraints around threads, primarily to ensure you don't try to have 2 operations going on simultaneously for the same hConn. That was relaxed slightly when the callback capability was implemented, but it still imposes restrictions which don't play very nicely with the Node async/thread model. Which is why the application-level callback here is on a different thread (the main Node execution thread) than the one the MQI callback ran on.
Yes, that's what I discovered in my experiments yesterday. But this leaves me vulnerable to getting blocked for long periods (when making calls to Perhaps these "synchronous" calls could be made truly async where the call to

Also, if they are effectively synchronous, would it make sense to have an alternate API for these calls, where they simply return the error rather than taking a callback?

/** returns err if there is any failure, else returns undefined **/
function cmitFromCB() {
  let err = mq.Ctl(connectionHandle, MQC.MQOP_SUSPEND);
  if (err) {
    console.log("Error on SUSPEND", err);
    return err;
  }
  err = mq.Cmit(connectionHandle);
  if (err) {
    console.log("Error on commit", err);
    return err;
  } else {
    console.log("Commit was OK");
  }
  err = mq.Ctl(connectionHandle, MQC.MQOP_RESUME);
  if (err) {
    console.log("Error on RESUME", err);
    return err;
  }
}

I guess I can create this cleaner API in my own code.

function myCmit(connectionHandle) {
  let error;
  // relies on Cmit invoking its callback before it returns
  mq.Cmit(connectionHandle, function (err) {
    error = err;
  });
  return error;
}
let err = myCmit(connectionHandle);
I really don't like the idea of making CMIT asynchronous, as you might otherwise "start" a new transaction before the previous one has been committed, and it ends up being part of the older transaction when the commit finally happens. The sequencing MIGHT turn out to be OK, but this way is much more definitive. I don't want to be subject to the vagaries of the node internal thread scheduling.

We also do not have any direct access to the MQI thread that's doing the real callback - it's under the control of the MQI C library. So we can't add work for it to process.

If you don't want to use a callback on the sync calls like Cmit, then they already throw exceptions on error rather than giving a return code when no callback function is provided.
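Since the synchronous calls throw when no callback is supplied, the suspend/commit/resume sequence can be wrapped in try/finally so that delivery is resumed even if the commit fails. This is only a minimal sketch, not code from the package: the helper name `commitFromCallback` is hypothetical, and `mq` is assumed to be the loaded ibmmq module, passed in as a parameter so the helper can be tried against a stub.

```javascript
// Hypothetical helper, not part of ibmmq.
// `mq` is assumed to be the ibmmq module object and `hConn` an open
// connection handle; both are supplied by the caller.
function commitFromCallback(mq, hConn) {
  const MQC = mq.MQC;
  // With no callback argument these calls throw on failure
  // (as described in the comment above).
  mq.Ctl(hConn, MQC.MQOP_SUSPEND);
  try {
    mq.Cmit(hConn);
  } finally {
    // Resume delivery whether or not the commit succeeded.
    mq.Ctl(hConn, MQC.MQOP_RESUME);
  }
}
```

A caller would invoke `commitFromCallback(mq, hConn)` inside its own try/catch. One caveat of this shape: if the RESUME call also throws, its error replaces the commit error.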
I don't see a way to avoid async. For example, in the

getMessagesCB = async (error, hObj, gmo, md, buf, hConn) => {
  const order = JSON.parse(buf.toString('utf8'))
  // while I await the write to DB to finish, my `getMessagesCB` function returns a promise
  // since the function returns, the MQ client calls `getMessagesCB` again immediately with the next message waiting in the queue
  await this.ordersRepo.insert({
    orderId: order.Id,
    orderDate: (new Date()).toISOString(),
    orderDetails: order.Details,
  })
  // so later, when `this.cmitFromCB` is called, it is possible that two messages get committed in MQ, whereas only one of them was successfully written to the DB at that time
  this.cmitFromCB()
}

So, using this approach, I could not avoid the issue you mentioned ("starting" a new transaction before the previous one is committed). To avoid this, I had to modify the e.g.

getMessagesCB = async (error, hObj, gmo, md, buf, hConn) => {
  // de-register the callback, so MQ client does not send more messages
  mq.GetDone(this.queueHandle, function (err) {
    logInfo({ msg: `GetDone remove message listener. Err=${err}`, error: err })
  })
  const order = JSON.parse(buf.toString('utf8'))
  // now even though getMessagesCB returns on the next line (with a promise),
  // MQ client will not call getMessagesCB again with a new message
  await this.ordersRepo.insert({
    orderId: order.Id,
    orderDate: (new Date()).toISOString(),
    orderDetails: order.Details,
  })
  this.cmitFromCB()
}

// update this.cmitFromCB to re-register getMessagesCB
cmitFromCB() {
  mq.Ctl(this.hConn, mq.MQC.MQOP_SUSPEND, function (err) {
    if (err) console.log('Error in Ctl MQOP_SUSPEND', err)
  })
  mq.Cmit(this.hConn, function (err) {
    if (err) console.log('Error in Cmit', err)
  })
  // re-register listener for messages
  mq.Get(this.queueHandle, this.md, this.gmo, this.getMessagesCB)
  mq.Ctl(this.hConn, mq.MQC.MQOP_RESUME, function (err) {
    if (err) console.log('Error in Ctl MQOP_RESUME', err)
  })
}

Perhaps there is a more elegant way to tell the MQ client not to send the next message till the previous message is either committed ( And since all these

And Mark, thank you so much for patiently engaging and explaining the library. It has helped clear up many of my doubts and I think I am much closer now to a robust solution that is both "safe" and non-blocking.
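The race described at the start of this comment can be reproduced without MQ at all. The sketch below is a pure simulation: `insert`, `commit`, and `listener` are hypothetical stand-ins (none of them are ibmmq calls) for a database write, a commit that covers every outstanding get on the connection, and the message callback. Because an async listener returns at its first `await`, both deliveries happen before the first commit runs.

```javascript
// Pure simulation, no MQ involved. All names are illustrative stand-ins.
async function simulateRace() {
  let uncommitted = 0;         // messages got under syncpoint, not yet committed
  const committedPerCall = []; // how many messages each commit covered

  // Fake database write that completes after 10 ms.
  const insert = () => new Promise((resolve) => setTimeout(resolve, 10));

  // A commit covers every outstanding get on the connection.
  const commit = () => {
    committedPerCall.push(uncommitted);
    uncommitted = 0;
  };

  // Async listener: hands back a promise at its first await.
  const listener = async () => {
    uncommitted += 1;
    await insert();
    commit();
  };

  // The deliverer does not wait for the returned promise, so the
  // second message is handed over immediately after the first.
  const pending = [listener(), listener()];
  await Promise.all(pending);
  return committedPerCall;
}

simulateRace().then((result) => console.log(result)); // logs [ 2, 0 ]
```

The first commit covers both messages even though only the first database write has finished, which is the hazard the de-register/re-register workaround is meant to avoid.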
I do have a better solution to forcing you to do the SUSPEND/RESUME - when I went back to the code, I realised that I had already coded it. And it worked on the machine/container that I had originally tested it on. But there was a timing assumption that broke it on every other machine I tried more recently. An easy fix to that will go into the next release, which will be around the time of the next MQ CD update. With stuff added to the README to clarify.

Basically, I do the SUSPEND/RESUME at suitable times around the callback to the message listener. It still means the MQCTL calls are being made, but everything inside the callback is "reliable" including CMIT only processing the messages you expect it to. But those calls are still going to be synchronous.
I tried to use SUSPEND/RESUME earlier in the process but I was still getting the 2nd message before I was done with the first. Here's my code:

// register the new message callback in the class `init` function
async init() {
  try {
    const cno = new mq.MQCNO();
    cno.Options = mq.MQC.MQCNO_CLIENT_BINDING; // connecting as client
    const cd = new mq.MQCD();
    cd.ConnectionName = mqConfig.connectionName;
    cd.ChannelName = mqConfig.channelName;
    cno.ClientConn = cd;
    this.hConn = await mq.ConnxPromise(qMgr, cno);
    const od = new mq.MQOD();
    od.ObjectName = qName;
    od.ObjectType = mq.MQC.MQOT_Q;
    // MQGMO_SYNCPOINT is a get-message option (set on gmo below), not an open option
    let openOptions = mq.MQC.MQOO_INPUT_SHARED;
    this.queueHandle = await mq.OpenPromise(this.hConn, od, openOptions);
    let md = new mq.MQMD(); // message descriptor?
    const gmo = new mq.MQGMO(); // get message options
    const MQC = mq.MQC;
    gmo.Options = MQC.MQGMO_SYNCPOINT | MQC.MQGMO_WAIT | MQC.MQGMO_CONVERT | MQC.MQGMO_FAIL_IF_QUIESCING;
    gmo.WaitInterval = MQC.MQWI_UNLIMITED; // wait forever
    gmo.MatchOptions = MQC.MQMO_NONE;
    this.md = md;
    this.gmo = gmo;
    mq.Get(this.queueHandle, md, gmo, this.getMessagesCB);
  } catch(error) {
    console.error('Error in registering MQ callback', error);
  }
}
// getMessagesCB is a class method that was set as the callback in a call to `mq.Get()`
getMessagesCB = async (error, hObj, gmo, md, buf, hConn) => {
  if (error) {
    console.error('error in getMessages callback', error);
    return;
  }
  try {
    mq.Ctl(this.hConn, mq.MQC.MQOP_SUSPEND);
    // do my async thing with this message
    await writeMessageToDB(buf.toString('utf8'));
    // now I need to call suspend again since the await keyword used above puts me in a continuation, else I get a MQRC_HCONN_ASYNC_ACTIVE error
    // so, calling suspend, Cmit and resume in the same synchronous region of code
    mq.Ctl(this.hConn, mq.MQC.MQOP_SUSPEND);
    mq.Cmit(this.hConn);
    mq.Ctl(this.hConn, mq.MQC.MQOP_RESUME);
  } catch(error) {
    console.error('error saving message to DB and committing message in MQ', error);
  }
}

To avoid getting the 2nd message before the first message was committed, I had to change from SUSPEND/RESUME to STOP/START. However, I still need to run this entire code using node worker_threads to avoid blocking non-MQ related code in my application.

Really, there should be a way to tell the C library that after the first message is received (by the C-library), before the C-library calls the JS callback, it should call mqctl SUSPEND (or STOP). Similarly, when we call Cmit, there should be a way to tell the C-library to do all three things (stop, Cmit, start) in the same call. This way, the JS API calls into the MQ library need not be synchronous. Synchronous network calls in an event-loop-driven system such as NodeJS, which depends on co-operative multi-tasking, do not seem like the correct pattern.
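The STOP/START variant mentioned above is not shown in the comment, so the following is only a sketch of how such a flow might look, not the commenter's actual code and not verified against a queue manager. The helper name `processOneMessage` and the `work` parameter are hypothetical; `mq` is assumed to be the ibmmq module object, passed in so the flow can be tried against a stub. It relies on the synchronous calls throwing when no callback is given, and uses Back to roll the message back if the work or the commit fails. Whether MQOP_STOP can be issued from inside the listener may depend on the package version.

```javascript
// Hypothetical helper sketching a STOP / work / commit-or-back / START flow.
// `mq` is assumed to be the ibmmq module object; `work` is an
// application-supplied async function (for example a database write).
async function processOneMessage(mq, hConn, buf, work) {
  const MQC = mq.MQC;
  // Stop delivery before yielding to the event loop.
  mq.Ctl(hConn, MQC.MQOP_STOP);
  try {
    await work(buf);
    // Success: commit, which removes the message from the queue.
    mq.Cmit(hConn);
  } catch (err) {
    // Failure: back out so the message returns to the queue.
    mq.Back(hConn);
    throw err;
  } finally {
    // Restart delivery in every case.
    mq.Ctl(hConn, MQC.MQOP_START);
  }
}
```

The synchronous MQ calls in this sketch still block the event loop while they run, so it does not remove the need for worker_threads raised in the comment.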
* Support for JWT Token authentication
* Change the postinstall to use genmqpkg on Linux
* Can configure postinstall to download client from local URL
* Updated postinstall to support http proxy environment variables (#167)
* Fix calling the MQI within message delivery callback (#169)
* Update documentation links
mq-mqi-nodejs version - 2.0.1

I am trying to use this package to get messages using MQGMO_SYNCPOINT flag in the gmo (get message options) object.

The MQGMO_SYNCPOINT option ensures that the message is not removed from the queue until I later commit the message using mq.Cmit call.

However, when I make that Cmit call in my callback function (this.getMessagesCB) e.g.

I get the following error:

With this new version of 'ibmmq' NodeJS package, we have to call the Ctl method with one of the start options. I am using MQOP_START as shown in the samples included in the v2 version. e.g.

Reading the control callbacks documentation says:

So, I tried, issuing a new Ctl call with MQOP_STOP, before calling mq.Cmit e.g.

Now I get a different error:

I suspect, this is because I have no way of accessing the correct thread from within NodeJS, to issue the Cmit command.

Could you please provide a working example of using MQGMO_SYNCPOINT to get a message non-destructively from a queue and then later commit the message using Cmit?