Receivers in Spark Streaming

Versions: Spark 2.0.0

Standard data sources, such as files, queues or sockets are natively implement in Spark Streaming context. But the framework allows the creation of more flexible data consumers called receivers.

The first part of this post presents receivers. The second part shows how to implement receiver for custom data source.

What are receivers ?

Receivers are special objects in Spark Streaming which goal is to consume data from data sources and move it to Spark. Receivers are created by streaming context as long running tasks on different executors. They're scheduled on round robin fashion. Each receiver takes 1 core.

But how does it work ? Receivers create data blocks during a duration called block interval. This duration, configured through spark.streaming.blockInterval, defines how often data received by receivers is chunked to blocks. These blocks are stored in executor's memory and, sometimes, also in Write Ahead Logs to improve fault-tolerance. After, the executor notifies the driver about stored data. This notification leads to trigger data transformations every batch interval. The result of these transformations are RDDs which partitions are constituted by blocks generated by receivers.

Receiver's API

Among technical considerations, receivers can be built by extending abstract class Receiver. Two methods must be implemented: onStart() and onStop(), respectively used to, start and stop data consumption. The first method can contain all necessary initialization for data reading, such as opening connections, creating new threads etc.

But be careful, onStart() must be non-blocking. Thus to be able to properly start and stop receiver, data retrieval should be implement in new thread. Otherwise, even if streaming context exceeds its timeout, it won't be stopped because of blocking onStart() method.

Data is retrieved in new thread created and started by onStart() method but how it's sent to Spark ? Actually there are 2 methods to move data from receiver to Spark context, both used in two different types of receivers. Before talking about these methods, let's explain what are receiver types:

Under-the-hood, the difference between them comes from the method used to send data to Spark. For the case of reliable receiver, data is sent through store(...) method taking in parameter collection-like objects (Iterator, ByteBuffer or ArrayBuffer). It's a blocking method which doesn't return as long as Spark doesn't notify receiver about successful data save. After returning, receiver can acknowledge source about data reception.

In the other side, unreliable receiver uses store(...) method taking in parameter a single object. This method is not blocking but it doesn't sent data immediately to Spark. Instead of that, it keeps data in memory and sends it as batch to Spark after accumulating some number of items.

Sometimes receiver can fail. In this case, receiver can be restarted through restart(...) method. However, the restart is not immediate, it's only scheduled. The restart execution consists on calling onStop() and onStart() methods subsequently withint configured delay (spark.streaming.receiverRestartDelay property).

Example of custom receiver

Below you can find 3 examples of receivers: blocking (quite difficult to test it with assertions, it's the reason why it shows the blocking method through System's out), reliable and unreliable receivers:

public static Queue<Integer> testQueue; 

@Test
public void should_show_that_onstart_is_blocking_method() throws IOException, InterruptedException {
  triggerDataCreation(0, 2);

  BlockingReceiver blockingReceiver = new BlockingReceiver(DEFAULT_STORAGE);
  JavaReceiverInputDStream<Integer> unreliableReceiverDStream =
    streamingContext.receiverStream(blockingReceiver);
  unreliableReceiverDStream.foreachRDD(rdd -> {});

  long startTime = System.currentTimeMillis();
  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(TERMINATION_TIME);

  long runningTime = (System.currentTimeMillis() - startTime)/1000;
  System.out.println("Streaming context was awaiting during " + runningTime + " seconds");

  // This test doesn't have any assertion. Instead it result can be observed
  // through System's prints, which should be:
  // Streaming context was awaiting during 2 seconds
  // Receiver was stopped after 4 seconds
}

@Test
public void should_create_realiable_receiver() throws IOException, InterruptedException {
  triggerDataCreation(0, 15);

  List<List<Integer>> collectedData = new ArrayList<>();
  JavaReceiverInputDStream<Integer> reliableReceiverDStream =
    streamingContext.receiverStream(new ReliableReceiver(DEFAULT_STORAGE));
  reliableReceiverDStream.foreachRDD(rdd -> putIfDefined(rdd.collect(), collectedData));

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(TERMINATION_TIME);

  assertThat(collectedData).hasSize(3);
  assertThat(collectedData.get(0)).containsOnly(0, 1, 2, 3, 4);
  assertThat(collectedData.get(1)).containsOnly(5, 6, 7, 8, 9);
  assertThat(collectedData.get(2)).containsOnly(10, 11, 12, 13, 14);
}

@Test
public void should_create_unrealiable_receiver() throws IOException, InterruptedException {
  triggerDataCreation(0, 15);

  List<List<Integer>> collectedData = new ArrayList<>();
  JavaReceiverInputDStream<Integer> unreliableReceiverDStream =
    streamingContext.receiverStream(new UnreliableReceiver(DEFAULT_STORAGE));
  unreliableReceiverDStream.foreachRDD(rdd -> putIfDefined(rdd.collect(), collectedData));

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(TERMINATION_TIME);

  assertThat(collectedData).hasSize(6);
  assertThat(collectedData.get(0)).containsOnly(0, 1, 2);
  assertThat(collectedData.get(1)).containsOnly(3, 4);
  assertThat(collectedData.get(2)).containsOnly(5, 6, 7);
  assertThat(collectedData.get(3)).containsOnly(8, 9);
  assertThat(collectedData.get(4)).containsOnly(10, 11, 12);
  assertThat(collectedData.get(5)).containsOnly(13, 14);
}

private static class BlockingReceiver extends Receiver<Integer> {

  private long startTime;
  private long startExecutionEndTime;

  public BlockingReceiver(StorageLevel storageLevel) {
    super(storageLevel);
    startTime = System.currentTimeMillis();
    startExecutionEndTime = TERMINATION_TIME*2 + startTime;
  }

    @Override
    public void onStart() {
      while (System.currentTimeMillis() < startExecutionEndTime) {
        // Data receiving should be started in new thread.
        // Otherwise it's a blocking operation and it doesn't
        // stop at the same time as streaming context
      } 
    }

    @Override
    public void onStop() {
      long runningTime = (System.currentTimeMillis() - startTime)/1000;
      System.out.println("Receiver was stopped after " + runningTime + " seconds");
    }

}

private static class ReliableReceiver extends Receiver<Integer> {

  private static final int BLOCKS_SIZE = 5;

  List<Integer> blocks = new ArrayList<>();

  private Thread receiver;

  public ReliableReceiver(StorageLevel storageLevel) {
      super(storageLevel);
  }

  @Override
  public void onStart() {
    receiver = new Thread(() -> {
      while (!isStopped()) {
        if (blocks.size() == BLOCKS_SIZE) {
          store(blocks.iterator());
          blocks.clear();
        }

        try {
          Thread.sleep(50);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }

        Integer queueHeadItem = testQueue.poll();
        if (queueHeadItem != null) {
          blocks.add(queueHeadItem);
        }
      }
    });
    receiver.start();
  }

  @Override
  public void onStop() {
    receiver.interrupt();
  }
}

private static class UnreliableReceiver extends Receiver<Integer> {
  private Thread receiver;

  public UnreliableReceiver(StorageLevel storageLevel) {
      super(storageLevel);
  }

  @Override
  public void onStart() {
    receiver = new Thread(() -> {
      while (!isStopped()) {
        Integer queueHeadItem = testQueue.poll();
        if (queueHeadItem != null) {
          store(queueHeadItem);
        }
        try {
          Thread.sleep(80);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });
    receiver.start();
  }

  @Override
  public void onStop() {
    receiver.interrupt();
  }
}


public static  void triggerDataCreation(int start, int maxRDDs) throws IOException {
  for (int i = start; i < maxRDDs; i++) {
    testQueue.add(i);
  }
}

This post is about receivers in Spark Streaming. The first part presented the role of receivers and their localisation in streaming flow. The second part explained how to implement custom receiver, by explaining also different storage methods. The last part contained an example of 3 types of receivers: blocking, reliable and unreliable.

If you liked it, you should read: