Memory Dump Screenshot
When I run my springboot app for some time, I get java.lang.OutOfMemoryError: Java heap space
It seems that the size of org.eclipse.paho.mqttv5.client.internal.ClientState keeps increasing.
Am I missing something?
Note that I'm setting options.setCleanStart(false); and options.setSessionExpiryInterval(4294967295L);
so that if the application instance dies, the new instance continues as if it were the same client and consumes the messages that were not consumed before.
/**
 * Creates the MQTT v5 async client, connects it to {@code brokerUrl} and subscribes to
 * every entry of {@code topics} at QoS 2.
 *
 * <p>The connection is configured with a fixed client id ({@code "my-client"}),
 * {@code cleanStart = false} and a session expiry interval of {@code 4294967295L}, with
 * automatic reconnect enabled. Incoming messages are handed to {@code executor} and
 * routed via {@code routeMessage(topic, payload)}.
 *
 * <p>This method blocks until the connect and each subscribe have completed. A
 * {@link MqttException} raised while connecting or subscribing is logged and not
 * propagated to the caller.
 */
public void connect() {
    try {
        client = new MqttAsyncClient(brokerUrl, "my-client");

        MqttConnectionOptions options = new MqttConnectionOptions();
        options.setUserName(username);
        // Encode with an explicit charset so the bytes sent to the broker do not depend
        // on the JVM's platform default encoding.
        options.setPassword(password.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        options.setAutomaticReconnect(true);
        options.setCleanStart(false); // Persistent session
        options.setSessionExpiryInterval(4294967295L); // Max session expiry

        client.setCallback(new MqttCallback() {
            @Override
            public void messageArrived(String topic, MqttMessage message) {
                // Processing is handed off to the executor so this callback returns quickly.
                // NOTE(review): if `executor` has an unbounded queue and routing is slower
                // than the arrival rate, queued tasks (and their messages) will accumulate
                // on the heap — confirm the executor's queue policy.
                executor.submit(() -> {
                    try {
                        // Decode with an explicit charset rather than the platform default.
                        String payload = new String(
                                message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
                        routeMessage(topic, payload);
                    } catch (Exception e) {
                        // Boundary catch: a failing handler must not kill the worker task silently.
                        log.error("❌ Error processing MQTT message from topic {}: {}",
                                topic, e.getMessage(), e);
                    }
                });
            }

            @Override
            public void disconnected(MqttDisconnectResponse disconnectResponse) {
                log.warn("🔌 Disconnected from MQTT broker: {}",
                        disconnectResponse.getReasonString());
            }

            @Override
            public void mqttErrorOccurred(MqttException exception) {
                log.error("❌ MQTT error occurred", exception);
            }

            @Override
            public void deliveryComplete(IMqttToken token) {
                // Not used for subscriptions
            }

            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                log.info("🔄 MQTT connection complete to {}", serverURI);
            }

            @Override
            public void authPacketArrived(int reasonCode, MqttProperties properties) {
                // Optional, unused
            }
        });

        // Block until the broker has acknowledged the connection before subscribing.
        client.connect(options).waitForCompletion();
        log.info("✅ Connected to MQTT broker at {}", brokerUrl);

        for (String topic : topics) {
            client.subscribe(topic, 2).waitForCompletion();
            log.info("📡 Subscribed to topic: {}", topic);
        }
    } catch (MqttException e) {
        // Deliberately logged rather than rethrown, matching the original behaviour.
        log.error("❌ Failed to connect or subscribe to MQTT broker", e);
    }
}
