Asynchronous and scheduled tasks in Spring

on waitingforcode.com

Plain Java offers many ways to create thread pools and to retrieve task results as Future instances. Spring is not left behind: its scheduling package lets us work with tasks executed in the background.

In the first part of the article, we'll explore the basics of scheduled task execution in Spring. Then we'll explain how the classes work together to plan and launch scheduled tasks. The next part presents the configuration of scheduled and asynchronous tasks. At the end we'll move on to more concrete cases and see how to code scheduled tasks through some unit tests.

What are asynchronous tasks in Spring?

Before we get to the heart of the matter, we need to understand that Spring implements two different concepts: asynchronous tasks and scheduled tasks. At first glance, both seem to do the same thing: work in the background. However, there is a subtle difference between them. Scheduled tasks, unlike asynchronous ones, can be planned exactly like CRON jobs in Linux. We can, for example, state that a task must be executed every 40 minutes. This configuration can be made through XML files as well as through annotations. Simple asynchronous tasks are executed in the background, without the possibility of configuring an execution frequency.

Because these are two different task types, there are two different executors too. The first one resembles Java's concurrency executors, seen in the article about executors in Java. It's TaskExecutor which, according to the Spring documentation, was introduced to provide a Spring-based abstraction for handling thread pools. The other abstraction is TaskScheduler. It's used to schedule a task at a given point in the future and to execute it once or periodically.

Triggers are another interesting point. Two types exist: CronTrigger and PeriodicTrigger. The first one simulates the behavior of CRON tasks, so we can plan the execution of a task at an exact point in the future. The other trigger can be used for periodic task execution.

Spring's asynchronous tasks classes

Let's begin the analysis of the classes with org.springframework.core.task.TaskExecutor. It's an interface extending Java's Executor interface. Its only method is execute, which takes a Runnable task as a parameter.
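
To make this single-method contract concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Executor, the interface TaskExecutor extends. The class name, the two executors and the threadUsedBy helper are hypothetical names invented for this illustration; they are not Spring classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

final class ExecutorContractSketch {

    // Runs the task in the caller's thread: execute() returns only when the task is done.
    static final Executor DIRECT = Runnable::run;

    // Starts a fresh thread for every task: execute() returns immediately.
    static final Executor THREAD_PER_TASK = task -> new Thread(task, "sketch-worker").start();

    /** Returns the name of the thread that actually ran a task handed to the given executor. */
    static String threadUsedBy(Executor executor) {
        AtomicReference<String> name = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await(); // wait until the task has recorded its thread name
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name.get();
    }

    public static void main(String[] args) {
        System.out.println("direct:          " + threadUsedBy(DIRECT));
        System.out.println("thread per task: " + threadUsedBy(THREAD_PER_TASK));
    }
}
```

The caller only ever sees execute(Runnable); whether the task runs in the calling thread or elsewhere is entirely the executor's decision.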

The org.springframework.scheduling.TaskScheduler interface is a little more complex. It defines a set of methods whose names begin with schedule and which allow us to define a task to be executed in the future. All schedule* methods return java.util.concurrent.ScheduledFuture instances.
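
Since the schedule* methods hand back a java.util.concurrent.ScheduledFuture, it may help to see what that handle offers. The sketch below uses the JDK's ScheduledExecutorService rather than Spring's TaskScheduler, so it only illustrates the returned type; the class and method names are assumptions made for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class ScheduledFutureSketch {

    /** Schedules a one-shot task and blocks until its result is available. */
    static String runOnceAfter(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            Callable<String> task = () -> "done";
            ScheduledFuture<String> future =
                    scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            // getDelay() reports how much time remains before the task is due.
            System.out.println("remaining delay (ms): " + future.getDelay(TimeUnit.MILLISECONDS));
            // get() waits for the task to run and returns its result.
            return future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("scheduled task did not complete", e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnceAfter(100));
    }
}
```

The same handle also exposes cancel(), which is how a periodic task is usually stopped.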

The two other components mentioned, the triggers, both implement the org.springframework.scheduling.Trigger interface. Its only method, nextExecutionTime, defines the execution time of the next triggered task. Its two implementations, CronTrigger and PeriodicTrigger, are able to compute it thanks to the information stored by org.springframework.scheduling.TriggerContext implementations. Thanks to them, we can easily get the last scheduled execution time of a task (lastScheduledExecutionTime), the last completion time of a given task (lastCompletionTime) or the last actual execution time (lastActualExecutionTime). Confused? If so, the source code makes these values easier to understand. org.springframework.scheduling.concurrent.ConcurrentTaskScheduler contains a private class, EnterpriseConcurrentTriggerScheduler. Inside this class, we can find a schedule method with the following content:

public ScheduledFuture<?> schedule(Runnable task, final Trigger trigger) {
  ManagedScheduledExecutorService executor = (ManagedScheduledExecutorService) scheduledExecutor;
  return executor.schedule(task, new javax.enterprise.concurrent.Trigger() {
    @Override
    public Date getNextRunTime(LastExecution le, Date taskScheduledTime) {
      return trigger.nextExecutionTime(le != null ?
        new SimpleTriggerContext(le.getScheduledStart(), le.getRunStart(), le.getRunEnd()) :
        new SimpleTriggerContext());
    }
    @Override
    public boolean skipRun(LastExecution lastExecution, Date scheduledRunTime) {
      return false;
    }
  });
}

SimpleTriggerContext looks familiar because it's an implementation of the TriggerContext interface. As you can see, the time values set in the constructor come from implementations of javax.enterprise.concurrent.LastExecution, where:
- getScheduledStart: returns the last time at which the task was scheduled to run. It corresponds to TriggerContext's lastScheduledExecutionTime property.
- getRunStart: returns the time when the given task started running. In TriggerContext, it corresponds to lastActualExecutionTime.
- getRunEnd: returns the time when the task terminated. It's used to set lastCompletionTime in TriggerContext.
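
To see why a trigger needs these three timestamps, consider the simplified model below. It is not Spring's code: LastRun is a hypothetical stand-in for a trigger context, and the two methods sketch, as an assumption, how a periodic trigger could derive the next execution time either from the last completion (fixed delay) or from the last scheduled start (fixed rate).

```java
import java.time.Duration;
import java.time.Instant;

final class TriggerContextSketch {

    /** Hypothetical stand-in for the three timestamps a trigger context exposes. */
    static final class LastRun {
        final Instant scheduled;   // when the run was supposed to start
        final Instant actualStart; // when it really started
        final Instant completion;  // when it finished

        LastRun(Instant scheduled, Instant actualStart, Instant completion) {
            this.scheduled = scheduled;
            this.actualStart = actualStart;
            this.completion = completion;
        }
    }

    /** Fixed delay: the period is counted from the completion of the previous run. */
    static Instant nextWithFixedDelay(LastRun last, Duration period, Instant now) {
        return last == null ? now : last.completion.plus(period);
    }

    /** Fixed rate: the period is counted from the scheduled start of the previous run. */
    static Instant nextWithFixedRate(LastRun last, Duration period, Instant now) {
        return last == null ? now : last.scheduled.plus(period);
    }

    public static void main(String[] args) {
        Instant scheduled = Instant.parse("2024-01-01T10:30:30Z");
        // A run that started 5 ms late and took 30 seconds.
        LastRun last = new LastRun(scheduled, scheduled.plusMillis(5), scheduled.plusSeconds(30));
        Duration period = Duration.ofSeconds(6);
        System.out.println("fixed delay -> " + nextWithFixedDelay(last, period, scheduled));
        System.out.println("fixed rate  -> " + nextWithFixedRate(last, period, scheduled));
    }
}
```

When no previous run exists (last is null), both variants simply fall back to the current time, which is where an initial delay would be added in a fuller model.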

Another important class in Spring scheduling and asynchronous execution is org.springframework.core.task.support.TaskExecutorAdapter. It's an adapter that exposes a java.util.concurrent.Executor as a Spring-based executor, the TaskExecutor already described. Internally, it holds a reference to the wrapped Java executor and uses it to run all submitted tasks.
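
The adapter idea can be sketched in a few lines of plain Java. Everything below is hypothetical: SketchTaskExecutor stands in for a framework-side contract and Adapter for the adapting class; neither reproduces Spring's actual source, they only show the delegation pattern.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

final class ExecutorAdapterSketch {

    /** Hypothetical framework-side contract (not Spring's interface). */
    interface SketchTaskExecutor extends Executor {
        Future<?> submit(Runnable task);
    }

    /** Exposes any java.util.concurrent.Executor through the contract above. */
    static final class Adapter implements SketchTaskExecutor {
        private final Executor delegate;

        Adapter(Executor delegate) {
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable task) {
            delegate.execute(task); // plain delegation
        }

        @Override
        public Future<?> submit(Runnable task) {
            if (delegate instanceof ExecutorService) {
                return ((ExecutorService) delegate).submit(task);
            }
            // A bare Executor has no submit(), so wrap the task in a FutureTask ourselves.
            FutureTask<Void> future = new FutureTask<Void>(task, null);
            delegate.execute(future);
            return future;
        }
    }

    /** Submits an empty task through the adapter and reports whether it completed. */
    static boolean completes(Executor delegate) {
        Future<?> future = new Adapter(delegate).submit(() -> { });
        try {
            future.get(5, TimeUnit.SECONDS);
            return future.isDone();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println("pool-backed adapter completes: " + completes(pool));
        } finally {
            pool.shutdown();
        }
        System.out.println("direct adapter completes: " + completes(Runnable::run));
    }
}
```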

Configure asynchronous and scheduled tasks in Spring

How do we schedule asynchronous tasks programmatically? First, we need to enable annotation-based configuration. The XML configuration for it looks like:

<task:scheduler id="taskScheduler"/>
<task:executor id="taskExecutor" pool-size="2" />
<task:annotation-driven executor="taskExecutor" scheduler="taskScheduler"/>
<context:component-scan base-package="com.waitingforcode.async"/>

Both features can also be activated by adding the @EnableScheduling and @EnableAsync annotations to a configuration class (one annotated with @Configuration). Once that is done, we can start to implement scheduled and asynchronous tasks. To implement the first category of tasks, we can use the @Scheduled annotation. It's located in the org.springframework.scheduling.annotation package. It can take several attributes:
- cron: CRON-style expression used to launch the annotated task.
- zone: time zone in which the CRON expression will be resolved.
- fixedDelay or fixedDelayString: runs the task with a fixed delay, i.e. with a fixed period between the end of the last invocation and the start of the next.
- fixedRate or fixedRateString: invocations of the annotated method are triggered at a fixed period (for example, every 10 seconds), measured between the starts of successive invocations rather than from the end of the previous one.
- initialDelay or initialDelayString: time by which the first execution of the scheduled method is delayed.

Note that all these values (fixedDelay*, fixedRate*, initialDelay*) are expressed in milliseconds by default. Remember that, in the Spring version analyzed here, methods annotated with @Scheduled can't accept any arguments and must return nothing (void). Scheduled methods are managed by the container and are not invoked by callers at runtime. They're parsed by org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor, which contains the following method to reject all incorrectly defined methods:

protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
  try {
    Assert.isTrue(void.class.equals(method.getReturnType()),
                    "Only void-returning methods may be annotated with @Scheduled");
    Assert.isTrue(method.getParameterTypes().length == 0,
                    "Only no-arg methods may be annotated with @Scheduled");
// ...
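
The two assertions above boil down to a reflection check on the method signature. The following standalone sketch reproduces that check with java.lang.reflect only; the Candidates class and the helper names are hypothetical and exist just to exercise it.

```java
import java.lang.reflect.Method;

final class ScheduledSignatureCheckSketch {

    /** True when the method has the required shape: void return type and no arguments. */
    static boolean isSchedulable(Method method) {
        return void.class.equals(method.getReturnType())
                && method.getParameterCount() == 0;
    }

    /** Hypothetical candidate methods, present only to exercise the check. */
    static final class Candidates {
        public void ok() { }
        public int returnsValue() { return 1; }
        public void takesArgument(String value) { }
    }

    /** Looks up a candidate by name and applies the check to it. */
    static boolean isSchedulableByName(String methodName, Class<?>... parameterTypes) {
        try {
            return isSchedulable(Candidates.class.getMethod(methodName, parameterTypes));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("ok:            " + isSchedulableByName("ok"));
        System.out.println("returnsValue:  " + isSchedulableByName("returnsValue"));
        System.out.println("takesArgument: " + isSchedulableByName("takesArgument", String.class));
    }
}
```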

The annotation that marks a method or a class as asynchronous is @Async (marking a class automatically marks all of its methods). Unlike @Scheduled methods, asynchronous methods accept parameters and may return a value.
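
An asynchronous method that takes a parameter and returns a value can be pictured with the JDK alone. The sketch below is only an analogy: it does not use @Async or Spring's proxying, it submits the work to an executor by hand through CompletableFuture. All names in it are assumptions made for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AsyncResultSketch {

    /** Computes the length of the text on a worker thread and returns a handle to the result. */
    static CompletableFuture<Integer> lengthAsync(String text, ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("computing in " + Thread.currentThread().getName());
            return text.length();
        }, executor);
    }

    /** Convenience wrapper: runs lengthAsync on a temporary executor and waits for the value. */
    static int lengthOf(String text) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return lengthAsync(text, executor).join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller thread: " + Thread.currentThread().getName());
        System.out.println("length: " + lengthOf("asynchronous"));
    }
}
```

The caller receives the future immediately and decides when, or whether, to wait for the result.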

Example of asynchronous tasks execution in Spring

With this knowledge, we can move on to writing asynchronous and scheduled tasks. All necessary comments are placed inside the test cases. The tests are presented in separate listings. Let's begin with tests of the different task executors:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:META-INF/applicationContext-test.xml"})
@WebAppConfiguration
public class TaskExecutorsTest {

  @Test
  public void simpeAsync() throws InterruptedException {
    /*
      * SimpleAsyncTaskExecutor creates a new Thread for every task and executes it asynchronously. The threads aren't reused as in
      * native Java thread pools.
      *
      * The number of concurrently executing threads can be limited through the concurrencyLimit bean property
      * (concurrencyLimit XML attribute). Here it's simpler to invoke the setConcurrencyLimit method.
      * The tasks will be executed by at most 2 simultaneous threads. Without this value,
      * the number of concurrent threads is unlimited.
      *
      * You can observe that only 2 tasks are executed at a given time, even though 3 are submitted
      * (SimpleAsyncTask-3, SimpleAsyncTask-4 and SimpleAsyncTask-5).
      */
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("thread_name_prefix_____");
    executor.setConcurrencyLimit(2);
    executor.execute(new SimpleTask("SimpleAsyncTask-1", Counters.simpleAsyncTask, 1000));
    executor.execute(new SimpleTask("SimpleAsyncTask-2", Counters.simpleAsyncTask, 1000));

    Thread.sleep(1050);
    assertTrue("2 threads should be terminated, but "+Counters.simpleAsyncTask.getNb()+" were instead", Counters.simpleAsyncTask.getNb() == 2);

    executor.execute(new SimpleTask("SimpleAsyncTask-3", Counters.simpleAsyncTask, 1000));
    executor.execute(new SimpleTask("SimpleAsyncTask-4", Counters.simpleAsyncTask, 1000));
    executor.execute(new SimpleTask("SimpleAsyncTask-5", Counters.simpleAsyncTask, 2000));
    
    Thread.sleep(1050);
    assertTrue("4 threads should be terminated, but "+Counters.simpleAsyncTask.getNb()+" were instead", Counters.simpleAsyncTask.getNb() == 4);
    executor.execute(new SimpleTask("SimpleAsyncTask-6", Counters.simpleAsyncTask, 1000));

    Thread.sleep(1050);
    assertTrue("6 threads should be terminated, but "+Counters.simpleAsyncTask.getNb()+" were instead", 
      Counters.simpleAsyncTask.getNb() == 6);
  }
  
  @Test
  public void syncTaskTest() {
    /*
      * SyncTaskExecutor runs every task synchronously in the calling thread. In our case,
      * the tasks are executed in the JUnit thread. It means that the testing thread will block for 5
      * seconds while executing the fourth task ('SyncTask-4'). To prove that, we check if the total execution time is ~5 seconds.
      */
    long start = System.currentTimeMillis();
    SyncTaskExecutor executor = new SyncTaskExecutor();
    executor.execute(new SimpleTask("SyncTask-1", Counters.syncTask, 0));
    executor.execute(new SimpleTask("SyncTask-2", Counters.syncTask, 0));
    executor.execute(new SimpleTask("SyncTask-3", Counters.syncTask, 0));
    executor.execute(new SimpleTask("SyncTask-4", Counters.syncTask, 5000));
    executor.execute(new SimpleTask("SyncTask-5", Counters.syncTask, 0));
    long end = System.currentTimeMillis();
    int execTime = Math.round((end-start)/1000);
    assertTrue("Execution time should be 5 seconds but was "+execTime+" seconds", execTime == 5); 
  }
  
  @Test
  public void threadPoolTest() throws InterruptedException {
    /*
      * This executor can be used to expose Java's native ThreadPoolExecutor as a Spring bean, with the
      * possibility to set core pool size, max pool size and queue capacity through bean properties.
      *
      * It works like ThreadPoolExecutor from the java.util.concurrent package. It means that our pool starts
      * with up to 2 threads (core pool size) and can grow to 3 (max pool size) once the queue is full.
      * In addition, 1 task can be stored in the queue. This task will be handled
      * as soon as one of the 3 threads finishes its current task. In our case, we try to execute 5 tasks
      * with 3 places in the pool and 1 place in the queue. So the 5th task should be rejected and TaskRejectedException should be thrown.
      */
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(2);
    executor.setMaxPoolSize(3);
    executor.setQueueCapacity(1);
    executor.initialize();

    executor.execute(new SimpleTask("ThreadPoolTask-1", Counters.threadPool, 1000));
    executor.execute(new SimpleTask("ThreadPoolTask-2", Counters.threadPool, 1000));
    executor.execute(new SimpleTask("ThreadPoolTask-3", Counters.threadPool, 1000));
    executor.execute(new SimpleTask("ThreadPoolTask-4", Counters.threadPool, 1000));
    boolean wasTre = false;
    try {
      executor.execute(new SimpleTask("ThreadPoolTask-5", Counters.threadPool, 1000));
    } catch (TaskRejectedException tre) {
      wasTre = true;
    }
    assertTrue("The last task should throw a TaskRejectedException but it wasn't", wasTre);

    Thread.sleep(3000);

    assertTrue("4 tasks should be terminated, but "+Counters.threadPool.getNb()+" were instead", 
      Counters.threadPool.getNb()==4);
  }

}

class SimpleTask implements Runnable {
  private String name;
  private Counters counter;
  private int sleepTime;
  
  public SimpleTask(String name, Counters counter, int sleepTime) {
    this.name = name;
    this.counter = counter;
    this.sleepTime = sleepTime;
  }
  
  @Override
  public void run() {
    try {
      Thread.sleep(this.sleepTime);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    this.counter.increment();
    System.out.println("Running task '"+this.name+"' in Thread "+Thread.currentThread().getName());
  }
  
  @Override
  public String toString() {
          return "Task {"+this.name+"}";
  }
}

enum Counters {
        
  simpleAsyncTask(0),
  syncTask(0),
  threadPool(0);
  
  private int nb;
  
  public int getNb() {
    return this.nb;
  }
  
  public synchronized void increment() {
    this.nb++;
  }

  private Counters(int n) {
    this.nb = n;
  }
}

In the past, more executors were available (ConcurrentTaskExecutor, SimpleThreadPoolTaskExecutor, TimerTaskExecutor). Some of them, such as TimerTaskExecutor, have since been deprecated, and Java's native executors or their Spring counterparts are preferred. In addition, take a look at the printed result:

Running task 'SimpleAsyncTask-1' in Thread thread_name_prefix_____1
Running task 'SimpleAsyncTask-2' in Thread thread_name_prefix_____2
Running task 'SimpleAsyncTask-3' in Thread thread_name_prefix_____3
Running task 'SimpleAsyncTask-4' in Thread thread_name_prefix_____4
Running task 'SimpleAsyncTask-5' in Thread thread_name_prefix_____5
Running task 'SimpleAsyncTask-6' in Thread thread_name_prefix_____6
Running task 'SyncTask-1' in Thread main
Running task 'SyncTask-2' in Thread main
Running task 'SyncTask-3' in Thread main
Running task 'SyncTask-4' in Thread main
Running task 'SyncTask-5' in Thread main
Running task 'ThreadPoolTask-2' in Thread ThreadPoolTaskExecutor-2
Running task 'ThreadPoolTask-1' in Thread ThreadPoolTaskExecutor-1
Running task 'ThreadPoolTask-4' in Thread ThreadPoolTaskExecutor-3
Running task 'ThreadPoolTask-3' in Thread ThreadPoolTaskExecutor-2

As you can see, the first test creates a new thread for every task, which shows in the different thread names. The second one, the synchronous executor, should execute all tasks in the calling thread. That's the case: 'main' is the name of the main thread, and it's the main thread that invokes all synchronous tasks. The last case concerns the thread pool with a maximum of 3 threads. As we can see, only 3 threads are created.
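
The thread pool case can be reproduced without Spring, directly on java.util.concurrent.ThreadPoolExecutor with the same sizing (core 2, max 3, queue 1). In this sketch the tasks block on a latch instead of sleeping, so the outcome does not depend on timing; the class and method names are hypothetical. With the JDK class, the rejection surfaces as RejectedExecutionException, which plays the role TaskRejectedException has in the Spring test.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolSaturationSketch {

    /** Submits the given number of blocking tasks and returns how many the pool accepted. */
    static int acceptedTasks(int submitted) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < submitted; i++) {
                try {
                    // Every task waits on the latch, so no thread frees up during submission.
                    pool.execute(() -> awaitQuietly(release));
                    accepted++;
                } catch (RejectedExecutionException e) {
                    System.out.println("task " + (i + 1) + " rejected");
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
        return accepted;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + acceptedTasks(5));
    }
}
```

Tasks 1 and 2 occupy the core threads, task 3 waits in the queue, task 4 forces the creation of the third thread because the queue is full, and task 5 finds neither a free thread nor a queue slot.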

Now, we'll write some unit tests to check the @Async and @Scheduled implementations.

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:META-INF/applicationContext-test.xml"})
@WebAppConfiguration
public class AnnotationTest {

  @Autowired
  private GenericApplicationContext context;
        
  @Test
  public void testScheduled() throws InterruptedException {

      System.out.println("Start sleeping");
      Thread.sleep(6000);
      System.out.println("Wake up !");

      TestScheduledTask scheduledTask = (TestScheduledTask) context.getBean("testScheduledTask");
       /*
        * Test fixed delay. It's executed every 6 seconds. The first execution is registered after application's context start. 
        */
      assertTrue("Scheduled task should be executed 2 times (1 before sleep in this method, 1 after the sleep), but was "+scheduledTask.getFixedDelayCounter(), 
        scheduledTask.getFixedDelayCounter() == 2);
      
       /*
        * Test fixed rate. It's executed every 6 seconds. The first execution is registered after application's context start.
        * Unlike fixed delay, a fixed rate configuration measures the period between the starts of successive executions.
        * For example, a task with a 6 second rate is due at 10:30:30, 10:30:36, 10:30:42 and so on, regardless of when the
        * previous execution ended. Note that executions of the same task don't overlap: a run that takes longer than
        * the period delays the following ones. In the case of fixed delay, if the first execution takes 30 seconds,
        * the next one starts 6 seconds after the first one ends, so the execution flow will be:
        * 10:30:30, 10:31:06, 10:31:12.
        */
      assertTrue("Scheduled task should be executed 2 times (1 before sleep in this method, 1 after the sleep), but was "+scheduledTask.getFixedRateCounter(), 
        scheduledTask.getFixedRateCounter() == 2);
       /*
        * Test fixed delay with the initialDelay attribute. The initialDelay attribute is set to 6 seconds, so the
        * scheduled method is first executed 6 seconds after application's context start. In our case, it should be executed
        * only once because of the previous Thread.sleep(6000) invocation.
        */
      assertTrue("Scheduled task should be executed 1 time (0 before sleep in this method, 1 after the sleep), but was "+scheduledTask.getInitialDelayCounter(), scheduledTask.getInitialDelayCounter() == 1);
       /*
        * Test cron scheduled task. Cron is scheduled to be executed every 6 seconds. It's executed only once,
        * so we can deduce that it's not invoked immediately at application's
        * context start, but only at the configured time (every 6th second in our case).
        */
      assertTrue("Scheduled task should be executed 1 time (0 before sleep in this method, 1 after the sleep), but was "+scheduledTask.getCronCounter(), scheduledTask.getCronCounter() == 1);
  }
        
  @Test
  public void testAsyc() throws InterruptedException {
       /*
        * To test the @Async annotation, we can create a bean on the fly. AsyncCounter bean is a
        * simple counter whose value should be equal to 2 at the end of the test. An additional check
        * concerns the threads which execute the two AsyncCounter methods: one which
        * isn't annotated with @Async and another one which is annotated with it. For the first one, the invoking
        * thread should have the same name as the main thread. The annotated method can't be executed in
        * the main thread. It must be executed asynchronously in a new thread.
        */
      context.registerBeanDefinition("asyncCounter", new RootBeanDefinition(AsyncCounter.class));
      
      String currentName = Thread.currentThread().getName();
      AsyncCounter asyncCounter = context.getBean("asyncCounter", AsyncCounter.class);
      asyncCounter.incrementNormal();
      assertTrue("Thread executing normal increment should be the same as JUnit thread but it wasn't (expected '"+currentName+"', got '"+asyncCounter.getNormalThreadName()+"')",
                      asyncCounter.getNormalThreadName().equals(currentName));
      asyncCounter.incrementAsync();
      // sleep 50ms and give some time to AsyncCounter to update asyncThreadName value
      Thread.sleep(50);

      assertFalse("Thread executing @Async increment shouldn't be the same as JUnit thread but it was (JUnit thread '"+currentName+"', @Async thread '"+asyncCounter.getAsyncThreadName()+"')",
                      asyncCounter.getAsyncThreadName().equals(currentName));
      System.out.println("Main thread execution's name: "+currentName);
      System.out.println("AsyncCounter normal increment thread execution's name: "+asyncCounter.getNormalThreadName());
      System.out.println("AsyncCounter @Async increment thread execution's name: "+asyncCounter.getAsyncThreadName());
      assertTrue("Counter should be 2, but was "+asyncCounter.getCounter(), asyncCounter.getCounter()==2);
  }

}

class AsyncCounter {
        
  private int counter = 0;
  private String normalThreadName;
  private String asyncThreadName;
  
  public void incrementNormal() {
    normalThreadName = Thread.currentThread().getName();
    this.counter++;
  }
  
  @Async
  public void incrementAsync() {
    asyncThreadName = Thread.currentThread().getName();
    this.counter++;
  }
  
  public String getAsyncThreadName() {
    return asyncThreadName;
  }
  
  public String getNormalThreadName() {
    return normalThreadName;
  }
  
  public int getCounter() {
    return this.counter;
  }
        
}

In addition, we need to create a new configuration file and a class whose methods will be scheduled:

<!-- imported configuration file first -->
<!-- Activates various annotations to be detected in bean classes -->
<context:annotation-config />

<!-- Scans the classpath for annotated components that will be auto-registered as Spring beans.
 For example @Controller and @Service. Make sure to set the correct base-package-->
<context:component-scan base-package="com.mysite.test.schedulers" />
 
<task:scheduler id="taskScheduler"/>
<task:executor id="taskExecutor" pool-size="40" />
<task:annotation-driven executor="taskExecutor" scheduler="taskScheduler"/>

// Scheduled methods follow; all are executed every 6 seconds (scheduledAtFixedRate and scheduledAtFixedDelay start to execute at
// application context start, the two other methods begin 6 seconds after application's context start)
@Component
public class TestScheduledTask {

  private int fixedRateCounter = 0;
  private int fixedDelayCounter = 0;
  private int initialDelayCounter = 0;
  private int cronCounter = 0;

  @Scheduled(fixedRate = 6000)
  public void scheduledAtFixedRate() {
    System.out.println("<R> Increment at fixed rate");
    fixedRateCounter++;
  }
  
  @Scheduled(fixedDelay = 6000)
  public void scheduledAtFixedDelay() {
    System.out.println("<D> Incrementing at fixed delay");
    fixedDelayCounter++;
  }
  
  @Scheduled(fixedDelay = 6000, initialDelay = 6000)
  public void scheduledWithInitialDelay() {
    System.out.println("<DI> Incrementing with initial delay");
    initialDelayCounter++;
  }
  
  @Scheduled(cron = "*/6 * * * * *")
  public void scheduledWithCron() {
    System.out.println("<C> Incrementing with cron");
    cronCounter++;
          
  }
  
  public int getFixedRateCounter() {
    return this.fixedRateCounter;
  }
  
  public int getFixedDelayCounter() {
    return this.fixedDelayCounter;
  }
  
  public int getInitialDelayCounter() {
    return this.initialDelayCounter;
  }
  
  public int getCronCounter() {
    return this.cronCounter;
  }
        
}

The output of this test should show:

<R> Increment at fixed rate
<D> Incrementing at fixed delay
Start sleeping
<C> Incrementing with cron
<DI> Incrementing with initial delay
<R> Increment at fixed rate
<D> Incrementing at fixed delay
Wake up !
Main thread execution's name: main
AsyncCounter normal increment thread execution's name: main
AsyncCounter @Async increment thread execution's name: taskExecutor-1
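
The difference between fixedRate and fixedDelay is easiest to see when one run is slow. The sketch below is a pure arithmetic model, not a real scheduler: it assumes a single task whose runs never overlap, and it computes start times in seconds from the first run. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class RateVersusDelaySketch {

    /** Fixed delay: each run starts `period` after the previous run finished. */
    static List<Long> fixedDelayStarts(long period, long... durations) {
        List<Long> starts = new ArrayList<>();
        long start = 0;
        for (long duration : durations) {
            starts.add(start);
            start = start + duration + period;
        }
        return starts;
    }

    /**
     * Fixed rate: runs are due every `period`, counted from the first start,
     * but a run cannot begin before the previous one has finished.
     */
    static List<Long> fixedRateStarts(long period, long... durations) {
        List<Long> starts = new ArrayList<>();
        long previousEnd = 0;
        for (int i = 0; i < durations.length; i++) {
            long due = i * period;
            long start = Math.max(due, previousEnd);
            starts.add(start);
            previousEnd = start + durations[i];
        }
        return starts;
    }

    public static void main(String[] args) {
        // One slow run (30 s) followed by two quick ones (1 s), with a 6 s period.
        System.out.println("fixed delay: " + fixedDelayStarts(6, 30, 1, 1)); // [0, 36, 43]
        System.out.println("fixed rate:  " + fixedRateStarts(6, 30, 1, 1));  // [0, 30, 31]
    }
}
```

Under these assumptions, fixed delay always leaves a 6 second gap after each run, while fixed rate tries to catch up on the runs that became due during the slow one.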

This article introduced another interesting feature of the Spring framework: scheduling. We saw that tasks can be scheduled with a fixed rate as well as with a CRON-like configuration. We also proved that methods annotated with @Async are executed in a different thread than the calling thread. The scheduling feature also supports native Java classes, which can be used, for example, to manage thread pools.
