Publish-subscribe channels

We already know a little bit about message handlers and message channels. This time we'll discover specific message channel, called publish subscribe channel.

At the begin, we'll present the publish subscribe channel as well from theoretical point of view as from code one. At the second part we'll show how to use this channel by writing some test cases.

What is publish subscribe channel ?

Previously we discovered the most basic channel from Spring Integration, direct channel. As you remember, it's a point-to-point channel, ie. it sends the message to one and only one receiver. We can tell that publish subscribe channel is its contrast. This type of message channel allows the delivery to multiple receivers, known as subscribers. It can be thought in terms of observer design pattern. In this pattern, one object signals to subscribers every change of its state.

Programatically, PublishSubscribeChannel extends AbstractSubscribableChannel. Because this abstract class doesn't define any message receiving method, PublishSubscribeChannel is not intended to handle messages sent by another channels. As its name indicates, its main goal is to publish messages. The messages are published by org.springframework.integration.dispatcher.BroadcastingDispatcher instance. BroadcastingDispatcher can dispatch message in current execution thread and in new thread. The first method is the default one. The second one is applied only when one instance of java.util.concurrent.Executor is specified in channel's configuration.

And it's no PublishSubscribeChannel which stores a list of receivers. BroadcastingDispatcher makes it too. Every receiver must implement MessageHandler interface.

Example of publish subscribe channel in Spring Framework

In this part we'll present the features of PublishSubscribeChannel through some simple test cases. We'll see, inter alia, the configuration of min and max number of subscribers, definition of error handler and basic message delivery to subscribed MessageHandler implementations. Let's begin by the configuration:

<context:annotation-config />
<context:component-scan base-package="com.waitingforcode"/>

<!-- configuration for publish subscribable channel -->
<int:publish-subscribe-channel id="pubSubChannel"/>

<!-- publish subscribe channel can contain error handler only if task executor is defined - otherwise all errors will be
      thrown in current execution thread. See PublishSubscribeChannel#setErrorHandler(ErrorHandler errorHandler) comment -->
<int:publish-subscribe-channel id="pubSubChannelWithErrorHandler" error-handler="pubSubErrorHandler" task-executor="taskExecutor"/>

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="5" />
    <property name="maxPoolSize" value="10" />
    <property name="queueCapacity" value="25" />
</bean>

As you can observe, PublishSubscribeChannel is defined with <publish-subscribe-channel /> element. It accepts different arguments, as error-handler. It represents the object which'll receive message sending exceptions. But beware because it'll be applied only when one task-executor is defined. Our sample error handler looks like:

/**
 * Sample error handler for publish-subscribe channel.
 *
 * @author Bartosz Konieczny
 */
@Component
public class PubSubErrorHandler implements ErrorHandler {

  private Queue<Throwable> handledErrors = new LinkedList<Throwable>();

  @Override
  public void handleError(Throwable throwable) {
    handledErrors.add(throwable);
  }

  public Queue<Throwable> getHandlerErrors() {
    return this.handledErrors;
  }
}

It the simplest version of error handler which only stores received errors and is able to return them to client. To see this object in action, we need to jump to test cases. The most important points of these cases are commented, so you should understand them without supplementary explaination:

/**
 * Test cases illustrating the features of {@link PublishSubscribeChannel}
 *
 * @author Bartosz Konieczny
 */
@ContextConfiguration(locations = "classpath:META-INF/pub-sub-channel.xml")
@RunWith(SpringJUnit4ClassRunner.class)
public class PubSubChannelTest {

  private final Map<String, Message<?>> receivedMessages = new HashMap<String, Message<?>>();

  @Autowired
  @Qualifier("pubSubChannel")
  private PublishSubscribeChannel pubSubChannel;

  @Autowired
  @Qualifier("pubSubChannelWithErrorHandler")
  private PublishSubscribeChannel pubSubChannelWithErrorHandler;

  @Autowired
  private PubSubErrorHandler pubSubErrorHandler;

  @Test
  public void testSendingAndMinMaxSubscribers() {
    MessageHandler handler1 = constructSampleHandler("#h1");
    pubSubChannel.subscribe(handler1);

    Product milk = new Product();
    milk.setName("milk");
    Message<Product> milkMsg = MessageBuilder.withPayload(milk).build();
    pubSubChannel.send(milkMsg);

    Message<?> handler1Message = receivedMessages.get("#h1");
    assertEquals("milk object should be sent through publish subscribe channel", milk, handler1Message.getPayload());

    // now add some subscriber limits
    pubSubChannel.setMaxSubscribers(1);
    MessageHandler handler2 = constructSampleHandler("#h2");
    boolean wasIae = false;
    try {
      pubSubChannel.subscribe(handler2);
    } catch (IllegalArgumentException iae) {
      wasIae = true;
    }
    assertTrue("IllegalArgumentException is expected for maximum subscriber exceeded error", wasIae);

    pubSubChannel.setMaxSubscribers(10);
    /**
      * Even if we set the minimal number of subscribers and this number is not reached, we won't receive an exception
      * as in the case of maximal number of subscribers. It's because this parameter doesn't mean the same thing.
      * In fact, minSubscribers is applied to PublishSubscribeChannel's dispatcher (
      * {@link org.springframework.integration.dispatcher.BroadcastingDispatcher}) which contains dispatching method
      * (public boolean dispatch(Message<?> message). Inside this method the dispatcher checks if the number of delivered
      * messages is equal to minSubscribers parameter. The comparison is made only when minSubscribers is greater than 0.
      * If the number of delivered messages is lower than minSubscribers, false is returned by send(Message<?>) method.
      */
    pubSubChannel.setMinSubscribers(5);

    Product water = new Product();
    water.setName("water");
    Message<Product> waterMsg = MessageBuilder.withPayload(water).build();
    boolean wasCorrectlySend = pubSubChannel.send(waterMsg);
    handler1Message = receivedMessages.get("#h1");
    assertEquals("water object should be sent through publish subscribe channel", water, handler1Message.getPayload());
    assertFalse("Message shouldn't be correctly sent (min 5 subscribers defined, but only 1 message handler subscribed)",
            wasCorrectlySend);

    // set new min subscribers number
    pubSubChannel.setMinSubscribers(2);
    pubSubChannel.subscribe(handler2);
    wasCorrectlySend = pubSubChannel.send(waterMsg);
    assertTrue("This time message should be correctly send (min 2 subscribers expected, 2 message handlers subscribed)",
            wasCorrectlySend);
    handler1Message = receivedMessages.get("#h2");
    assertEquals("water object should be sent through publish subscribe channel", water, handler1Message.getPayload());

    // now test unsubscribing
    pubSubChannel.unsubscribe(handler2);
    pubSubChannel.send(milkMsg);
    handler1Message = receivedMessages.get("#h2");
    assertNotEquals("Unsubscribed handler shouldn't receive newly crated message with 'milk' payload",
      milk, handler1Message.getPayload());
  }

  @Test
  public void testWithErrorHandler() {
    final String errorMessage = "Hard-coded MessagingException";
    MessageHandler handler3 = new MessageHandler() {
      @Override
      public void handleMessage(Message<?> message) throws MessagingException {
        throw new MessagingException(errorMessage);
      }
    };
    pubSubChannelWithErrorHandler.subscribe(handler3);
    Product water = new Product();
    water.setName("water");
    Message<Product> waterMsg = MessageBuilder.withPayload(water).build();
    pubSubChannelWithErrorHandler.send(waterMsg);

    // message of error thrown by defined error-handler should be the same as errorMessage object from this method
    // sleep 1 second because pubSubChannelWithErrorHandler is executed in separated thread and sleeping is the less
    // verbose way to "synchronize" both threads
    try {
      Thread.sleep(1000);
    } catch (Exception e){
      fail("Sleeping failed");
    }
    Throwable thrownError = pubSubErrorHandler.getHandlerErrors().poll();
    assertEquals("Bad error was thrown by error-handler", errorMessage, thrownError.getCause().getMessage());

    pubSubChannelWithErrorHandler.setIgnoreFailures(true);
    pubSubChannelWithErrorHandler.send(waterMsg);
    try {
      Thread.sleep(1000);
    } catch (Exception e){
      fail("Sleeping failed");
    }
    thrownError = pubSubErrorHandler.getHandlerErrors().poll();
    assertNull("Error handler shouldn't be invoked when ignore-failures attribute is set to true", thrownError);
  }

  private MessageHandler constructSampleHandler(final String handlerId) {
    return new MessageHandler() {
      @Override
      public void handleMessage(Message<?> message) throws MessagingException {
        receivedMessages.put(handlerId, message);
      }
    };
  }

}

This article presents new message channel, publish subscribe channel. We saw that it's oriented to publishing one message to one or more receivers. The receivers must implement MessageHandler interface and subscribe to the channel through subscribe() method. After that we could observed this channel in action.


If you liked it, you should read:

📚 Newsletter Get new posts, recommended reading and other exclusive information every week. SPAM free - no 3rd party ads, only the information about waitingforcode!