Messages and message handlers

on waitingforcode.com

Messages and message handlers

This time we'll cover more core concepts of Spring Integration: messages and another concept with 'message' in the name, message handlers..

At the begin we'll describe the message in Spring Integration. We'll explain what it is, how to construct and send it. At the second part we'll see how to define message handlers, introduced in previous article. We'll terminate this article by explaining both concepts directly in test cases.

Messages in Spring Integration

As we already talked, messages sent through Spring Integration's channels implement org.springframework.messaging.Message interface. Different message types exist. The default one is GenericMessage. It's immutable, ie. thread-safe. The immutability is achieved thanks the absence of writing methods after objects creation. When we want to deal with mutable messages, we should opt for MutableMessage object. The mutability is achieved thanks to defined mutators. And, unlike in the case of GenericMessage, local fields storing two elements composing every message, payload and message headers, are non-final. The first element, payload, is in fact the object sent from one endpoint to another one. The headers contain supplementary information layer which accompanies messages. Header is represented by MeassageHeaders class which implements Map object. But unlike almost all basic Map implementations, MessageHeaders is immutable. It means that we can't append supplementary headers to already existent object by invoking one of Map's write methods (put, putAll, remove). An UnsupportedOperationException will be thrown in this case.

Each MessageHeaders contains elements automatically set by Spring Integration: id, timestamp, reply channel, error channel and content type. They're all generated in local constructor, invoked directly by public constructor:

protected MessageHeaders(Map<String, Object> headers, UUID id, Long timestamp) {
  this.headers = (headers != null ? new HashMap<String, Object>(headers) : new HashMap<String, Object>());
  if (id == null) {
    this.headers.put(ID, getIdGenerator().generateId());
  }
  else if (id == ID_VALUE_NONE) {
    this.headers.remove(ID);
  }
  else {
    this.headers.put(ID, id);
  }

  if (timestamp == null) {
    this.headers.put(TIMESTAMP, System.currentTimeMillis());
  }
  else if (timestamp < 0) {
    this.headers.remove(TIMESTAMP);
  }
  else {
    this.headers.put(TIMESTAMP, timestamp);
  }
}

Message can be constructed directly, manually, by invoking appropriate constructors, for example: public GenericMessage(T payload) or public GenericMessage(T payload, Map<String, Object> headers) of GenericMessage. As you can see, the second method allows us also to manually define headers object. And because Map's implementation is expected, we can use its writable implementation, as for example HashMap. Another method to construct messages in Spring Integration is MessageBuilder. Thanks to withPayload(T payload) method we can easily specify Message's payload. Builder has also a lot of methods used to manipulate MessageHeaders. We can for example remove header(s) directly by name or by ANT pattern. We can also copy headers directly from another object or quite simply set new header.

Message handlers in Spring Integration

Previously we described service activator as a kind of message's receiver. But it's not the single object which can do that because message handlers also can receive messages. Message handlers are the objects based on MessageHandler interface which defines single method to handle arriving message: void handleMessage(Message<?> message).

The main class implementing MessageHandler interface is org.springframework.integration.handler.AbstractMessageHandler. It implements handleMessage method in the way that we could use another Spring Integration features, as writing sent messages into history thanks to MessageHistory object. The real message treatment is made in implementations of protected abstract void handleMessageInternal(Message<?> message) throws Exception.

Among already implemented message handlers, we can distinguish the one for mail sending, MailSendingMessageHandler. Its handleMessageInternal implementation is quite simple and should help us to understand better the meaning of message handling by the AbstractMessageHandler sub classes:

@Override
protected final void handleMessageInternal(Message<?> message) {
  MailMessage mailMessage = this.convertMessageToMailMessage(message);
  if (mailMessage instanceof SimpleMailMessage) {
    this.mailSender.send((SimpleMailMessage) mailMessage);
  }
  else if (mailMessage instanceof MimeMailMessage) {
    this.mailSender.send(((MimeMailMessage) mailMessage).getMimeMessage());
  }
  else {
    throw new IllegalArgumentException(
      "Unsupported MailMessage type [" + mailMessage.getClass().getName() + "].");
  }
}

As you can see, this method handle Message in the way of converting it to MailMessage object. Converted object is further sent by used JavaMailSender.

But not only default message handlers use AbstractMessageHandler interface. Components not purely related to message treatment, as for example router which transfers the message to another endpoints, extends it too.

Message and message handlers example in Spring Integration

After this introduction, it's the time to write some code to illustrate two methods of construct Message instances and extends AbstractMessageHandler class. First, sample configuration:

<context:annotation-config />
<context:component-scan base-package="com.waitingforcode"/>

<int:publish-subscribe-channel id="pubSubChannel" />
And now, the implementation of AbstractMessageHandler. We'll check only if last received message is the same as message sent through channel defined previously. This channel is of publish-subscribe type, not approached yet. We'll talk about it next time and focus on MessageHandler and Message now:
@Component
public class SampleMessageHandler extends AbstractMessageHandler {

  private Message<?> lastMessage;

  public Message<?> getLastMessage() {
    return this.lastMessage;
  }

  @Override
  protected void handleMessageInternal(Message<?> message) throws Exception {
    this.lastMessage = message;
  }
}

There are some test cases to show some of MessageHandler and Message purposes:

@ContextConfiguration(locations = "classpath:META-INF/message-and-message-handlers.xml")
@RunWith(SpringJUnit4ClassRunner.class)
public class MessageAndHandlerTest {

  @Autowired
  private SampleMessageHandler sampleMessageHandler;

  @Autowired
  @Qualifier(value = "pubSubChannel")
  private PublishSubscribeChannel publishSubscribeChannel;

  private List<String> headersNotFound = new ArrayList();
  {
    headersNotFound.add(MessageHeaders.ID);
    headersNotFound.add(MessageHeaders.TIMESTAMP);
    headersNotFound.add("totalPrice");
  };

  @Test
  public void testSubscribe() {
    publishSubscribeChannel.subscribe(sampleMessageHandler);
    ProductWrapper wrapper = constructWrapper();
    Message<ProductWrapper> msg = new GenericMessage<ProductWrapper>(wrapper, new HashMap<String, Object>());
    publishSubscribeChannel.send(msg);

    assertEquals("Message sent through publish-subscribable channel should be  the same as message got by message handler " +
      " associated to this channel", (ProductWrapper) sampleMessageHandler.getLastMessage().getPayload(),
      msg.getPayload());
  }

  @Test
  public void testMessageManualInit() {
    ProductWrapper wrapper = constructWrapper();

    Map<String, Object> headers = new HashMap<String, Object>();
    headers.put("totalPrice", 31.99d);
    Message<ProductWrapper> message = new GenericMessage<ProductWrapper>(wrapper, headers);
    checkMessage(message, headersNotFound);
  }

  @Test
  public void testWithBuilder() {
    ProductWrapper wrapper = constructWrapper();
    Message<ProductWrapper> message = MessageBuilder.withPayload(wrapper)
    .setHeader("totalPrice", 31.99d)
    .build();
    checkMessage(message, headersNotFound);
  }

  @Test
  public void testImmutableHeaders() {
    Map<String, Object> headers = new HashMap<String, Object>();
    MessageHeaders messageHeaders = new MessageHeaders(headers);
    boolean wasUoe = false;
    try {
      messageHeaders.put("test", 1);
    } catch (UnsupportedOperationException uoe) {
      wasUoe = true;
    }
    assertTrue("UnsupportedOperationException should be thrown after trying to add new header after creating MessageHaders " +
      "object ", wasUoe);
  }

  private ProductWrapper constructWrapper() {
    Product milk = new Product();
    milk.setName("Milk");
    ProductWrapper wrapper = new ProductWrapper();
    wrapper.addProduct(milk);
    return wrapper;
  }

  private void checkMessage(Message message, List<String> headersNotFound) {
    MessageHeaders realHeaders = message.getHeaders();
    assertEquals("Number of expected headers aren't the same as for real MessageHeaders object", headersNotFound.size(),
    realHeaders.size());
    for (String key : realHeaders.keySet()) {
      headersNotFound.remove(key);
    }
    assertEquals("After iterating through all MessageHeaders entries, any header should left in headersNotFound list",
    headersNotFound.size(), 0);
  }
}

This article covered the message part of Spring Integration. At the begin we discovered what message was sent between endpoints. We also discovered the methods of message construction. After that we saw message handlers which were a kind of message receivers. At the end we illustrated all of these points by some test cases.

If you liked it, you should read: Splitters and aggregators Routing Transformers

Share on: