Splitters and aggregators

As the volume of data to send grows, enterprise integration applications can become less and less usable if their code and architecture don't evolve. Besides these two key levers for improving the performance of a growing system, Spring Integration provides another one: message splitting and aggregation.

In this article we'll focus on dividing big messages into several smaller ones and assembling them back together once they have all been sent through a message channel. In the first part we'll discuss this topic in a more general way, as a pattern coming from the world of enterprise integration patterns. After that we'll describe its implementation in Spring Integration. The last part is devoted to a test case illustrating Spring's splitter and aggregator.

Scatter-gather pattern

Enterprise integration patterns define an element able to divide an initial message and assemble it back after receiving some supplementary information: the scatter-gather pattern. To better understand when it is useful, imagine a flight booking system. Your user starts by defining the departure and arrival airports, the flight dates and the number of passengers. Your system takes these parameters and constructs a search object. This object is sent as the payload of a message to your partners.

Each partner then sends a response containing its most attractive flight back to the system. All messages sent by the partners are aggregated into a single message, which is returned to the output channel as is. The scatter-gather pattern can also be used to split a big message into several smaller ones, perform the necessary operations on the divided parts and aggregate them back at the end, as a message smaller than the initial one.
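The flight search scenario can be sketched in a few lines of plain Java. This is only an illustration of the pattern, not Spring Integration code: the FlightSearch and Quote classes, the scatterGather method and the partners modelled as simple functions are all hypothetical names introduced for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java sketch of scatter-gather; no Spring Integration types are used.
final class ScatterGatherSketch {

  // Hypothetical search criteria built from the user's input.
  static final class FlightSearch {
    final String from;
    final String to;

    FlightSearch(String from, String to) {
      this.from = from;
      this.to = to;
    }
  }

  // Hypothetical partner answer: the partner's most attractive offer.
  static final class Quote {
    final String partner;
    final double price;

    Quote(String partner, double price) {
      this.partner = partner;
      this.price = price;
    }
  }

  // Scatter: the same request goes to every partner.
  // Gather: all replies are collected into one aggregated result.
  static List<Quote> scatterGather(FlightSearch search,
      List<Function<FlightSearch, Quote>> partners) {
    List<Quote> gathered = new ArrayList<>();
    for (Function<FlightSearch, Quote> partner : partners) {
      gathered.add(partner.apply(search));
    }
    return gathered;
  }
}
```

Calling scatterGather with a list of two partner functions returns a list of two quotes, in the order in which the partners were queried. A real system would query the partners asynchronously and would have to decide what to do when one of them never answers.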

Splitter and aggregator in Spring Integration

Spring Integration chose different names for the parts of the scatter-gather pattern. The element dividing messages is called a splitter and the one assembling them an aggregator. Both are defined as message endpoints. To define a custom splitter, we can use XML as well as the @Splitter annotation. This element is simpler than the aggregator because it mainly takes the initial message, possibly processes it a little, and dispatches the parts to other channels. The splitting method must return a Collection or an array of either Message instances or other objects. In the latter case, each object is used as the payload of a newly constructed message.

The aggregator is more complicated because it not only aggregates the elements processed by other channels, but also relies on a few strategies. The first one is the correlation strategy. Represented by the org.springframework.integration.aggregator.CorrelationStrategy interface, it defines the key used to link the separated messages together. The interface declares a single method, public Object getCorrelationKey(Message<?> message). The correlation key is usually carried in a header of every split message.

Another strategy is the release strategy. Simply put, it defines the moment when the aggregator can assemble the split messages. The implementation of public boolean canRelease(MessageGroup messageGroup) reports the state of the group, that is, whether the aggregator must wait for more messages before it can construct the final Message.
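To make the interplay of the two strategies more concrete, here is a minimal sketch in plain Java. It does not use Spring Integration's classes: StrategyAggregatorSketch and its onMessage method are hypothetical names, the correlation strategy is modelled as a Function and the release strategy as a Predicate over the current group.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Plain-Java sketch of an aggregator driven by two strategies (not Spring's implementation).
final class StrategyAggregatorSketch<M> {

  // message -> correlation key
  private final Function<M, Object> correlationStrategy;
  // current group -> can it be released?
  private final Predicate<List<M>> releaseStrategy;
  // groups still waiting for messages, indexed by correlation key
  private final Map<Object, List<M>> groups = new HashMap<>();

  StrategyAggregatorSketch(Function<M, Object> correlationStrategy,
      Predicate<List<M>> releaseStrategy) {
    this.correlationStrategy = correlationStrategy;
    this.releaseStrategy = releaseStrategy;
  }

  // Stores the message in its group. Returns the whole group once the release
  // strategy accepts it, or null while the group is still incomplete.
  List<M> onMessage(M message) {
    Object key = correlationStrategy.apply(message);
    List<M> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
    group.add(message);
    if (releaseStrategy.test(group)) {
      groups.remove(key);
      return group;
    }
    return null;
  }
}
```

With messages modelled as strings such as "1:carrot", a correlation function returning the part before the colon and a release predicate checking for a group size of 2, the first message of each group yields null and the second one yields the complete group.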

By default, releasing is based on the number of split messages, called the sequence size, and correlation on a correlation id header. In this case, every split message carries two new header entries: sequenceSize and sequenceNumber. The first entry defines the total number of split messages. The second acts as the position of the split message in the sequence. To group the split messages, another header entry is added: correlationId (for example: 6986f2ae-c88c-1dce-ac81-4c54813d15d9).
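The role of these three headers can be illustrated with a small plain-Java sketch. It only mimics the idea: messages are modelled as simple maps, SequenceHeadersSketch with its split and canRelease methods is a hypothetical helper written for this example, and numbering the parts from 1 is a choice made here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Plain-Java sketch of sequence-based splitting and releasing (not Spring's implementation).
final class SequenceHeadersSketch {

  // Splits the payloads into "messages" modelled as maps holding the payload and its headers.
  static List<Map<String, Object>> split(List<String> payloads) {
    // one correlation id shared by all parts of the same original message
    String correlationId = UUID.randomUUID().toString();
    List<Map<String, Object>> parts = new ArrayList<>();
    int sequenceNumber = 1;
    for (String payload : payloads) {
      Map<String, Object> part = new HashMap<>();
      part.put("payload", payload);
      part.put("correlationId", correlationId);
      part.put("sequenceNumber", sequenceNumber++);
      part.put("sequenceSize", payloads.size());
      parts.add(part);
    }
    return parts;
  }

  // A group is complete once it holds as many parts as the sequenceSize header announces.
  static boolean canRelease(List<Map<String, Object>> group) {
    return !group.isEmpty()
        && group.size() == (Integer) group.get(0).get("sequenceSize");
  }
}
```

Splitting three payloads produces three parts sharing the same correlationId. A group holding only two of them is not releasable yet, while the full group is.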

Example of splitter and aggregator in Spring Integration

This test case illustrates placing an order in a shop. The seller prepares a shopping cart and queries all of his partners to learn the final price of the order. We'll start by configuring the splitter and the aggregator:

<context:annotation-config />
<context:component-scan base-package="com.waitingforcode"/>

<int:channel id="sender" />
<int:channel id="receiver">
  <int:queue capacity="10" />
</int:channel>

<int:chain id="senderReceiverSample" input-channel="sender" output-channel="receiver">
  <int:splitter id="orderSplitter" ref="orderComponentsSplitter" />
  <int:service-activator ref="productActivator" method="quoteForTheBestPrice" requires-reply="true" />
  <int:aggregator id="orderAggregator" correlation-strategy="orderComponentsCorrelationStrategy"
    release-strategy="orderComponentsReleaseStrategy" ref="orderComponentsAggregator" />
</int:chain>

This configuration is based on a message handler chain for readability reasons. We use two customized objects here, one to split and another one to aggregate messages. Besides them, we also have two custom strategies adapted to our shop case. These 4 elements are presented here:

/**
 * Sample message splitter which takes each product composing an order and sends it separately, in a new message.
 *
 * @author Bartosz Konieczny
 */
@Component
public class OrderComponentsSplitter extends AbstractMessageSplitter  {

  private Map<String, List<Message<?>>> splittedMessages = new HashMap<String, List<Message<?>>>();

  @Override
  protected Object splitMessage(Message<?> message) {
    Collection<Message<?>> messages = new ArrayList<Message<?>>();
    Order order = (Order) message.getPayload();
    Iterator<?> iterator = order.getProducts().iterator();
    while (iterator.hasNext()) {
      Product product = (Product) iterator.next();
      Message<?> msg = MessageBuilder.withPayload(product)
        .setHeaderIfAbsent(OrderComponentsCorrelationStrategy.CORRELATION_KEY, order.getId())
        .setHeaderIfAbsent(OrderComponentsCorrelationStrategy.LAST_KEY, !iterator.hasNext())
        .build();
      messages.add(msg);
      addMessage(""+order.getId(), msg);
    }
    return messages;
  }

  public Map<String, List<Message<?>>> getSplittedMessages() {
    return this.splittedMessages;
  }

  public List<Message<?>> getSplittedMessagesByKey(String key) {
    if (!getSplittedMessages().containsKey(key)) {
      addListOfSplittedMessages(key);
    }
    return getSplittedMessages().get(key);
  }

  private void addMessage(String key, Message<?> message) {
    getSplittedMessagesByKey(key).add(message);
  }

  private void addListOfSplittedMessages(String key) {
    getSplittedMessages().put(key, (new ArrayList<Message<?>>()));
  }
}

/**
 * Correlation strategy is an interface defining which key is used to associate a {@link Message} with the appropriate
 * {@link org.springframework.integration.store.MessageGroup}. In the case of our splitting of messages with {@link com.waitingforcode.model.Order}
 * payload, the key used will be the order's id header.
 *
 * @author Bartosz Konieczny
 */
@Component
public class OrderComponentsCorrelationStrategy implements CorrelationStrategy {

  // key put into the header and used by the correlation strategy to link several Message<Product> to the appropriate Message<Order>
  public static final String CORRELATION_KEY = "orderId";
  // key of the header telling whether the split element is the last one needed to aggregate
  public static final String LAST_KEY = "isLast";

  @Override
  public Object getCorrelationKey(Message<?> message) {
    if (!message.getHeaders().containsKey(CORRELATION_KEY)) {
      throw new IllegalStateException("Message splitted by order splitter must contain orderId header. Present headers " +
        "were: "+message.getHeaders());
    }
    return message.getHeaders().get(CORRELATION_KEY);
  }
}

/**
 * Release strategy for split messages. The release strategy defines the moment when split messages can be released for
 * aggregation. In our case, we'll release the {@link MessageGroup} when it contains a message with the "isLast" header set to true.
 *
 * @author Bartosz Konieczny
 */
@Component
public class OrderComponentsReleaseStrategy implements ReleaseStrategy {

  @Override
  public boolean canRelease(MessageGroup messageGroup) {
    for (Message<?> msg : messageGroup.getMessages()) {
      // Boolean.TRUE.equals(...) avoids a NullPointerException when the header is absent
      if (Boolean.TRUE.equals(msg.getHeaders().get(OrderComponentsCorrelationStrategy.LAST_KEY))) {
        return true;
      }
    }
    return false;
  }
}

/**
 * Sample aggregator which gets all product messages split by {@link com.waitingforcode.splitters.OrderComponentsSplitter}
 * and puts them back together into a single {@link Order} instance, used as the payload of the final message.
 *
 * @author Bartosz Konieczny
 */
@Component
public class OrderComponentsAggregator {

  @Aggregator
  public Order aggregate(Collection<Message<?>> products) {
    Order order = new Order();
    for (Message<?> msg : products) {
      order.addProduct((Product)msg.getPayload());
      order.setId((int) msg.getHeaders().get(OrderComponentsCorrelationStrategy.CORRELATION_KEY));
    }
    order.calculateFinalPrice();
    return order;
  }
}

The test case for the previous objects and some helpers are presented below:

/**
 * Sample service providing prices for products.
 *
 * @author Bartosz Konieczny
 */
@Service
public class PriceService {

  private static final double defaultPrice = 5.0d;
  private static final Map<String, Double> pricesByLetters = new HashMap<String, Double>();
  static {
    pricesByLetters.put("a", 11d);
    pricesByLetters.put("b", 3.6d);
    pricesByLetters.put("c", 2d);
    pricesByLetters.put("l", 2d);
    pricesByLetters.put("p", 10d);
  }

  public double priceFromProduct(Product product) {
    String idLetter = (""+product.getName().charAt(0)).toLowerCase();
    if (pricesByLetters.containsKey(idLetter)) {
      return pricesByLetters.get(idLetter);
    }
    return defaultPrice;
  }

}


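@Component
public class ProductActivator {

  // service providing the price of each product
  @Autowired
  private PriceService priceService;

  // sets the price on the product carried by the message and passes the same message on
  public Message<Product> quoteForTheBestPrice(Message<Product> msg) {
    Product product = msg.getPayload();
    product.setPrice(priceService.priceFromProduct(product));
    return msg;
  }
}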

/**
 * Sample test for splitting-aggregation in Spring Integration.
 *
 * @author Bartosz Konieczny
 */
@ContextConfiguration(locations = "classpath:META-INF/splitting-aggregating.xml")
@RunWith(SpringJUnit4ClassRunner.class)
public class SplitterAndAggregatorTest {
  @Autowired
  @Qualifier("sender")
  private DirectChannel sender;

  @Autowired
  @Qualifier("receiver")
  private QueueChannel receiver;

  @Autowired
  @Qualifier("orderComponentsSplitter")
  private OrderComponentsSplitter splitter;

  @Test
  public void splitAndAggregate() {
    Order vegetarianMeal = new Order();
    vegetarianMeal.setId(3392);
    vegetarianMeal.addProduct(constructProduct("carrot"));
    vegetarianMeal.addProduct(constructProduct("apple"));
    vegetarianMeal.addProduct(constructProduct("potatoes"));
    vegetarianMeal.addProduct(constructProduct("beets"));
    vegetarianMeal.addProduct(constructProduct("lettuce"));
    Message<?> orderMsg = MessageBuilder.withPayload(vegetarianMeal).setHeaderIfAbsent(OrderComponentsCorrelationStrategy
      .CORRELATION_KEY, vegetarianMeal.getId()).build();
    sender.send(orderMsg, 4000);

    Message<?> receivedMsg = receiver.receive(4000);
    Order receivedOrder = (Order) receivedMsg.getPayload();
    assertTrue("Initial order price should be 0", vegetarianMeal.getFinalPrice() == 0d);
    assertEquals("Bad Order was sent as payload", vegetarianMeal.getId(), receivedOrder.getId());
    // compare doubles with a tolerance instead of strict equality
    assertEquals("Aggregation failed, expected price was 28.6", 28.6d, receivedOrder.getFinalPrice(), 0.0001d);

    List<Message<?>> splitted = splitter.getSplittedMessagesByKey(""+vegetarianMeal.getId());
    assertEquals("5 splitted messages are expected", 5, splitted.size());
    List<String> acceptedNames = new ArrayList<String>();
    acceptedNames.add("carrot");
    acceptedNames.add("apple");
    acceptedNames.add("potatoes");
    acceptedNames.add("beets");
    acceptedNames.add("lettuce");
    for (Message<?> msg : splitted) {
      Product msgPayload = (Product) msg.getPayload();
      assertTrue("Unauthorized element was splitted", acceptedNames.indexOf(msgPayload.getName()) > -1);
    }
    for (Product product : receivedOrder.getProducts()) {
      assertTrue("Unauthorized element was sent to receiver", acceptedNames.indexOf(product.getName()) > -1);
    }
  }

  private Product constructProduct(String name) {
    Product product = new Product();
    product.setName(name);
    return product;
  }
}

Splitting and aggregation can prove very useful in complicated Spring Integration use cases, for example a complex search engine. We saw that splitting takes one message and divides it into messages which can be sent to multiple channels. At the end, all these messages are grouped thanks to the defined correlation strategy, and a signal that depends on the release strategy triggers the aggregator's job. This job consists of assembling all the messages back into a single one.

