TransferQueue

Producer-consumer applications often exchange elements through a shared queue. Java offers several queue types for this purpose, and one of them is TransferQueue.


This article presents TransferQueue. The first part explains how it differs from BlockingQueue, its parent interface. The second part shows how TransferQueue works through some simple test cases.

What is TransferQueue?

The java.util.concurrent.TransferQueue interface extends BlockingQueue and adds a few supplementary methods. We can divide them into two groups: transfer methods (transfer and tryTransfer, which hand a message from a producer to a consumer) and consumer methods (hasWaitingConsumer and getWaitingConsumerCount, which report on consumers waiting to receive new elements).
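The consumer-related methods can be illustrated with a minimal sketch. The class and method names (WaitingConsumerSketch, handOverToWaitingConsumer) are hypothetical and only serve the illustration; the queue calls are the ones declared by TransferQueue.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

// Hypothetical illustration class, not part of the original test suite
final class WaitingConsumerSketch {

  // Returns the element received by the consumer, or null if the hand-over failed
  static String handOverToWaitingConsumer() throws InterruptedException {
    TransferQueue<String> queue = new LinkedTransferQueue<>();
    String[] received = new String[1];

    Thread consumer = new Thread(() -> {
      try {
        received[0] = queue.take(); // blocks because the queue is empty
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();

    // Poll until the consumer is really waiting inside take()
    while (!queue.hasWaitingConsumer()) {
      Thread.sleep(10);
    }
    // The Javadoc describes this value as an estimate
    System.out.println("waiting consumers: " + queue.getWaitingConsumerCount());

    // The untimed tryTransfer(E) succeeds only when a consumer is already waiting
    boolean transferred = queue.tryTransfer("A");
    consumer.join(); // join() also makes received[0] visible to this thread
    return transferred ? received[0] : null;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(handOverToWaitingConsumer());
  }
}
```

The loop on hasWaitingConsumer() replaces a fixed sleep, so the sketch does not depend on how quickly the consumer thread starts.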

TransferQueue brings another way to add elements to the queue: the transfer(E) method and its variants. A thread invoking transfer(E) blocks until a consumer takes the element from the queue. If that never happens, the thread stays blocked indefinitely. To avoid this situation, tryTransfer(E, long, TimeUnit) waits at most the specified time and returns false when the element wasn't consumed in that time; the element is then not left in the queue. The untimed tryTransfer(E) doesn't wait at all and succeeds only if a consumer is already waiting.
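A short sketch of both behaviours, under the assumption that LinkedTransferQueue is the implementation in use. The class and method names (TransferSketch, timedTransferWithoutConsumer, blockingTransfer) are made up for the example.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration class, not part of the original test suite
final class TransferSketch {

  // No consumer exists: the timed tryTransfer gives up and the element is not kept
  static String timedTransferWithoutConsumer() throws InterruptedException {
    TransferQueue<String> queue = new LinkedTransferQueue<>();
    boolean transferred = queue.tryTransfer("A", 200, TimeUnit.MILLISECONDS);
    return "transferred=" + transferred + ", size=" + queue.size();
  }

  // transfer() returns only after a consumer has received the element
  static String blockingTransfer() throws InterruptedException {
    TransferQueue<String> queue = new LinkedTransferQueue<>();
    AtomicBoolean producerDone = new AtomicBoolean(false);

    Thread producer = new Thread(() -> {
      try {
        queue.transfer("B");
        producerDone.set(true); // reached only once "B" was consumed
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();

    // Nothing has consumed "B" yet, so the producer cannot be done here
    boolean doneBeforeTake = producerDone.get();
    String received = queue.take();
    producer.join();
    return "doneBeforeTake=" + doneBeforeTake + ", received=" + received
        + ", doneAfterTake=" + producerDone.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(timedTransferWithoutConsumer());
    System.out.println(blockingTransfer());
  }
}
```

The first method returns after about 200 ms with a false result and an empty queue. In the second one the producer flag can only flip after take() has received the element.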

It's these transfer* methods that differentiate TransferQueue from BlockingQueue. The BlockingQueue interface has no method that waits until another thread receives the element, while TransferQueue has. Oracle also describes TransferQueue as a "refinement":

"The interface TransferQueue has been added. It is a refinement of the BlockingQueue interface in which producers can wait for consumers to receive elements. One implementation of the new interface is also included in this release, LinkedTransferQueue." (source: http://download.oracle.com/javase/7/docs/technotes/guides/collections/changes7.html)
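To make the difference concrete, the following sketch compares a plain BlockingQueue operation (put) with the untimed tryTransfer on the same LinkedTransferQueue. The class and method names (PutVersusTransferSketch, compare) are hypothetical.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

// Hypothetical illustration class, not part of the original test suite
final class PutVersusTransferSketch {

  static String compare() throws InterruptedException {
    TransferQueue<String> queue = new LinkedTransferQueue<>();

    // BlockingQueue-style insertion: LinkedTransferQueue is unbounded, so put()
    // returns at once and never learns whether anybody consumed the element
    queue.put("A");
    int sizeAfterPut = queue.size();

    // Transfer-style insertion: no consumer is waiting, so the hand-over fails
    // and "B" is not enqueued
    boolean handedOver = queue.tryTransfer("B");

    return "sizeAfterPut=" + sizeAfterPut + ", handedOver=" + handedOver + ", content=" + queue;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(compare());
  }
}
```

Only "A" stays in the queue: put() stores the element for a later consumer, while tryTransfer() reports that nobody was there to receive "B".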

Examples of TransferQueue

Let's now see what this new queue interface gives through some test cases:

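// imports assumed from the calls used below: JUnit 4 for @Test and AssertJ for assertThat
import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

import org.junit.Test;

public class TransferQueueTest {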

  private static final String[] LETTERS =
          new String[] {"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "R", "S", "T"};

  @Test
  public void should_not_be_able_to_send_an_element_with_non_blocking_operation_because_of_not_defined_consumer() throws InterruptedException {
    TransferQueue<String> queue = new LinkedTransferQueue<>();
    queue.add("A");
    queue.add("B");

    // when no consumer is waiting, tryTransfer gives up after the timeout: the element is not
    // transferred and is not left in the queue
    boolean wasTransferred = queue.tryTransfer("_O_", 2, TimeUnit.SECONDS);

    assertThat(wasTransferred).isFalse();
    assertThat(queue).containsOnly("A", "B");
  }

  @Test
  public void should_block_because_of_not_defined_consumer() throws InterruptedException {
    TransferQueue<String> queue = new LinkedTransferQueue<>();
    Boolean[] wasBlocked = new Boolean[] {true};
    new Thread(() -> {
      try {
        queue.transfer("C");
        wasBlocked[0] = false;
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }).start();
    // give the producer thread time to call transfer()
    Thread.sleep(2000);

    // transfer() blocks the calling thread until the element is consumed; without a consumer
    // the line after transfer() is never reached
    assertThat(wasBlocked[0]).isTrue();
  }

  @Test
  public void should_correctly_consume_transferred_elements() throws InterruptedException {
    TransferQueue<String> queue = new LinkedTransferQueue<>();
    CountDownLatch latch = new CountDownLatch(1);
    new Thread(() -> {
      for (String letter : LETTERS) {
        try {
          queue.transfer(letter);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      latch.countDown();
    }).start();
    TransferQueueReader consumer = new TransferQueueReader(queue);
    new Thread(consumer).start();

    latch.await();

    assertThat(consumer.getConsumedElements())
      .containsExactly("A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "R", "S", "T");
  }

  @Test
  public void should_detect_waiting_consumers() throws InterruptedException {
    TransferQueue<String> queue = new LinkedTransferQueue<>();
    CountDownLatch latch = new CountDownLatch(1);
    new Thread(() -> {
      for (String letter : LETTERS) {
        try {
          queue.transfer(letter);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      latch.countDown();
    }).start();

    TransferQueueReader consumer = new TransferQueueReader(queue);
    new Thread(consumer).start();

    latch.await();

    assertThat(queue).isEmpty();
    assertThat(queue.hasWaitingConsumer()).isTrue();
    assertThat(consumer.getConsumedElements())
      .containsExactly("A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "R", "S", "T");
  }

  @Test
  public void should_not_have_waiting_consumers_on_stopping_consumer_reading() throws InterruptedException {
    TransferQueue<String> queue = new LinkedTransferQueue<>();
    CountDownLatch latch = new CountDownLatch(1);
    new Thread(() -> {
      for (String letter : LETTERS) {
        try {
          queue.transfer(letter);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      latch.countDown();
    }).start();

    TransferQueueReader consumer = new TransferQueueReader(queue);
    new Thread(consumer).start();

    latch.await();
    consumer.stopRead();
    // give the consumer some time to finish its timed poll() and leave the loop
    Thread.sleep(2000);

    assertThat(queue).isEmpty();
    assertThat(queue.hasWaitingConsumer()).isFalse();
    assertThat(consumer.getConsumedElements())
      .containsExactly("A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "R", "S", "T");

  }

  private static final class TransferQueueReader implements Runnable {

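    private final TransferQueue<String> queue;

    // written by the consumer thread and read by the test thread, hence a thread-safe list
    private final List<String> consumedElements = new java.util.concurrent.CopyOnWriteArrayList<>();

    // volatile so that the consumer thread sees the change made by stopRead()
    private volatile boolean continueReading = true;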

    public TransferQueueReader(TransferQueue<String> queue) {
      this.queue = queue;
    }

    @Override
    public void run() {
      while (continueReading) {
        try {
          // use the timed version of poll(); take() would wait for a new element
          // without noticing that continueReading has changed (Javadoc of take()):
          // "Retrieves and removes the head of this queue, waiting if necessary until an element becomes available."
          String letter = queue.poll(2, TimeUnit.SECONDS);
          if (letter != null) {
            consumedElements.add(letter);
          }
        } catch (InterruptedException e) {
          // restore the interrupt flag and stop reading
          Thread.currentThread().interrupt();
          return;
        }
      }
    }

    public void stopRead() {
      this.continueReading = false;
    }

    public List<String> getConsumedElements() {
      return consumedElements;
    } 
  }

}

TransferQueue is almost like BlockingQueue. The difference is that TransferQueue can block a producer thread until its message is consumed. This blocking can be bounded with the tryTransfer methods, which either take a timeout or return immediately. We also saw that TransferQueue lets us check, through the hasWaitingConsumer method, whether any consumer is waiting for elements.

