2

I am working on mqtt clients in thingstream. When I try to publish message to a topic using qos = 0, the message is sent but program hangs there only. Then I tried sending message using qos = 1. Message was sent successfully and program also didn't hanged. But while sending another message, I get interruption exception. Can anyone help me out.

Below is the snippet I am using to publish message.

MqttMessage message = new MqttMessage(command);
message.setPayload(command);
message.setQos(1);
mqttClient.publish(topic, message);
System.out.println("Message published");

Library used - org.eclipse.paho.client.mqttv3-1.2.0

Code for initializing mqtt client

public void connect() {

    try {
        mqttClient = new MqttClient(serverUri, clientId);
        //mqttClient.setTimeToWait(10000);
    } catch (MqttException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    }

    mqttClient.setCallback(new MqttCallbackExtended() {
        @Override
        public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
            // TODO Auto-generated method stub
            System.out.println("messageArrived: " + topic.toString());
            System.out.println(mqttMessage.toString());
            System.out.println(mqttMessage.getPayload());
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {
            // TODO Auto-generated method stub
            System.out.println("deliveryComplete: " + arg0.getMessageId());
        }

        @Override
        public void connectionLost(Throwable arg0) {
            // TODO Auto-generated method stub
            System.out.println("---Connection lost1");
            // Toast.makeText(App.getContext(), "Connection
            // lost",Toast.LENGTH_SHORT).show();

        }

        @Override
        public void connectComplete(boolean arg0, String arg1) {
            // TODO Auto-generated method stub
            System.out.println("connectComplete");
            try {
                mqttClient.subscribe("device/+/publish");
            } catch (MqttException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            try {
                commands.sendCommand(Commands.GET_STATUS, null, "device/identity:85111741-5789-3010-85c9-be4a7204e5d3");
            } catch (MqttException | InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
    });

    MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
    mqttConnectOptions.setAutomaticReconnect(true);
    mqttConnectOptions.setCleanSession(true);
    mqttConnectOptions.setUserName(username);
    mqttConnectOptions.setPassword(password.toCharArray());
    mqttConnectOptions.setKeepAliveInterval(30);
    mqttConnectOptions.setConnectionTimeout(60);
    mqttConnectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

    try {
        mqttClient.connect(mqttConnectOptions);
    } catch (MqttSecurityException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (MqttException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

}

4 Answers 4

3

Do not send it in the main thread, use another thread instead to publish the message.

Thread thread = new Thread() {
   public void run() {
       mqttClient.publish(topic, message);
   }  
};
thread.start(); 

Regarding the comment about having a bunch of threads that don't finish, this does not appear to be the case. When running in its own thread, the publish call completes and the thread just created ends.

It appears that calling publish in the main thread causes a thread lock. This would need to be looked further into as to whether this is a bug in the library or as designed, but calling publish in its own thread prevents wait lock.

Sign up to request clarification or add additional context in comments.

2 Comments

This does work. But why does the main thread get blocked by publishes with QoS 0? Also I’m guessing one wouldn’t want to spin up a new thread for each publish since this would cause a bunch of threads that each run once and then just hang around? Guessing what we should really do is have the thread consume from a queue and have the main thread enqueue to that queue?
This is an incorrect solution and eventually leads to other problems, see my answer.
0

The problem might not actually be the qos value but rather whether your client has a good/steady connection to the MQTT server. As mentioned here publish

... is a blocking method that returns once publish completes

Publish might not be completing. Check whether you have a connection to your MQTT server and whether your MQTT client has successfully connected to the MQTT server.

You may also surround the call with a try... catch ... statement to see whether there's an exception being thrown:

try {
  mqttClient.publish(topic, message);
} catch(Exception e) {
  e.printStackTrace();
}

2 Comments

I have tried with that also. Nothing shows in exception
You may need to post more of your code. I suspect a problem with how you are making the connection to server, sending and/or receiving data, not with MQTT ... Perhaps something is closing/holding the connection that you have with the MQTT server.
0

This is a bug in the library (org.eclipse.paho.client.mqttv3 prior to 1.2.5) when sending with QoS 0.

The solution of starting a thread is incorrect - while it makes it look like it solves the problem, you will end up with a number of threads that won't finish. Eventually, the library will complain that there are too many "publish in-flight" and you won't be able to publish anymore messages.

The issue Sync MqttClient receiver thread deadlock seems to be the one which when fixed, handled this problem.

Once upgraded to 1.2.5 (perhaps an earlier version after 1.2.0 solves it), this deadlock does not occur when sending with QoS 0.

Comments

0

I've encountered this issue also with Version greater than 1.2.5

It seems, that this deadlock is only appearing, when you publish from within the current "Receive" method, i.e.

client.setCallback(new MqttCallback() {
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
            //Using client to publish here = deadlock.
            client.publish(topic, message);


    }
});

Since you usually are invoking a bunch of nested listeners from within "messageArrived", it may not be very obvious at first view, that you are publishing something inside messageArrived.

Here, it helps to publish on a different thread. As then no deadlock appears, there will be no left over thread, it will simply execute and come to an end.

Probably kind of a logical thing, the publish is waiting for a lock on a ressource, while the ressource is currently held by the callback / receiving thread, waiting for the callback to finish.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.