7

I have a SpringBoot Application which makes use of Spring AMQP. At the moment I am implementing the receiver side of it using JavaConfig classes as directed by official Spring docs examples. However, I am receiving a MessageConversionException and the message is not even reaching my ClientHandler class to process the message. Here are the console logs for the incident:

Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to      convert Message content
    at org.springframework.amqp.support.converter.JsonMessageConverter.fromMessage(JsonMessageConverter.java:105) ~[spring-amqp-1.4.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:185) ~[spring-rabbit-1.4.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:243) ~[spring-rabbit-1.4.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756) ~[spring-rabbit-1.4.6.RELEASE.jar:na]
    ... 10 common frames omitted
Caused by: org.codehaus.jackson.map.JsonMappingException: Can not deserialize instance of byte out of START_OBJECT token
 at [Source: java.io.StringReader@33b39883; line: 1, column: 1]
    at org.codehaus.jackson.map.JsonMappingException.from(JsonMappingException.java:163) ~[jackson-mapper-asl-1.9.13.jar:1.9.13]
    at org.codehaus.jackson.map.deser.StdDeserializationContext.mappingException(StdDeserializationContext.java:219) ~[jackson-mapper-asl-1.9.13.jar:1.9.13]
    at org.codehaus.jackson.map.deser.std.StdDeserializer._parseByte(StdDeserializer.java:214) ~[jackson-mapper-asl-1.9.13.jar:1.9.13]
    at org.codehaus.jackson.map.deser.std.StdDeserializer$ByteDeserializer.deserialize(StdDeserializer.java:749) ~[jackson-mapper-asl-1.9.13.jar:1.9.13]
    at org.codehaus.jackson.map.deser.std.StdDeserializer$ByteDeserializer.deserialize(StdDeserializer.java:736) ~[jackson-mapper-asl-1.9.13.jar:1.9.13]
    at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2732) ~[jackson-mapper-asl-1.9.13.jar:1.9.13]
    at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1877) ~[jackson-mapper-asl-1.9.13.jar:1.9.13]
    at org.springframework.amqp.support.converter.JsonMessageConverter.convertBytesToObject(JsonMessageConverter.java:131) ~[spring-amqp-1.4.6.RELEASE.jar:na]
    at org.springframework.amqp.support.converter.JsonMessageConverter.fromMessage(JsonMessageConverter.java:100) ~[spring-amqp-1.4.6.RELEASE.jar:na]
    ... 13 common frames omitted
2016-10-25 12:51:42,020 WARN  t:[SimpleAsyncTaskExecutor-1] ConditionalRejectingErrorHandler: Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'{"messageDTOList":[{"createdDate":1477396301061,"message":"{\"mac\":-99,\"device\":444,\"ifindex\":55,\"ip\":668,\"arpdevice\":99,\"aprifindex\":\"\",\"vlanindex\":111,\"enddevice\":133}","crudOperation":"UPDATE"}],"senderServerId":"5b36a7a1-c17d-471f-b16e-e8e0bf4afb5b","messageId":"5b36a7a1-c17d-471f-b16e-e8e0bf4afb5b20","rebasoftServerType":"AC","taskType":"MAC"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=[53, 98, 51, 54, 97, 55, 97, 49, 45, 99, 49, 55, 100, 45, 52, 55, 49, 102, 45, 98, 49, 54, 101, 45, 101, 56, 101, 48, 98, 102, 52, 97, 102, 98, 53, 98, 50, 48], replyTo=null, contentType=application/json, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=X, receivedRoutingKey=data.core.macs, deliveryTag=2, messageCount=0])
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:864) ~[spring-rabbit-1.4.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:759) ~[spring-rabbit-1.4.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679) ~[spring-rabbit-1.4.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:83) [spring-rabbit-1.4.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:170) ~[spring-rabbit-1.4.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1253) [spring-rabbit-1.4.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660) ~[spring-rabbit-1.4.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1021) [spring-rabbit-1.4.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1005) [spring-rabbit-1.4.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:83) [spring-rabbit-1.4.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1115) [spring-rabbit-1.4.6.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
    at org.springframework.amqp.support.converter.JsonMessageConverter.fromMessage(JsonMessageConverter.java:105) ~[spring-amqp-1.4.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:185) ~[spring-rabbit-1.4.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:243) ~[spring-rabbit-1.4.6.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756) ~[spring-rabbit-1.4.6.RELEASE.jar:na]
    ... 10 common frames omitted
Caused by: org.codehaus.jackson.map.JsonMappingException: Can not deserialize instance of byte out of START_OBJECT token
 at [Source: java.io.StringReader@33b39883; line: 1, column: 1]
    at org.codehaus.jackson.map.JsonMappingException.from(JsonMappingException.java:163) ~[jackson-mapper-asl-1.9.13.jar:1.9.13]
    at org.codehaus.jackson.map.deser.StdDeserializationContext.mappingException(StdDeserializationContext.java:219) ~[jackson-mapper-asl-1.9.13.jar:1.9.13]
    at org.codehaus.jackson.map.deser.std.StdDeserializer._parseByte(StdDeserializer.java:214) ~[jackson-mapper-asl-1.9.13.jar:1.9.13]
    at org.codehaus.jackson.map.deser.std.StdDeserializer$ByteDeserializer.deserialize(StdDeserializer.java:749) ~[jackson-mapper-asl-1.9.13.jar:1.9.13]
    at org.codehaus.jackson.map.deser.std.StdDeserializer$ByteDeserializer.deserialize(StdDeserializer.java:736) ~[jackson-mapper-asl-1.9.13.jar:1.9.13]
    at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2732) ~[jackson-mapper-asl-1.9.13.jar:1.9.13]
    at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1877) ~[jackson-mapper-asl-1.9.13.jar:1.9.13]
    at org.springframework.amqp.support.converter.JsonMessageConverter.convertBytesToObject(JsonMessageConverter.java:131) ~[spring-amqp-1.4.6.RELEASE.jar:na]
    at org.springframework.amqp.support.converter.JsonMessageConverter.fromMessage(JsonMessageConverter.java:100) ~[spring-amqp-1.4.6.RELEASE.jar:na]
    ... 13 common frames omitted

My client classes are below:

AbstractEMCRabbitConfiguration.java

@Configuration
public abstract class AbstractEMCRabbitConfiguration {

//@Value("${emc.rabbit.exchange.core}")
protected static String CORE_DATA_EXCHANGE_NAME = "X";

@Value("${emc.rabbit.queue.core.macs}")
protected static String MAC_REQUEST_QUEUE_NAME = "data.core.macs";

protected static String MAC_REQUEST_ROUTING_KEY = MAC_REQUEST_QUEUE_NAME;

//@Value("${emc.rabbit.hostname}")
protected String hostname;

//@Value("${emc.rabbit.username}")
protected String username;

//@Value("${emc.rabbit.password}")
protected String password;

//@Value("${emc.rabbit.port:5672}")
protected Integer port = 5672;

protected abstract void configureRabbitTemplate(RabbitTemplate template);

@Bean
public ConnectionFactory connectionFactory(){
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("test");
    connectionFactory.setPassword("test");
    connectionFactory.setPort(port);
    connectionFactory.setRequestedHeartBeat(60);
    return connectionFactory;
}

@Bean
public RabbitTemplate rabbitTemplate(){
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    template.setMessageConverter(jsonMessageConverter());
    configureRabbitTemplate(template);
    return template;
}

@Bean
public MessageConverter jsonMessageConverter() {
    JsonMessageConverter jsonMessageConverter = new JsonMessageConverter();
    jsonMessageConverter.setClassMapper(defaultClassMapper());
    return jsonMessageConverter;
}

@Bean
public DefaultClassMapper defaultClassMapper() {
    DefaultClassMapper defaultClassMapper = new DefaultClassMapper();
    defaultClassMapper.setDefaultType(byte.class);
    return defaultClassMapper;
}

@Bean
public DirectExchange coreDataExchange() { return new DirectExchange(CORE_DATA_EXCHANGE_NAME); }

@Bean
public AmqpAdmin amqpAdmin() {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
    return rabbitAdmin;
}
}

RabbitClientConfiguration.java

@Configuration
@Import({JacksonConfiguration.class, MessagingConfiguration.class})
public class RabbitClientConfiguration extends AbstractEMCRabbitConfiguration {

@Inject
private JacksonConfiguration jacksonConfiguration;

@Inject
private MessagingConfiguration messagingConfiguration;

//@Value("${data.core.pattern}")
//private String coreDataRoutingKey;

//@Inject
//private ClientHandler clientHandler;

@Override
public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
    rabbitTemplate.setRoutingKey("data.core.macs");
}

@Bean
public SimpleMessageListenerContainer messageListenerContainer(){
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueues(macsDataQueue());
    container.setMessageListener(messageListenerAdapter());
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    return container;
}

@Bean
MessageListenerAdapter messageListenerAdapter(){
    return new MessageListenerAdapter(clientHandler(jacksonConfiguration.objectMapper(),messagingConfiguration.macMessageHandler(messagingConfiguration.messagingFactory())),jsonMessageConverter());
    //return new MessageListenerAdapter(clientHandler,"receiveMessage");
}

@Bean
ClientHandler clientHandler(ObjectMapper objectMapper, IMessageHandler macMessageHandler){
    ClientHandler clientHandler = new ClientHandler();
    clientHandler.setObjectMapper(objectMapper);
    clientHandler.setMacMessageHandler(macMessageHandler);
    return clientHandler;
}

@Bean
public Queue macsDataQueue(){
    return rabbitAdmin().declareQueue();
}

@Bean
public Binding macsDataBinding(){
    return BindingBuilder.bind(macsDataQueue()).to(coreDataExchange()).with("data.core.macs");
}

@Bean
public AmqpAdmin rabbitAdmin() { return new RabbitAdmin(connectionFactory());}

}

A typical message received from the broker would look like this:

(Body:'{"messageDTOList":[{"createdDate":1477396301061,"message":"{\"mac\":-99,\"device\":444,\"ifindex\":55,\"ip\":668,\"arpdevice\":99,\"aprifindex\":\"\",\"vlanindex\":111,\"enddevice\":133}","crudOperation":"UPDATE"}],"senderServerId":"5b36a7a1-c17d-471f-b16e-e8e0bf4afb5b","messageId":"5b36a7a1-c17d-471f-b16e-e8e0bf4afb5b20","rebasoftServerType":"AC","taskType":"MAC"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=[53, 98, 51, 54, 97, 55, 97, 49, 45, 99, 49, 55, 100, 45, 52, 55, 49, 102, 45, 98, 49, 54, 101, 45, 101, 56, 101, 48, 98, 102, 52, 97, 102, 98, 53, 98, 50, 48], replyTo=null, contentType=application/json, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=X, receivedRoutingKey=data.core.macs, deliveryTag=2, messageCount=0])

Note: The content type is 'application/json'

Any help would me much appreciated.

5 Answers 5

1

Caused by: org.codehaus.jackson.map.JsonMappingException: Can not deserialize instance of byte out of START_OBJECT token

Looks like a garbage message - are you sure it was created by the rabbit template with the json converter?

Can you edit the question and post the message content (and the content_type property, and other headers)? (You can use the Admin UI to examine the message).

EDIT

I had no problems decoding your message with this app...

@SpringBootApplication
public class So40240771Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So40240771Application.class, args);
        RabbitTemplate template = context.getBean(RabbitTemplate.class);
        String body = "{"
                + "\"messageDTOList\":"
                + "[{\"createdDate\":1477396301061,"
                +   "\"message\":\"{\\\"mac\\\":-99,\\\"device\\\":444,\\\"ifindex\\\":55,\\\"ip\\\":668,\\\"arpdevice\\\":99,\\\"aprifindex\\\":\\\"\\\",\\\"vlanindex\\\":111,\\\"enddevice\\\":133}\","
                +   "\"crudOperation\":\"UPDATE\"}],"
                + "\"senderServerId\":\"5b36a7a1-c17d-471f-b16e-e8e0bf4afb5b\","
                + "\"messageId\":\"5b36a7a1-c17d-471f-b16e-e8e0bf4afb5b20\","
                + "\"rebasoftServerType\":\"AC\","
                + "\"taskType\":\"MAC\""
                + "}";
        MessageProperties properties = new MessageProperties();
        properties.setContentType("application/json");
        template.send("foo", MessageBuilder.withBody(body.getBytes()).andProperties(properties).build());
        Thread.sleep(6000);
        context.close();
    }

    @Bean
    public Jackson2JsonMessageConverter converter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        return converter;
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setMessageListener(new MessageListenerAdapter(new Object() {
            void handleMessage(Object in) {
                System.out.println(in.getClass() + "\n" + in);
            }
        }, converter()));
        container.setQueueNames("foo");
        return container;
    }

}

Result:

class java.util.LinkedHashMap
{messageDTOList=[{createdDate=1477396301061, message={"mac":-99,"device":444,"ifindex":55,"ip":668,"arpdevice":99,"aprifindex":"","vlanindex":111,"enddevice":133}, crudOperation=UPDATE}], senderServerId=5b36a7a1-c17d-471f-b16e-e8e0bf4afb5b, messageId=5b36a7a1-c17d-471f-b16e-e8e0bf4afb5b20, rebasoftServerType=AC, taskType=MAC}

A couple of things to note: Since the message has no type information in the headers (which would have been added by the outbound converter if it was used) the message is decoded to a simple Map.

If you want to decode to an object, you either need the type information in the headers, or you need to configure the converter with a class mapper (such as the DefaultClassMapper.

Second, it looks like the "message" element is double-encoded JSON - it's already JSON and has been re-encoded).

Sign up to request clarification or add additional context in comments.

4 Comments

Thanks for getting back Gary. I have added the info requested above into the question.
The provider side has its code written in java and I don't mind receiving it as a simple map. I have tried using the DefaultClassMapper as per the code but to no avail. I was wondering if a non-spring producer needs to provide header info as mandatory to a spring consumer?
This is your problem jsonMessageConverter.setClassMapper(defaultClassMapper()); and defaultClassMapper.setDefaultType(byte.class); you are trying to decode the JSON as a single byte. Just remove that class mapper and you'll get a Map.
I've got it working with that code still in there, though it may not be necessary. As mentioned below, the main issue was the way I was wiring the MessageListenerAdapter.
1

I figured out the issue, it appears that the MessageListenerAdapter in my configuration class does not like a ClientHandler with constructor arguments. Once I removed the constructor arguments it works fine.

Comments

0

No argument constructor is required in your payload/data class.

Comments

0

Updating the model classes to implement Serializable interface fixed the issue for me.

Comments

0

In my case I needed to covert the MessageBuilder from IntegrationFlow to MapMessage.

public class CustomConverter implements MessageConverter {

    public CustomConverter() {
    }

    public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
        if (object instanceof Message message) {
            return message;
        } else if (object instanceof String text) {
            return this.createMessageForString(text, session);
        } else if (object instanceof byte[] bytes) {
            return this.createMessageForByteArray(bytes, session);
        } else if (object instanceof Map<?, ?> map) {
            return this.createMessageForMap(map, session);
        } else if (object instanceof Serializable serializable) {
            return this.createMessageForSerializable(serializable, session);
        } else if(object instanceof MessageBuilder<?> messageBuilder) {
            return this.createMessageForJMS(messageBuilder, session);
        }else {
            throw new MessageConversionException("Cannot convert object of type [" + ObjectUtils.nullSafeClassName(object) + "] to JMS message. Supported message payloads are: String, byte array, Map<String,?>, Serializable object.");
        }
    }

   


    protected MapMessage createMessageForJMS(MessageBuilder<?> messageBuilder, Session session) throws JMSException {
        MapMessage message = session.createMapMessage();
        MessageHeaders headers = messageBuilder.build().getHeaders();
        File payload = ((File)messageBuilder.build().getPayload());

        message.setJMSCorrelationID(QueueUtil.generateCorrelationID());
        headers.forEach((s, o) -> {
            try {
                if(o instanceof UUID) {
                    message.setJMSMessageID(o.toString());
                } else message.setObject(s, o);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        });
        return message;
    }

}

Note - fromMessage code I copied from simpleconverter as I receive as MapMessage.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.