0

Memory Dump Screenshot

Memory Dump Screenshot

When I run my Spring Boot app for some time, I get java.lang.OutOfMemoryError: Java heap space. It seems that the size of org.eclipse.paho.mqttv5.client.internal.ClientState keeps increasing.

Am I missing something?

Note that I'm setting options.setCleanStart(false) and options.setSessionExpiryInterval(4294967295L),

so that if the application instance dies, the new instance continues as if it's the same client and consumes the messages that were not consumed before.

/**
 * Connects to the MQTT broker at {@code brokerUrl} and subscribes to every entry in
 * {@code topics} at QoS 2.
 *
 * <p>The client is created with the fixed client id {@code "my-client"}, clean start disabled
 * and the maximum session expiry interval, and automatic reconnect enabled. Both the connect
 * call and each subscribe call block until the corresponding operation completes.
 *
 * <p>Incoming messages are handed to {@code executor}; the payload is decoded as UTF-8 text and
 * passed to {@code routeMessage}. Any exception thrown while processing a message is logged and
 * not propagated.
 *
 * <p>An {@link MqttException} raised while creating the client, connecting or subscribing is
 * logged and not rethrown, so callers are not notified of a failed connection.
 */
public void connect() {
  try {
    client = new MqttAsyncClient(brokerUrl, "my-client");

    MqttConnectionOptions options = new MqttConnectionOptions();
    options.setUserName(username);
    // Encode explicitly as UTF-8 so the bytes sent do not depend on the JVM's default charset.
    options.setPassword(password.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    options.setAutomaticReconnect(true);
    options.setCleanStart(false); // Persistent session
    options.setSessionExpiryInterval(4294967295L); // Max session expiry

    client.setCallback(new MqttCallback() {
      @Override
      public void messageArrived(String topic, MqttMessage message) {
        // Processing is moved off the callback thread onto the executor.
        executor.submit(() -> {
          try {
            // NOTE(review): assumes publishers send UTF-8 text payloads - confirm.
            String payload =
                new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
            routeMessage(topic, payload);
          } catch (Exception e) {
            log.error("❌ Error processing MQTT message from topic {}: {}",
                topic, e.getMessage(), e);
          }
        });
      }

      @Override
      public void disconnected(MqttDisconnectResponse disconnectResponse) {
        log.warn("🔌 Disconnected from MQTT broker: {}",
            disconnectResponse.getReasonString());
      }

      @Override
      public void mqttErrorOccurred(MqttException exception) {
        log.error("❌ MQTT error occurred", exception);
      }

      @Override
      public void deliveryComplete(IMqttToken token) {
        // Not used for subscriptions
      }

      @Override
      public void connectComplete(boolean reconnect, String serverURI) {
        log.info("🔄 MQTT connection complete to {}", serverURI);
      }

      @Override
      public void authPacketArrived(int reasonCode, MqttProperties properties) {
        // Optional, unused
      }
    });

    // Block until the broker has acknowledged the connection before subscribing.
    client.connect(options).waitForCompletion();

    log.info("✅ Connected to MQTT broker at {}", brokerUrl);

    for (String topic : topics) {
      client.subscribe(topic, 2).waitForCompletion();
      log.info("📡 Subscribed to topic: {}", topic);
    }

  } catch (MqttException e) {
    log.error("❌ Failed to connect or subscribe to MQTT broker", e);
  }
}

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.