My application uses the depth of a WMQ queue named 'PROCESS' to determine whether a job is being processed by an IIB9 workflow. If the workflow is processing a message, the application waits until that workflow is over. Once the workflow is over, the 'PROCESS' queue is emptied with a GET operation and the application sends the next messages in the sequence for processing. I am using JMS selectors to differentiate between multiple messages being processed in parallel by the workflow.
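The wait-until-idle step described above can be sketched roughly as follows. This is only an illustration, not the real application code: `DepthPoller`, `waitUntilEmpty`, and the `IntSupplier` depth probe are hypothetical names, and the probe stands in for whichever depth check (JMS browse or IBM API) is actually used.

```java
import java.util.function.IntSupplier;

/** Hypothetical helper: polls a depth probe until the queue looks empty. */
final class DepthPoller {

    /**
     * Returns true as soon as depthProbe reports 0. Returns false if all
     * maxAttempts polls saw a non-zero depth, or if the thread is interrupted.
     */
    static boolean waitUntilEmpty(IntSupplier depthProbe, int maxAttempts, long pauseMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (depthProbe.getAsInt() == 0) {
                return true; // depth 0 is treated as "no job in progress"
            }
            try {
                Thread.sleep(pauseMillis); // workflow still busy, poll again later
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Fake probe (assumption for the demo): depth 1 on the first two polls, then 0.
        int[] busyPolls = {2};
        IntSupplier fakeProbe = () -> busyPolls[0]-- > 0 ? 1 : 0;
        System.out.println(waitUntilEmpty(fakeProbe, 5, 10L));
    }
}
```

With a loop like this, a probe that wrongly reports 0 while the workflow is still running lets the next message through too early, which is why the depth discrepancy matters.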
The issue is with determining the depth of the queue. The JMS API reports a depth of 0, while the IBM API reports a depth of 1 (which is expected). Unfortunately, I cannot use the IBM API because my logic relies on some complex message selectors.
Has anyone seen this bizarre behaviour? Please note that the IIB9 workflow is in progress while the size check is being made. Is there a setting to be tweaked?
JMS Code (message selector removed for clarity):
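import java.util.Enumeration;

import javax.jms.Session;

import com.ibm.mq.jms.MQConnection;
import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.mq.jms.MQQueue;
import com.ibm.mq.jms.MQQueueBrowser;
import com.ibm.mq.jms.MQSession;
import com.ibm.msg.client.wmq.WMQConstants;

public class QDepthJMS {
    public static void main(String[] a) throws Exception {
        MQConnectionFactory factory = new MQConnectionFactory();
        factory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
        factory.setQueueManager("QM01");
        factory.setHostName("10.10.98.15");
        factory.setPort(1414);
        factory.setChannel("Java.Clients");
        MQConnection connection = (MQConnection) factory.createConnection();
        try {
            connection.start();
            MQSession session = (MQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MQQueue queue = (MQQueue) session.createQueue("queue:///PROCESS");
            MQQueueBrowser browser = (MQQueueBrowser) session.createBrowser(queue);
            // Count every message the browser can see, not just the first one.
            Enumeration<?> msgs = browser.getEnumeration();
            int count = 0;
            while (msgs.hasMoreElements()) {
                msgs.nextElement();
                ++count;
            }
            System.out.println(count);
        } finally {
            connection.close(); // release the connection and its session
        }
    }
}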
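The selector was left out of the listing above. With one, the browser would be created through the two-argument `createBrowser(queue, selector)` call from the JMS API. A minimal sketch of assembling such a selector string is shown here; `SelectorBuilder` and the property names `JobId` and `Stage` are made up for illustration and are not taken from the real application.

```java
/** Hypothetical helper for building JMS message selector strings. */
final class SelectorBuilder {

    /** Builds "name = 'value'", doubling any single quote inside value. */
    static String equalsString(String propertyName, String value) {
        return propertyName + " = '" + value.replace("'", "''") + "'";
    }

    /** Joins conditions with AND, wrapping each one in parentheses. */
    static String allOf(String... conditions) {
        StringBuilder sb = new StringBuilder();
        for (String condition : conditions) {
            if (sb.length() > 0) {
                sb.append(" AND ");
            }
            sb.append('(').append(condition).append(')');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // JobId and Stage are assumed property names, used only for this demo.
        String selector = allOf(equalsString("JobId", "job-42"), "Stage = 2");
        System.out.println(selector);
        // The resulting string would be passed as the second argument of
        // session.createBrowser(queue, selector).
    }
}
```

Only messages whose properties match the selector are returned by the browser's enumeration, so the count reflects one job's messages rather than the whole queue.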
IBM API (Check MQ queue depth):
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;

public class QDepth {
    private final String host;
    private final int port;
    private final String channel;
    private final String manager;
    private final MQQueueManager qmgr;

    public QDepth(String host, int port, String channel, String manager) throws MQException {
        this.host = host;
        this.port = port;
        this.channel = channel;
        this.manager = manager;
        this.qmgr = createQueueManager();
    }

    public int depthOf(String queueName) throws MQException {
        MQQueue queue = qmgr.accessQueue(queueName, MQC.MQOO_INQUIRE | MQC.MQOO_INPUT_AS_Q_DEF, null, null, null);
        try {
            return queue.getCurrentDepth();
        } finally {
            queue.close(); // release the queue handle after each inquiry
        }
    }

    @SuppressWarnings("unchecked")
    private MQQueueManager createQueueManager() throws MQException {
        // Client connection settings are taken from the static MQEnvironment.
        MQEnvironment.channel = channel;
        MQEnvironment.port = port;
        MQEnvironment.hostname = host;
        MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES);
        return new MQQueueManager(manager);
    }

    public static void main(String[] a) throws Exception {
        QDepth qd = new QDepth("10.10.98.15", 1414, "Java.Clients", "QM01");
        System.out.println(qd.depthOf("PROCESS"));
    }
}