Asynchronous events in Spring

on waitingforcode.com

Synchronous events have one major drawback: they're executed in the calling thread. If a listener handling a synchronous event takes 5 seconds to respond, the end user won't see the response for at least 5 seconds. Asynchronous events offer a solution to this problem.

This article explains asynchronous events in the Spring framework. The first part is purely theoretical and describes the main components and how they work together. In the second part we'll write a test case to check how asynchronous events are executed in Spring.

Asynchronous events in Spring

Spring handles asynchronous events with Java's native concurrency abstractions, namely task executors. Events are dispatched by the multicaster's multicastEvent method, which uses an implementation of the java.util.concurrent.Executor interface to deliver events to the appropriate listeners. By default the multicaster dispatches synchronously, so listeners run in the calling thread unless another executor is configured.
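
To make the role of the Executor concrete, here is a minimal plain-JDK sketch. It doesn't use any Spring class, and ExecutorDispatchDemo and its methods are hypothetical names chosen for illustration. The same "listener" is handed once to a same-thread executor and once to a single-thread pool, and each method reports which thread ran it:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: plain JDK only, not part of Spring.
final class ExecutorDispatchDemo {

  /** Runs the listener through a same-thread executor, as a synchronous dispatch would. */
  static String listenerThreadWhenSync() {
    AtomicReference<String> threadName = new AtomicReference<>();
    Executor sameThread = Runnable::run;
    sameThread.execute(() -> threadName.set(Thread.currentThread().getName()));
    return threadName.get();
  }

  /** Runs the listener on a pool thread; the caller waits here only to read the result. */
  static String listenerThreadWhenAsync() {
    AtomicReference<String> threadName = new AtomicReference<>();
    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.execute(() -> threadName.set(Thread.currentThread().getName()));
    pool.shutdown();
    try {
      pool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return threadName.get();
  }

  public static void main(String[] args) {
    System.out.println("caller thread:         " + Thread.currentThread().getName());
    System.out.println("sync listener thread:  " + listenerThreadWhenSync());
    System.out.println("async listener thread: " + listenerThreadWhenAsync());
  }
}
```

The only thing that changes between the two calls is the Executor, which is also the only thing a multicaster needs to swap in order to switch between synchronous and asynchronous delivery.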

One example of an asynchronous executor is org.springframework.core.task.SimpleAsyncTaskExecutor. This class creates a new Thread for every submitted task. However, it doesn't reuse threads, so if we have many long-running asynchronous tasks to handle, the cost of thread creation can become excessive.

To benefit from thread pooling, we can use another of Spring's Executor implementations, org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor. As its name indicates, this executor lets us work with thread pools.
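
The difference between the two strategies can be sketched with JDK stand-ins. This is an illustration only: the thread-per-task lambda models the behaviour described for SimpleAsyncTaskExecutor, the fixed pool models a pooled executor, and ThreadReuseDemo is a hypothetical name. Tasks are submitted one after another and the number of distinct threads that ran them is counted:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: plain JDK stand-ins, not the Spring executors themselves.
final class ThreadReuseDemo {

  /** Submits the tasks one at a time and returns how many distinct threads ran them. */
  static int distinctThreads(Executor executor, int tasks) {
    Set<Thread> threads = ConcurrentHashMap.newKeySet();
    for (int i = 0; i < tasks; i++) {
      CountDownLatch done = new CountDownLatch(1);
      executor.execute(() -> {
        threads.add(Thread.currentThread());
        done.countDown();
      });
      try {
        done.await(); // wait so that a pooled thread is free again for the next task
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    return threads.size();
  }

  /** One brand-new thread per task, never reused. */
  static int withThreadPerTask(int tasks) {
    Executor threadPerTask = task -> new Thread(task).start();
    return distinctThreads(threadPerTask, tasks);
  }

  /** A fixed pool: at most poolSize threads, reused across tasks. */
  static int withPool(int tasks, int poolSize) {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      return distinctThreads(pool, tasks);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("thread-per-task, 5 tasks: " + withThreadPerTask(5) + " threads");
    System.out.println("pool of 2, 5 tasks:       " + withPool(5, 2) + " threads");
  }
}
```

With the thread-per-task executor the thread count grows with the number of tasks, while the pooled variant never exceeds its configured size.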

Example of asynchronous event in Spring

Let's now implement asynchronous event dispatching. We'll write a multicaster that is able to handle both synchronous and asynchronous events. Synchronous events will be dispatched with Spring's SyncTaskExecutor, while asynchronous ones will go through Spring's ThreadPoolTaskExecutor. First, we need to define the beans for our test case:

<bean id="syncTaskExecutor" class="org.springframework.core.task.SyncTaskExecutor" />
<bean id="asyncTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
  <!-- The first 10 submitted tasks each get their own thread; later tasks are queued -->
  <property name="corePoolSize" value="10" />
  <!-- When the 10 core threads are busy and the queue is full, the pool may grow to 15 threads in total (5 more than the core size) -->
  <property name="maxPoolSize" value="15" />
  <!-- Number of tasks that can wait in the queue -->
  <property name="queueCapacity" value="10" />
</bean>

<bean id="applicationEventMulticaster" class="com.waitingforcode.event.SimpleEventMulticaster">
  <property name="taskExecutor" ref="syncTaskExecutor" />
  <property name="asyncTaskExecutor" ref="asyncTaskExecutor" />
</bean>
<bean id="taskStatsHolder" class="com.waitingforcode.event.TaskStatsHolder" />
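
How corePoolSize, maxPoolSize and queueCapacity interact is easy to get wrong, so here is a small sketch. ThreadPoolTaskExecutor delegates to java.util.concurrent.ThreadPoolExecutor, and the code below uses that JDK class directly with the same three values. PoolSizingDemo is a hypothetical name, and the bounded LinkedBlockingQueue and the default rejection behaviour are assumptions about how the bean above ends up configured:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class built on the JDK pool, sized like the asyncTaskExecutor bean.
final class PoolSizingDemo {

  private static void submit(ThreadPoolExecutor pool, Runnable task, int times) {
    for (int i = 0; i < times; i++) {
      pool.execute(task);
    }
  }

  /**
   * Submits tasks that block until released and returns:
   * [threads after 10 tasks, threads after 20 tasks, threads after 25 tasks, 1 if the 26th task was rejected].
   */
  static int[] observe() {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        10, 15, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10));
    CountDownLatch release = new CountDownLatch(1);
    Runnable blocking = () -> {
      try {
        release.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
    int[] observed = new int[4];
    try {
      submit(pool, blocking, 10);          // tasks 1-10: one core thread each
      observed[0] = pool.getPoolSize();
      submit(pool, blocking, 10);          // tasks 11-20: queued, no new thread
      observed[1] = pool.getPoolSize();
      submit(pool, blocking, 5);           // tasks 21-25: queue is full, pool grows to the maximum
      observed[2] = pool.getPoolSize();
      try {
        pool.execute(blocking);            // task 26: no free thread, no queue slot
      } catch (RejectedExecutionException e) {
        observed[3] = 1;
      }
    } finally {
      release.countDown();
      pool.shutdown();
    }
    return observed;
  }

  public static void main(String[] args) {
    System.out.println(java.util.Arrays.toString(observe()));
  }
}
```

The point to remember is that the pool grows beyond its core size only after the queue has filled up, not before.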

Two classes will be used to record and check the results of task execution:

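// TaskStatsHolder.java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Holder bean for all executed tasks.
 */
public class TaskStatsHolder {

  // written by listener threads and read by the test thread, so the map must be thread-safe
  private final Map<String, TaskStatData> tasks = new ConcurrentHashMap<String, TaskStatData>();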
  
  public void addNewTaskStatHolder(String key, TaskStatData value) {
    tasks.put(key, value);
  }
  
  public TaskStatData getTaskStatHolder(String key) {
    return tasks.get(key);
  }
}

// TaskStatData.java
/**
 * Holder class for all statistic data about already executed tasks.
 */
public class TaskStatData {

    private String threadName;
    private int executionTime;
    private long startTime;
    private long endTime;
    
    public TaskStatData(String threadName, long startTime, long endTime) {
      this.threadName = threadName;
      this.startTime = startTime;
      this.endTime = endTime;
      // divide by 1000.0: an integer division would truncate before Math.round is applied
      this.executionTime = (int) Math.round((endTime - startTime) / 1000.0);
    }

    public String getThreadName() {
      return threadName;
    }
    public int getExecutionTime() {
      return this.executionTime;
    }
    public long getStartTime() {
      return this.startTime;
    }
    public long getEndTime() {
      return this.endTime;
    }
    
    @Override
    public String toString() {
      StringBuilder result = new StringBuilder();
      result.append("TaskStatData {thread name: ").append(this.threadName).append(", start time: ").append(new Date(this.startTime));
      result.append(", end time: ").append(new Date(this.endTime)).append(", execution time: ").append(this.executionTime).append(" seconds}");
      return result.toString();
    }
        
}

As you can see, these objects are simple data holders. They'll be used to check whether our assumptions about how each event is executed are correct. The two dispatched events aren't complicated either:

// ProductChangeFailureEvent.java
/**
 * This is a synchronous event dispatched when a product is modified in the backoffice.
 * When the product's modification fails (database, validation problem), this event is dispatched to
 * all listeners. It's synchronous because we want to inform the user that some actions were taken
 * after the failure. If the event were asynchronous, we wouldn't be able to
 * know whether anything was done after the dispatch.
 */
public class ProductChangeFailureEvent extends ApplicationContextEvent {

  private static final long serialVersionUID = -1681426286796814792L;
  public static final String TASK_KEY = "ProductChangeFailureEvent";

  public ProductChangeFailureEvent(ApplicationContext source) {
    super(source);
  }
}

// NotifMailDispatchEvent.java
/**
 * Event dispatched asynchronously every time when we want to send a notification mail. 
 * Notification mails to send should be stored somewhere (filesystem, database...) but in
 * our case, we'll handle only one notification mail: when one product out-of-stock becomes available again.
 */
public class NotifMailDispatchEvent extends ApplicationContextEvent implements AsyncApplicationEvent {

  private static final long serialVersionUID = 9202282810553100778L;
  public static final String TASK_KEY = "NotifMailDispatchEvent";

  public NotifMailDispatchEvent(ApplicationContext source) {
    super(source);
  }
} 
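
The article's SimpleEventMulticaster and the AsyncApplicationEvent marker interface are referenced but their source isn't listed. As a rough idea of the routing they imply, here is a plain-JDK model, not the actual classes: AsyncMarker, RoutingMulticasterSketch and the Consumer-based listeners are all hypothetical stand-ins. Events carrying the marker go to the asynchronous executor, all others to the synchronous one:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical stand-in for the AsyncApplicationEvent marker: it declares no methods.
interface AsyncMarker {}

// Hypothetical model of a multicaster holding two executors; no Spring types involved.
final class RoutingMulticasterSketch {

  private final Executor syncExecutor;
  private final Executor asyncExecutor;
  private final List<Consumer<Object>> listeners = new CopyOnWriteArrayList<>();

  RoutingMulticasterSketch(Executor syncExecutor, Executor asyncExecutor) {
    this.syncExecutor = syncExecutor;
    this.asyncExecutor = asyncExecutor;
  }

  void addListener(Consumer<Object> listener) {
    listeners.add(listener);
  }

  /** Chooses the executor from the event's type, then hands every listener call to it. */
  void multicast(Object event) {
    Executor executor = (event instanceof AsyncMarker) ? asyncExecutor : syncExecutor;
    for (Consumer<Object> listener : listeners) {
      executor.execute(() -> listener.accept(event));
    }
  }
}
```

In this model the choice is made per event rather than per listener, which matches the way the two event classes above differ only by the marker interface.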

The listeners that handle these events only put data into the task stats holder:

// ProductChangeFailureListener.java
@Component
public class ProductChangeFailureListener 
    implements ApplicationListener<ProductChangeFailureEvent>{

  @Override
  public void onApplicationEvent(ProductChangeFailureEvent event) {
    long start = System.currentTimeMillis();
    long end = System.currentTimeMillis();
    ((TaskStatsHolder) event.getApplicationContext().getBean("taskStatsHolder")).addNewTaskStatHolder(ProductChangeFailureEvent.TASK_KEY, new TaskStatData(Thread.currentThread().getName(), start, end));
  }

}

// NotifMailDispatchListener.java
@Component
public class NotifMailDispatchListener 
    implements ApplicationListener<NotifMailDispatchEvent>{

  @Override
  public void onApplicationEvent(NotifMailDispatchEvent event) {
    long start = System.currentTimeMillis();
    try {
      // sleep 5 seconds to simulate a slow task that overlaps the synchronous listener
      Thread.sleep(5000);
    } catch (InterruptedException e) {
      // the interface method declares no checked exception, so restore the interrupt flag instead
      Thread.currentThread().interrupt();
    }
    long end = System.currentTimeMillis();
    ((TaskStatsHolder) event.getApplicationContext().getBean("taskStatsHolder")).addNewTaskStatHolder(NotifMailDispatchEvent.TASK_KEY, new TaskStatData(Thread.currentThread().getName(), start, end));
  }
}

The controller used in the tests looks like this:

@Controller
public class ProductController {

  @Autowired
  private ApplicationContext context;
  
  @RequestMapping(value = "/products/change-failure")
  public String changeFailure() {
    try {
      System.out.println("I'm modifying the product but a NullPointerException will be thrown");
      String name = null;
      if (name.isEmpty()) {
        // show error message here
      }
    } catch (Exception e) {
      context.publishEvent(new ProductChangeFailureEvent(context));
    }
    return "success";
  }
  
  
  @RequestMapping(value = "/products/change-success")
  public String changeSuccess() {
    System.out.println("Product was correctly changed");
    context.publishEvent(new NotifMailDispatchEvent(context));
    return "success";
  }
}

Finally, the test case:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:/META-INF/applicationContext-test.xml"})
@WebAppConfiguration
public class SpringSyncAsyncEventsTest {

  @Autowired
  private WebApplicationContext wac;

  @Test
  public void test() throws Exception {
    MockMvc mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).build();
    // call both urls one after the other; the first call returns immediately because its event is handled asynchronously
    mockMvc.perform(get("/products/change-success"));
    mockMvc.perform(get("/products/change-failure"));

    // get stats holder and check if both stats are available:
    // - mail dispatching shouldn't be available because it's executed after a sleep of 5 seconds
    // - product failure should be available because it's executed synchronously, almost immediately (no operations in listeners)
    TaskStatsHolder statsHolder = (TaskStatsHolder) this.wac.getBean("taskStatsHolder");
    TaskStatData mailStatData = statsHolder.getTaskStatHolder(NotifMailDispatchEvent.TASK_KEY);
    TaskStatData productFailureData = statsHolder.getTaskStatHolder(ProductChangeFailureEvent.TASK_KEY);
    assertTrue("Task for mail dispatching is executed after 5 seconds, so at this moment taskStatsHolder shouldn't contain it", 
        mailStatData == null);
    assertTrue("productFailureHolder shouldn't be null but it is", 
        productFailureData != null);
    assertTrue("Product failure listener should be executed within 0 seconds but took "+productFailureData.getExecutionTime()+" seconds", 
        productFailureData.getExecutionTime() == 0);
    while (mailStatData == null) {
        mailStatData = statsHolder.getTaskStatHolder(NotifMailDispatchEvent.TASK_KEY);
    }

    // check mail dispatching stats again, when available
    assertTrue("Now task for mail dispatching should be at completed state", 
        mailStatData != null);
    assertTrue("Task for mail dispatching should take 5 seconds but it took "+mailStatData.getExecutionTime()+" seconds", 
        mailStatData.getExecutionTime() == 5);
    assertTrue("productFailureHolder shouldn't be null but it is", 
        productFailureData != null);
    assertTrue("Product failure listener should be executed within 0 seconds but took "+productFailureData.getExecutionTime()+" seconds", 
        productFailureData.getExecutionTime() == 0);
    assertTrue("Thread executing mail dispatch and product failure listeners shouldn't be the same", 
        !productFailureData.getThreadName().equals(mailStatData.getThreadName()));
    assertTrue("Thread executing product failure listener ("+productFailureData.getThreadName()+") should be the same as current thread ("+Thread.currentThread().getName()+") but it wasn't", 
        Thread.currentThread().getName().equals(productFailureData.getThreadName()));
    assertTrue("Thread executing mail dispatch listener ("+mailStatData.getThreadName()+") shouldn't be the same as current thread ("+Thread.currentThread().getName()+") but it was", 
        !Thread.currentThread().getName().equals(mailStatData.getThreadName()));
    // print some output to see the information about the tasks
    System.out.println("Data about mail notif dispatching event: "+mailStatData);
    System.out.println("Data about product failure dispatching event: "+productFailureData);
  }
}

This test case shows that the two listeners aren't launched by the same executor. The product failure listener is executed by the synchronous executor. Because it has no work to do, the result is returned almost immediately. The mail dispatch listener sleeps for 5 seconds to show that its execution overlaps that of the product failure listener. In addition, the two listeners run in different threads, and therefore under different executors. We can also see this in the output:

Product was correctly changed
I'm modifying the product but a NullPointerException will be thrown
Data about mail notif dispatching event: TaskStatData {thread name: asyncTaskExecutor-1, start time: Thu Jun 19 21:14:18 CEST 2014, end time: Thu Jun 19 21:14:23 CEST 2014, execution time: 5 seconds}
Data about product failure dispatching event: TaskStatData {thread name: main, start time: Thu Jun 19 21:14:21 CEST 2014, end time: Thu Jun 19 21:14:21 CEST 2014, execution time: 0 seconds}
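
The test above waits for the asynchronous listener by polling the holder in a loop with no upper bound. One possible alternative, shown here as a plain-JDK sketch under assumed names (AwaitAsyncListenerDemo and listenerFinishedWithin are hypothetical), is to let the listener count down a CountDownLatch and have the test wait on it with a timeout:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: models a slow asynchronous listener without any Spring type.
final class AwaitAsyncListenerDemo {

  /**
   * Starts a slow "listener" on a pool thread and waits for it for at most timeoutMillis.
   * Returns true if the listener finished in time, false otherwise.
   */
  static boolean listenerFinishedWithin(long workMillis, long timeoutMillis) {
    CountDownLatch finished = new CountDownLatch(1);
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      pool.execute(() -> {
        try {
          Thread.sleep(workMillis);   // simulated work, like the 5 second sleep in the listener
          finished.countDown();       // signal completion to the waiting thread
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      // blocks without spinning and gives up after the timeout instead of hanging forever
      return finished.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("finished in time: " + listenerFinishedWithin(100, 2000));
    System.out.println("finished in time: " + listenerFinishedWithin(2000, 100));
  }
}
```

A bounded wait makes a broken asynchronous dispatch fail the test quickly instead of leaving it spinning.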

This article briefly showed how to deal with asynchronous events in Spring. They're useful when a listener may run for a long time and we don't want to block the application until all listeners have finished. Asynchronous execution can be achieved with asynchronous executors such as ThreadPoolTaskExecutor or SimpleAsyncTaskExecutor.
