Messaging gateways

on waitingforcode.com

In the previous articles we sent Spring Integration messages by calling message channels (the input channels) directly. However, that is not the only way to do it.

This article covers messaging gateways, one purpose of which is to pass messages to a message channel. We'll start with the general idea of messaging gateways. After that, we'll see how they work under the hood. In the last part, we'll implement messaging gateways in code and see the features Spring Integration provides for generating and sending messages.

Messaging gateways

Before approaching the idea of gateways, let's come back to the previous articles about Spring Integration. There, we sent messages by invoking the message channel directly. This is good practice in the sense that it is easy to see what happens in the application. On the other hand, the code is tightly coupled to the framework. One way to keep enterprise integration code out of the business logic is to use messaging gateways.

A messaging gateway is in fact a pattern. Its goal is to hide the messaging system layer from the programmer working on the application (business) code. We'll understand this short sentence better after analyzing the implementation of messaging gateways in Spring Integration.
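
Before looking at Spring Integration, the pattern itself can be sketched in plain Java. The example below is only an illustration under simplifying assumptions: a BlockingQueue stands in for the message channel, and OrderGateway, QueueBackedOrderGateway and checkout are hypothetical names that do not come from any framework.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class GatewayPatternSketch {

    // Business-facing contract: no messaging types appear in it.
    interface OrderGateway {
        void placeOrder(String orderId);
    }

    // Hypothetical gateway implementation: the only code that knows about the channel.
    static class QueueBackedOrderGateway implements OrderGateway {
        private final BlockingQueue<Object> channel;

        QueueBackedOrderGateway(BlockingQueue<Object> channel) {
            this.channel = channel;
        }

        @Override
        public void placeOrder(String orderId) {
            // The method argument becomes the payload put on the channel.
            channel.add(orderId);
        }
    }

    // Business code: depends on the interface only, never on the queue.
    static void checkout(OrderGateway gateway) {
        gateway.placeOrder("order-42");
    }

    public static void main(String[] args) {
        BlockingQueue<Object> channel = new LinkedBlockingQueue<>();
        checkout(new QueueBackedOrderGateway(channel));
        System.out.println("Channel received: " + channel.peek());
    }
}
```

The checkout method could be tested or reused with any other OrderGateway implementation, which is the decoupling the pattern is after.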

Gateways in Spring Integration

Gateways in Spring Integration are simple interfaces whose method signatures define the message payload and headers, as in this example:

 
public interface OrderService {

  void makeOrder(Order payload, Map<String, Object> headers);
}

If you know the concept of Spring Data repositories, you may suppose that the method's implementation is generated automatically by the framework. And you're right: under the hood, org.springframework.integration.gateway.GatewayProxyFactoryBean provides the implementation. GatewayProxyFactoryBean has a public Object invoke(final MethodInvocation invocation) throws Throwable method, which receives each intercepted call as a MethodInvocation instance. The method making the real call, for both sending and receiving, is private Object invokeGatewayMethod(MethodInvocation invocation) throws Exception. Here is an excerpt:

  Method method = invocation.getMethod();
  MethodInvocationGateway gateway = this.gatewayMap.get(method);
  if (paramCount == 0 && !hasPayloadExpression) {
    if (shouldReply) {
      if (shouldReturnMessage) {
        return gateway.receive();
      }
      response = gateway.receive();
    }
  }
  else {
    Object[] args = invocation.getArguments();
    if (shouldReply) {
      response = shouldReturnMessage ? gateway.sendAndReceiveMessage(args) : gateway.sendAndReceive(args);
    }
    else {
      gateway.send(args);
      response = null;
    }
  }

The real gateway is an instance of MethodInvocationGateway, an inner class of GatewayProxyFactoryBean which extends MessagingGatewaySupport. It's MessagingGatewaySupport that makes the real calls, in its private Object doSendAndReceive(Object object, boolean shouldConvert) method.
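
To see the proxying idea in isolation, here is a minimal sketch built on JDK dynamic proxies. It is not the code of GatewayProxyFactoryBean: createGateway is a hypothetical helper, a BlockingQueue stands in for the request channel, and only void "send" methods are supported.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class GatewayProxySketch {

    // Gateway interface; no class in this file implements it by hand.
    interface OrderService {
        void makeOrder(String order);
    }

    // Creates a proxy that turns each call into a "send" of the first argument.
    static <T> T createGateway(Class<T> serviceInterface, BlockingQueue<Object> requestChannel) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // equals/hashCode/toString are not gateway methods.
                switch (method.getName()) {
                    case "equals": return proxy == args[0];
                    case "hashCode": return System.identityHashCode(proxy);
                    default: return serviceInterface.getSimpleName() + " gateway proxy";
                }
            }
            if (args == null || args.length == 0) {
                throw new UnsupportedOperationException("This sketch only supports sending");
            }
            requestChannel.add(args[0]);
            return null; // fine for void methods, the only kind supported here
        };
        Object instance = Proxy.newProxyInstance(
                serviceInterface.getClassLoader(), new Class<?>[] {serviceInterface}, handler);
        return serviceInterface.cast(instance);
    }

    public static void main(String[] args) {
        BlockingQueue<Object> channel = new LinkedBlockingQueue<>();
        OrderService service = createGateway(OrderService.class, channel);
        service.makeOrder("milk");
        System.out.println("Channel received: " + channel.poll());
    }
}
```

The caller only sees OrderService. The decision about where the argument goes lives entirely in the invocation handler, which is roughly the role invokeGatewayMethod plays in the excerpt above.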

Gateways can be defined with annotations as well as with XML configuration files. Let's focus on XML configuration. To define a messaging gateway, we use the <gateway /> element and specify the interface to proxy in its service-interface attribute. Then we can configure the gateway's methods with the @Gateway annotation or keep defining them as XML elements.

As this presentation shows, the programmer of the application code doesn't need to know about the messaging system. All the work of sending and receiving messages is done transparently by GatewayProxyFactoryBean. The programmer only needs to define the interface and configure its methods.

Implement messaging gateways in Spring Integration

In our case, we'll configure messaging gateways with both XML and annotations. The XML will contain the gateway definition, while the annotations will configure its methods. Let's begin with the XML configuration:

<context:annotation-config />
<context:component-scan base-package="com.waitingforcode"/>

<int:channel id="inputBookingChannel" />
<int:channel id="inputSellingChannel" />
<int:channel id="inputBuyingChannel" />
<int:channel id="outputBookingChannel">
  <int:queue capacity="10"/>
</int:channel>
<int:channel id="outputSellingChannel">
  <int:queue capacity="10"/>
</int:channel>
<int:channel id="outputBuyingChannel">
  <int:queue capacity="10"/>
</int:channel>

<int:gateway service-interface="com.waitingforcode.services.ProductService" id="bookProduct" />

<int:service-activator input-channel="inputBookingChannel" ref="productActivator" requires-reply="true"
  output-channel="outputBookingChannel" method="handleBookingProduct" />
<int:service-activator input-channel="inputSellingChannel" ref="productActivator" requires-reply="true"
  output-channel="outputSellingChannel" method="handleSellingProduct" />
<int:service-activator input-channel="inputBuyingChannel" ref="productActivator" requires-reply="true"
  output-channel="outputBuyingChannel" method="handleBuyingProduct" />

Our test case illustrates some operations on a store product: booking, buying and selling. Below you can find the interface proxied by GatewayProxyFactoryBean, the activator used to transfer messages between channels, and a bean used to generate one of the headers:

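// ProductService.java
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.GatewayHeader;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

import com.waitingforcode.model.Product;
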
/**
 * Messaging gateway example used by the messaging-gateways.xml context file. Annotations are used here to show
 * the idea of gateways more clearly.
 *
 * @author Bartosz Konieczny
 */
public interface ProductService {

  /**
    * This method is the simplest form of gateway's method declaration. By default, the first parameter is used as message's
    * payload. requestChannel and replyChannel represent input and output channels.
    *
    * @param product Product used as message payload.
    */
  @Gateway(requestChannel = "inputBookingChannel", replyChannel = "outputBookingChannel")
  void bookProduct(Product product);

  /**
    * This form of method is more sophisticated. It contains a supplementary attribute called "headers" which defines
    * headers appended to message. The first supplementary header "fixedHeader", will always contain the same value
    * ("hardCodedValue"). The second one, "dynamicHeader", will contain the value returned by
    * {@link com.waitingforcode.beans.ProductHeaderBean#getReversedName(com.waitingforcode.model.Product)} method, represented
    * in annotation thanks to Spring Expression Language.
    *
    * @param product Product used as message payload.
    */
  @Gateway(requestChannel = "inputSellingChannel", replyChannel = "outputSellingChannel", headers =
          {
                  @GatewayHeader(name = "fixedHeader", value = "hardCodedValue"),
                  @GatewayHeader(name = "dynamicHeader", expression = "@productHeaderBean.getReversedName(#args[0])")
          }
  )
  void sellProduct(Product product);

  /**
    * This form of method also defines supplementary headers, this time with the @Header annotation. Both "companyName" and
    * "price" will be added to the headers sent with the message. Note also the presence of the @Payload annotation
    * marking the message's payload.
    *
    * You can try removing all annotations from this signature. It should produce the following exception:
    * 
    * org.springframework.messaging.converter.MessageConversionException: failed to convert object to Message
    * // ...
    * Caused by: org.springframework.messaging.MessagingException: At most one parameter (or expression via
    * method-level @Payload) may be mapped to the payload or Message. Found more than one on method
    * [public abstract void com.waitingforcode.services.ProductService.buyProduct
    * (java.lang.String,double,com.waitingforcode.model.Product)]
    * 
    * @param companyName Company name passed in message header.
    * @param price Product price passed in message header
    * @param product Product instance used as payload.
    */
  @Gateway(requestChannel = "inputBuyingChannel", replyChannel = "outputBuyingChannel")
  void buyProduct(@Header("companyName") String companyName, @Header("price") double price, @Payload Product product);

}

// ProductActivator.java
@Component
public class ProductActivator {

  public Message<Product> handleBookingProduct(Message<Product> msg) {
    return msg;
  }

  public Message<Product> handleSellingProduct(Message<Product> msg) {
    return msg;
  }

  public Message<Product> handleBuyingProduct(Message<Product> msg) {
    return msg;
  }

}

// ProductHeaderBean.java
@Component
public class ProductHeaderBean {

  public String getReversedName(Product product) {
    return new StringBuilder(product.getName()).reverse().toString();
  }

}
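
The way method arguments end up as headers or payload, and why removing the annotations from buyProduct fails, can be illustrated with a small reflection-based sketch. It is a simplification under stated assumptions, not Spring Integration's mapper: HeaderParam and PayloadParam are hypothetical stand-ins for @Header and @Payload, SimpleMessage is a made-up message type, and every argument without a header annotation is treated as a payload candidate.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

class ArgumentMappingSketch {

    // Hypothetical stand-ins for Spring's @Header and @Payload.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.PARAMETER)
    @interface HeaderParam { String value(); }

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.PARAMETER)
    @interface PayloadParam { }

    // Minimal message: one payload plus named headers.
    static final class SimpleMessage {
        final Object payload;
        final Map<String, Object> headers;

        SimpleMessage(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    interface ShopGateway {
        void buy(@HeaderParam("companyName") String companyName,
                 @HeaderParam("price") double price,
                 @PayloadParam String product);

        void buyUnannotated(String companyName, double price, String product);
    }

    // Looks up a gateway method by name (names are unique in this sketch).
    static Method gatewayMethod(String name) {
        for (Method method : ShopGateway.class.getMethods()) {
            if (method.getName().equals(name)) {
                return method;
            }
        }
        throw new IllegalArgumentException("No such gateway method: " + name);
    }

    // Header-annotated arguments become headers; any other argument is a payload candidate.
    static SimpleMessage toMessage(Method method, Object[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        Object payload = null;
        boolean payloadFound = false;
        Annotation[][] annotations = method.getParameterAnnotations();
        for (int i = 0; i < args.length; i++) {
            HeaderParam header = findHeader(annotations[i]);
            if (header != null) {
                headers.put(header.value(), args[i]);
                continue;
            }
            if (payloadFound) {
                throw new IllegalArgumentException(
                        "At most one parameter may be mapped to the payload: " + method.getName());
            }
            payload = args[i];
            payloadFound = true;
        }
        return new SimpleMessage(payload, headers);
    }

    private static HeaderParam findHeader(Annotation[] candidates) {
        for (Annotation candidate : candidates) {
            if (candidate instanceof HeaderParam) {
                return (HeaderParam) candidate;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        SimpleMessage message = toMessage(gatewayMethod("buy"),
                new Object[] {"Wholesaler & son", 39.99d, "potatoes"});
        System.out.println(message.payload + " " + message.headers);
    }
}
```

Calling toMessage with gatewayMethod("buyUnannotated") throws, because all three arguments compete for the single payload slot. This mirrors the situation described in the buyProduct comment.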

The tests checking the defined methods look like this:

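import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.waitingforcode.model.Product;
import com.waitingforcode.services.ProductService;

@ContextConfiguration(locations = "classpath:META-INF/messaging-gateways.xml")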
@RunWith(SpringJUnit4ClassRunner.class)
public class CheckGatewayTest {

  @Autowired
  @Qualifier("inputBookingChannel")
  private MessageChannel sender;

  @Autowired
  @Qualifier("outputBookingChannel")
  private PollableChannel bookingChannel;

  @Autowired
  @Qualifier("outputSellingChannel")
  private PollableChannel sellingChannel;

  @Autowired
  @Qualifier("outputBuyingChannel")
  private PollableChannel buyingChannel;

  @Autowired
  private ProductService productService;

  @Test
  public void testSimpleMessageReceiving() {
    Product milk = new Product();
    milk.setName("milk");

    // Please note that we don't use sender anywhere to transfer the message from
    // one channel to another. It's made directly by Spring through @Gateway annotated
    // method in ProductService interface
    productService.bookProduct(milk);

    Message<?> msg = bookingChannel.receive(3000);
    assertEquals("Object sent through Gateway should be milk", milk, msg.getPayload());
  }

  @Test
  public void testSellingChannel() {
    Product coffee = new Product();
    coffee.setName("coffee");

    productService.sellProduct(coffee);

    Message<?> msg = sellingChannel.receive(3000);
    assertEquals("Object sent through Gateway should be coffee", coffee, msg.getPayload());

    // Supplementary headers, created with @Gateway's headers attribute, should be present in this case.
    // One of them was resolved dynamically, from ProductHeaderBean's getReversedName method.
    assertEquals("Bad reversed product name was generated", "eeffoc", (String)msg.getHeaders().get("dynamicHeader"));
    assertEquals("Hard-coded header's value is not the same as expected", "hardCodedValue",
        (String)msg.getHeaders().get("fixedHeader"));
  }

  @Test
  public void testBuyingChannel() {
    Product potatoes = new Product();
    potatoes.setName("potatoes");

    productService.buyProduct("Wholesaler & son", 39.99d, potatoes);
    Message<?> msg = buyingChannel.receive(3000);
    assertEquals("Object sent through Gateway should be potatoes", potatoes, msg.getPayload());

    // Supplementary headers, created with @Header annotation, should be present in this case.
    assertTrue("Price should be 39.99", (double)msg.getHeaders().get("price") == 39.99d);
    assertEquals("Bad seller company passed in header", "Wholesaler & son", (String)msg.getHeaders().get("companyName"));
  }

}

In this article we discovered messaging gateways. At the beginning we introduced the concept by explaining its main goal. After that we saw how Spring Integration uses gateways. We learned that the GatewayProxyFactoryBean class executes all the sending and receiving logic. Thanks to it, the application programmer doesn't need to worry about enterprise integration code. At the end we saw how to implement the messaging gateway pattern in real code.
