Direct channel and service activator

As announced in the introduction to Spring Integration, we'll write a series of articles about the features of this project. We begin here with two of them: the direct channel and the service activator.

In the first part of this article we'll look at message channels in more detail and present a very simple one, the direct channel. After that we'll cover the concepts associated with the service activator, followed by an additional element linked to both: the message converter. At the end we'll see how to configure and test the elements presented in this article.

Message channels

As mentioned in the introduction article, channels in Spring Integration must implement the MessageChannel interface. Don't look for this interface among the Spring Integration classes: it lives in the org.springframework.messaging package. It defines only two methods, both for sending messages: send(Message<?> message) and send(Message<?> message, long timeout).

As you can see, Spring doesn't mix message sending and message receiving in one interface. Receiving is defined by SubscribableChannel or PollableChannel, which differ in how sent messages are handled. PollableChannel defines two receive methods (with and without a timeout) through which a consumer pulls a sent Message instance from the object that stores sent messages internally. A good example is org.springframework.integration.channel.QueueChannel, whose receive method takes messages directly from an internal queue:

private final BlockingQueue<Message<?>> queue;
@Override
protected Message<?> doReceive(long timeout) {
  try {
    if (timeout > 0) {
      return queue.poll(timeout, 
        TimeUnit.MILLISECONDS);
    }
    if (timeout == 0) {
      return queue.poll();
    }
    return queue.take();
  }
  catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    return null;
  }
}

On the other hand, SubscribableChannel relies on MessageHandler implementations to deal with messages transmitted through the channel. MessageHandler defines a single method, handleMessage(Message<?> message), which handles the incoming message. SubscribableChannel defines two methods through which message handlers are registered (subscribe(MessageHandler handler)) or unregistered (unsubscribe(MessageHandler handler)). An example of this type of channel is PublishSubscribeChannel, whose subscribe and unsubscribe methods are implemented as below:

public boolean subscribe(MessageHandler handler) {
  MessageDispatcher dispatcher = this.getRequiredDispatcher();
  boolean added = dispatcher.addHandler(handler);
  this.adjustCounterIfNecessary(dispatcher, 
    added ? 1 : 0);
  return added;
}

public boolean unsubscribe(MessageHandler handle) {
  MessageDispatcher dispatcher = this.getRequiredDispatcher();
  boolean removed = dispatcher.removeHandler(handle);
  this.adjustCounterIfNecessary(dispatcher, 
    removed ? -1 : 0);
  return removed;
}

Messages are dispatched to message handlers by an implementation of the MessageDispatcher interface, which defines three methods: addHandler(MessageHandler handler), removeHandler(MessageHandler handler) and dispatch(Message<?> message).
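
To make the difference between the two receiving styles concrete, here is a minimal plain-Java sketch. It deliberately does not use the Spring types: PollableSketch and SubscribableSketch are hypothetical stand-ins, payloads are plain strings and handlers are java.util.function.Consumer instances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Simplified stand-ins for illustration only, NOT the Spring Integration types.
class ChannelStylesSketch {

  // Pollable style: messages wait in a queue until a consumer asks for one.
  static class PollableSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    boolean send(String message) {
      return queue.offer(message);
    }

    // Returns null when nothing is waiting, like a poll without blocking.
    String receive() {
      return queue.poll();
    }
  }

  // Subscribable style: handlers register up front and are called on send.
  static class SubscribableSketch {
    private final List<Consumer<String>> handlers = new ArrayList<>();

    boolean subscribe(Consumer<String> handler) {
      return handlers.add(handler);
    }

    boolean unsubscribe(Consumer<String> handler) {
      return handlers.remove(handler);
    }

    // Publish-subscribe flavor: every registered handler gets the message.
    boolean send(String message) {
      for (Consumer<String> handler : handlers) {
        handler.accept(message);
      }
      return !handlers.isEmpty();
    }
  }

  public static void main(String[] args) {
    PollableSketch pollable = new PollableSketch();
    pollable.send("order-1");
    System.out.println("Polled: " + pollable.receive());

    SubscribableSketch subscribable = new SubscribableSketch();
    subscribable.subscribe(m -> System.out.println("Handled: " + m));
    subscribable.send("order-2");
  }
}
```

In the pollable variant the consumer decides when to fetch a message, while in the subscribable variant the channel pushes the message to the handlers at send time.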

Direct channel

The first message channel we look at is probably the simplest one: the direct channel. It's a point-to-point channel, which means that each message is delivered to one and only one receiver. Technically, org.springframework.integration.channel.DirectChannel extends the AbstractSubscribableChannel class, which defines one method used to dispatch the message:

@Override
protected boolean doSend(Message<?> message, long timeout) {
  try {
    return this.getRequiredDispatcher()
     .dispatch(message);
  }
  catch (MessageDispatchingException e) {
    String description = e.getMessage() + " for channel '" 
      + this.getFullChannelName() + "'.";
    throw new MessageDeliveryException(message, description, e);
  }
}

As you can see, it uses an instance of the previously presented MessageDispatcher. In DirectChannel this instance is a UnicastingDispatcher. Its dispatch method works both with and without a java.util.concurrent.Executor. Because DirectChannel doesn't allow an Executor to be set, UnicastingDispatcher handles all messages without one. As a result, every message is handled in the sender's thread.
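
The single-thread behavior can be illustrated with a small plain-Java sketch. DirectSketch is a hypothetical stand-in that only models the "no Executor" case described above; it is not the Spring class.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical stand-in for a direct channel: one handler, no Executor.
class SameThreadDeliverySketch {

  static class DirectSketch {
    private Consumer<String> handler;

    void subscribe(Consumer<String> handler) {
      this.handler = handler;
    }

    // The handler is called inline, so send() returns only after it finishes.
    boolean send(String message) {
      if (handler == null) {
        throw new IllegalStateException("no subscriber");
      }
      handler.accept(message);
      return true;
    }
  }

  // Sends one message and reports which thread ran the handler.
  static Thread handlerThreadFor(String message) {
    AtomicReference<Thread> handlerThread = new AtomicReference<>();
    DirectSketch channel = new DirectSketch();
    channel.subscribe(m -> handlerThread.set(Thread.currentThread()));
    channel.send(message);
    return handlerThread.get();
  }

  public static void main(String[] args) {
    Thread sender = Thread.currentThread();
    System.out.println("Same thread: " + (handlerThreadFor("ping") == sender));
  }
}
```

One consequence of this design is that an exception thrown by the handler surfaces directly in the code that called send().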

An interesting concept that comes with DirectChannel is failover. What does it mean? If failover is enabled and the dispatcher fails to deliver a message to the first of the defined message handlers, it tries the remaining handlers in turn. When failover is disabled, delivery fails immediately. Failover is enabled by default. You can see how it works in the dispatching method below and in the method it calls when an exception occurs during dispatching:

private boolean doDispatch(Message<?> message) {
  // ... some code is defined before but it's not important to understand failover
  Iterator<MessageHandler> handlerIterator = 
    this.getHandlerIterator(message);
  List<RuntimeException> exceptions = 
     new ArrayList<RuntimeException>();
  while (success == false && handlerIterator.hasNext()) {
    MessageHandler handler = handlerIterator.next();
    try {
      handler.handleMessage(message);
      success = true; // we have a winner.
    }
    catch (Exception e) {
      RuntimeException runtimeException = 
        this.wrapExceptionIfNecessary(message, e);
      exceptions.add(runtimeException);
      this.handleExceptions(exceptions, message, 
        !handlerIterator.hasNext());
    }
  }
  return success;
}

/**
 * Handles Exceptions that occur while dispatching. If this dispatcher has
 * failover enabled, it will only throw an Exception when the handler list
 * is exhausted. The 'isLast' flag will be true if the
 * Exception occurred during the final iteration of the MessageHandlers.
 * If failover is disabled for this dispatcher, it will re-throw any
 * Exception immediately.
 */
private void handleExceptions(List<RuntimeException> allExceptions, 
     Message<?> message, boolean isLast) {
  if (isLast || !this.failover) {
    if (allExceptions != null && allExceptions.size() == 1) {
      throw allExceptions.get(0);
    }
    throw new AggregateMessageDeliveryException(message,
      "All attempts to deliver Message to MessageHandlers failed.", allExceptions);
  }
}
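
The effect of the failover flag can be tried out with a simplified plain-Java model. UnicastSketch is a hypothetical class written for this illustration. Unlike the code above, it rethrows the last exception instead of aggregating all of them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of unicast dispatch with failover; not the Spring class.
class FailoverSketch {

  static class UnicastSketch {
    private final List<Consumer<String>> handlers = new ArrayList<>();
    private final boolean failover;

    UnicastSketch(boolean failover) {
      this.failover = failover;
    }

    void addHandler(Consumer<String> handler) {
      handlers.add(handler);
    }

    // Tries handlers in order and stops at the first one that succeeds.
    boolean dispatch(String message) {
      for (int i = 0; i < handlers.size(); i++) {
        try {
          handlers.get(i).accept(message);
          return true;
        } catch (RuntimeException e) {
          boolean isLast = i == handlers.size() - 1;
          // Without failover the first failure is final.
          if (isLast || !failover) {
            throw e;
          }
        }
      }
      return false;
    }
  }

  public static void main(String[] args) {
    UnicastSketch dispatcher = new UnicastSketch(true);
    dispatcher.addHandler(m -> { throw new IllegalStateException("primary down"); });
    dispatcher.addHandler(m -> System.out.println("Backup handled: " + m));
    dispatcher.dispatch("stock-check");
  }
}
```

With failover set to true the second handler receives the message. With false, the IllegalStateException of the first handler reaches the caller and the second handler is never invoked.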

Service activator

Until now we have focused more on message dispatching than on message receiving. In this part we'll introduce one element which can be used as a message receiver: the service activator. It connects a Spring-managed object to an input channel. Generally, two conditions must be met to write a service activator:
- it must be a Spring bean: for example annotated with @Component or defined as <bean /> in an XML configuration file
- it must contain a method which will handle the messages transmitted by the input channel. This method can be designated directly in the XML configuration (method attribute of the <service-activator /> tag) or with the @ServiceActivator annotation.

A service activator can simply receive the message and do nothing more. But it can also process the received message and send the result to an output channel. In that case the output channel must be defined explicitly in the configuration (output-channel attribute) or in the annotation (outputChannel attribute). In addition, the activator's method must return a value, typically a Message instance; a plain object is used as the payload of the reply message.
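
The receive-process-forward flow can be modeled in a few lines of plain Java. This is only a sketch under simplifying assumptions: ActivatorSketch is a hypothetical class, the service is a java.util.function.Function on string payloads and a java.util.List plays the role of the output channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of a service activator; none of these types are Spring's.
class ActivatorReplySketch {

  static class ActivatorSketch {
    private final Function<String, String> service;
    private final List<String> outputChannel; // stands in for the output channel

    ActivatorSketch(Function<String, String> service, List<String> outputChannel) {
      this.service = service;
      this.outputChannel = outputChannel;
    }

    // Invokes the service and forwards a non-null result to the output.
    void handle(String payload) {
      String reply = service.apply(payload);
      if (reply != null && outputChannel != null) {
        outputChannel.add(reply);
      }
    }
  }

  public static void main(String[] args) {
    List<String> output = new ArrayList<>();
    ActivatorSketch activator = new ActivatorSketch(p -> p.toUpperCase(), output);
    activator.handle("test content");
    System.out.println("Output channel received: " + output);
  }
}
```

A service that returns null models the "receive only" case: nothing is forwarded.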

Message converters

In the previous article about Spring Integration we presented transformers as a way to transform a sent object from its initial type to the desired one. However, they're not the only solution, because we can also use message converters. Based on the org.springframework.messaging.converter.MessageConverter interface, message converters define how to convert an object to a Message instance and how to get an object of the desired type from a Message. These two operations are implemented in the toMessage(Object payload, MessageHeaders headers) and fromMessage(Message<?> message, Class<?> targetClass) methods respectively.

A channel extending the AbstractMessageChannel class can define one message converter. It's used in the convertPayloadIfNecessary method, which invokes fromMessage to produce an instance of the expected type from the object transmitted in the Message. This operation looks like:

Object converted = this.messageConverter.fromMessage(message, datatype);
if (converted != null) {
  if (converted instanceof Message) {
    return (Message<?>) converted;
  }
  else {
    return this.getMessageBuilderFactory()
      .withPayload(converted)
      .copyHeaders(message.getHeaders())
      .build();
  }
}
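
The overall idea of a datatype check backed by a converter can be sketched in plain Java as follows. The names DatatypeCheckSketch and acceptOrConvert are made up for this illustration, the converter is a simple java.util.function.Function, and the real channel logic is more involved than this.

```java
import java.util.function.Function;

// Simplified model of a channel datatype check; not the Spring implementation.
class DatatypeCheckSketch {

  // Returns the payload unchanged if it already has the expected type,
  // otherwise asks the converter; a null conversion result is rejected.
  static <T> T acceptOrConvert(Object payload, Class<T> datatype,
      Function<Object, T> converter) {
    if (datatype.isInstance(payload)) {
      return datatype.cast(payload);
    }
    T converted = converter == null ? null : converter.apply(payload);
    if (converted == null) {
      throw new IllegalArgumentException(
          "Expected " + datatype.getSimpleName() + " but got "
              + payload.getClass().getSimpleName());
    }
    return converted;
  }

  public static void main(String[] args) {
    // Hypothetical converter: knows how to turn a String into an Integer.
    Function<Object, Integer> converter =
        p -> p instanceof String ? Integer.valueOf((String) p) : null;

    System.out.println(acceptOrConvert(7, Integer.class, converter));
    System.out.println(acceptOrConvert("42", Integer.class, converter));
  }
}
```

A payload that neither matches the datatype nor can be converted is rejected with an exception instead of being passed on to the handlers.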

Configuring and testing direct channel and service activator

It's time to turn the presented elements into a working example. Let's begin with a simple configuration file:

<context:annotation-config />
<context:component-scan base-package="com.waitingforcode"/>

<int:channel id="directChannel" />
<int:channel id="stockCheckingChannel" datatype="com.waitingforcode.model.ProductWrapper"
  message-converter="productWrapperConverter">
  <!-- thanks to this configuration we can disable failover -->
  <int:dispatcher failover="false"/>
</int:channel>
<int:channel id="channelNoSubscriber" />
<int:channel id="anotherDirectChannel" />

<int:service-activator input-channel="directChannel" ref="sampleActivator"
  method="sampleHandler" />
<int:service-activator input-channel="anotherDirectChannel" 
  output-channel="stringOutputChannel" ref="sampleActivator" method="handlerWithReturn" />
<int:service-activator input-channel="stockCheckingChannel" ref="productWrapperActivator" />

As you can see, the configuration is quite simple. Each tag clearly indicates the configured element. Because the direct channel is the default message channel in Spring Integration, its configuration is also simple. One new element appears in the configuration of one of the direct channels: datatype. This attribute indicates which payload types the channel accepts in sent messages. Let's now check all the elements defined here before jumping to the test cases:

@Component
public class StringOutputChannel implements MessageChannel {

  private Message<?> receivedMessage;

  // For simplicity, send() only stores the message so that the tests can read it later.
  @Override
  public boolean send(Message<?> message) {
    receivedMessage = message;
    // The cast fails fast if the payload is not a String.
    String msg = (String) message.getPayload();
    return true;
  }

  @Override
  public boolean send(Message<?> message, long l) {
    return this.send(message);
  }

  public Message<?> getReceivedMessage() {
    return receivedMessage;
  }
}

@Component
public class ProductWrapperConverter implements MessageConverter {

  @Override
  public Object fromMessage(Message<?> message, Class<?> aClass) {
    if (message.getPayload() instanceof Product) {
      Product product = (Product) message.getPayload();
      ProductWrapper wrapper = new ProductWrapper();
      wrapper.addProduct(product);
      return wrapper;
    } else if (message.getPayload() instanceof Order) {
      Order order = (Order) message.getPayload();
      ProductWrapper wrapper = new ProductWrapper();
      wrapper.getProducts().addAll(order.getProducts());
      return wrapper;
    }
    return null;
  }

  @Override
  public Message<?> toMessage(Object o, MessageHeaders messageHeaders) {
    if (o instanceof Product) {
      System.out.println("Transforming");
      Product product = (Product) o;
      ProductWrapper wrapper = new ProductWrapper();
      wrapper.addProduct(product);
      return MessageBuilder.withPayload(wrapper).build();
    } else if (o instanceof Order) {
      Order order = (Order) o;
      ProductWrapper wrapper = new ProductWrapper();
      wrapper.getProducts().addAll(order.getProducts());
      return MessageBuilder.withPayload(wrapper).build();
    }
    return null;
  }

}


@Component
public class ProductWrapperActivator {

  @ServiceActivator(inputChannel="stockCheckingChannel")
  public void handleWrapper(Message<ProductWrapper> msg) {
    System.out.println("Service activator called for "
      +msg.getPayload()+" with headers "+msg.getHeaders());
  }

}

@Component
public class SampleActivator {

  private boolean notAnnotatedMethodCalled = false;
  private boolean directChannelMethodInvoked = false;

  public void notAnnotatedMethod() {
    notAnnotatedMethodCalled = true;
  }

  public boolean getNotAnnotatedMethodCalled() {
    return notAnnotatedMethodCalled;
  }

  public boolean getDirectChannelMethodInvoked() {
    return directChannelMethodInvoked;
  }

  // @ServiceActivator isn't mandatory when we specify
  // method="message-handler-name" attribute in
  // <int:service-activator /> tag. 
  // Its presence is explained only by the wish to 
  // store an example of @ServiceActivator annotation's use.
  //@ServiceActivator(inputChannel="directChannel")
  public void sampleHandler(Message<String> msg) {
    directChannelMethodInvoked = true;
    System.out.println("sampleHandler called for "
      +msg.getPayload()+" with headers "+msg.getHeaders());
  }

  public Message<String> handlerWithReturn(Message<String> msg) {
    System.out.println("Handling "+msg.getPayload()+" with headers "+msg.getHeaders());
    return MessageBuilder.withPayload("Content changed by activator").build();
  }
}

The code is not complicated. Most of the classes look like simple beans composed only of setters and getters. Let's move on to the test cases, which will make the defined cases clearer:

@ContextConfiguration(locations = "classpath:META-INF/directChannel-with-activator.xml")
@RunWith(SpringJUnit4ClassRunner.class)
public class SendTest {

  private ApplicationContext context;

  @Autowired
  public void setContext(ApplicationContext context) {
    this.context = context;
  }

  @Test
  public void testActivator() {
    SampleActivator activator = context.getBean(SampleActivator.class);
    assertFalse("Direct channel method shouldn't be invoked at this stage",
            activator.getDirectChannelMethodInvoked());
    assertFalse("Method not annotated with @ServiceActivator shouldn't be invoked",
            activator.getNotAnnotatedMethodCalled());

    MessageChannel direct = context.getBean("directChannel", MessageChannel.class);
    Message<String> sampleMsg = MessageBuilder.withPayload("Test content")
            .setHeader("website", "http://www.waitingforcode.com")
            .build();
    direct.send(sampleMsg);

    assertTrue("Direct channel method should be invoked after sending a message", activator.getDirectChannelMethodInvoked());
    assertFalse("Method not annotated with @ServiceActivator shouldn't be invoked",
            activator.getNotAnnotatedMethodCalled());
  }

  @Test
  public void testConvert() {
    MessageChannel multipleMessagesChannel = context.getBean("stockCheckingChannel", MessageChannel.class);

    // The stockCheckingChannel supports two types of messages: Order and Product. Both are handled differently, so we test
    // them separately. Product first:
    Product milk = new Product();
    milk.setName("Milk");
    Product cereal = new Product();
    cereal.setName("Cereal");

    Message<Product> productMsg = MessageBuilder.withPayload(milk).build();
    multipleMessagesChannel.send(productMsg);

    // Order after:
    Order breakfastOrder = new Order();
    breakfastOrder.addProduct(milk);
    breakfastOrder.addProduct(cereal);
    Message<Order> orderMsg = MessageBuilder.withPayload(breakfastOrder).build();
    multipleMessagesChannel.send(orderMsg);

    // normal message directly
    Product water = new Product();
    water.setName("Water");
    ProductWrapper wrapper = new ProductWrapper();
    wrapper.addProduct(water);
    Message<ProductWrapper> wrapperMsg = MessageBuilder.withPayload(wrapper).build();
    multipleMessagesChannel.send(wrapperMsg);
  }

  @Test
  public void testWithoutSubscriber() {
    MessageChannel channelNoSubscriber = context.getBean("channelNoSubscriber", MessageChannel.class);
    Message<String> sampleMsg = MessageBuilder.withPayload("Should not be delivered").build();
    boolean wasMde = false;
    try {
        channelNoSubscriber.send(sampleMsg);
    } catch (MessageDeliveryException mde) {
        wasMde = true;
    }
    assertTrue("MessageDeliveryException should be thrown for message sent without subscribers", wasMde);
  }

  /**
    * Tests sending a message with a bad payload type. Note however that this
    * error can be resolved simply by adding converters
    * in this direction: String => Order and String => Product.
    * You can see it in one of the further test cases.
    */
  @Test
  public void testWithBadDatatype() {
    MessageChannel multipleMessagesChannel = context.getBean("stockCheckingChannel", MessageChannel.class);
    Message<String> sampleMsg = MessageBuilder.withPayload("Bad content shouldn't be delivered correctly").build();
    String exceptionMsg = "";
    try {
        multipleMessagesChannel.send(sampleMsg);
    } catch (MessageDeliveryException mde) {
        exceptionMsg = mde.getMessage();
    }
    String expectedBeginMsg = "Channel 'stockCheckingChannel' expected one of the following datataypes";
    assertTrue("Exception should begin by '"+expectedBeginMsg+"' but it is '"+exceptionMsg+"'",
            exceptionMsg.startsWith(expectedBeginMsg));
  }

  @Test
  public void testActivatorWithReturn() {
    MessageChannel direct = context.getBean("anotherDirectChannel", MessageChannel.class);
    Message<String> sampleMsg = MessageBuilder.withPayload("Test content")
            .setHeader("name", "Bartosz")
            .build();
    direct.send(sampleMsg);

    StringOutputChannel outputSample = context.getBean(StringOutputChannel.class);
    // JUnit 4 expects the failure message as the first argument.
    assertNotEquals("Activator should modify the message before transmitting it into output channel",
            sampleMsg.getPayload(), (String)outputSample.getReceivedMessage().getPayload());
  }

}

In this article we discovered some basic concepts of Spring Integration. We started by explaining message channels, which are used to transfer messages from one endpoint to another. After that we covered one particular message channel, the direct channel, which delivers a message to only one receiver, in the sender's thread. Then we presented the service activator, which can receive a message, process it and pass the result on to an output channel. Before approaching real-life examples, we saw how a message converter can be used to convert objects. At the end we put all the presented concepts into practice.

