DStream in Spark Streaming

Versions: Spark 2.0.0

In Spark batch-oriented, RDD was a data abstraction. In Spark Streaming RDDs are still present but for the programmer another data type is exposed - DStream.

The full name of Spark Streaming data abstraction is Discretized Stream but the abbreviated version (DStream) is used more often. The first part presents its main characteristics. The second one goes further and shows DStreamGraph representing the workflow of DStream computations. The last part contains some examples showing the use of DStreams.

What is DStream ?

DStream is data abstraction in Spark Streaming. In streaming processing data is accumulated during the time specified in StreamingContext. Data accumulated during this period is then grouped together and transformed into one single RDD. All created and accumulated RDDs are grouped in DStream. Thus, in other words, a DStream is a sequence of RDDs.

DStream can be characterized:

Different types of DStreams can be created. DStream corresponding to incoming stream of data is called input DStream. By the way, each of stream sources has its own implementation of InputDStream, respectively: QueueInputDStream for queue source, FileInputDStream for HDFS-like filesystem or ReceiverInputDStream for receiver. There are also DStreams corresponding to intermediary operations as: filtering (FilteredDStream), flat-mapping (FlatMappedDStream), mapping (MapPartitionedStream) or transforming (TransformedDStream). A DStream for key-value pairs, as well as DStream for shuffle are also defined. The first one can be found in Java's API as JavaPairDStream class. The shuffle's one is represented by ShuffledDStream and it's created on well-known transformations as reduceByKey and combineByKey.

What is DStreamGraph ?

Batch-oriented Spark has a directed acyclic graph (DAG) representing the workflow of RDD computation. Streaming-oriented processing also has its own representation of similar type. This structure is called DStreamGraph.

Its role is mostly related to execution of tasks delegated by job scheduler. Among all, DStreamGraph executes the first computation of DStreams through its start() method. In addition, it holds all input DStreams used to start the computation. Every time when a DStream is processed it produces new DStream called here an output DStream. All produced DStreams are registered inside DStreamGraph by DStreams themselves. They do it through addOutputStream(DStream) method.

Another duty of DStreamGraph is related to jobs. DStreamGraph is used by job scheduler, through job generator, to generate jobs for output DStreams. When this event occurs, following entries can be observed in logs:

DEBUG Callback for JobGenerator called at time 1473962877000 (org.apache.spark.streaming.util.RecurringTimer:58)
DEBUG Got event GenerateJobs(1473962877000 ms) (org.apache.spark.streaming.scheduler.JobGenerator:58)
DEBUG Generating jobs for time 1473962877000 ms (org.apache.spark.streaming.DStreamGraph:58)
DEBUG Time 1473962877000 ms is valid (org.apache.spark.streaming.dstream.FilteredDStream:58)
DEBUG Time 1473962877000 ms is valid (org.apache.spark.streaming.dstream.QueueInputDStream:58)
DEBUG +++ Cleaning closure  (org.apache.spark.streaming.api.java.JavaDStream$$anonfun$filter$1) +++ (org.apache.spark.util.ClosureCleaner:58)
DEBUG  + declared fields: 2 (org.apache.spark.util.ClosureCleaner:58)
DEBUG      public static final long org.apache.spark.streaming.api.java.JavaDStream$$anonfun$filter$1.serialVersionUID (org.apache.spark.util.ClosureClea
ner:58)
DEBUG      private final org.apache.spark.api.java.function.Function org.apache.spark.streaming.api.java.JavaDStream$$anonfun$filter$1.f$1 (org.apache.sp
ark.util.ClosureCleaner:58)
DEBUG  + declared methods: 2 (org.apache.spark.util.ClosureCleaner:58)
DEBUG      public final java.lang.Object org.apache.spark.streaming.api.java.JavaDStream$$anonfun$filter$1.apply(java.lang.Object) (org.apache.spark.util
.ClosureCleaner:58)
DEBUG      public final boolean org.apache.spark.streaming.api.java.JavaDStream$$anonfun$filter$1.apply(java.lang.Object) (org.apache.spark.util.ClosureC
leaner:58)
DEBUG  + inner classes: 0 (org.apache.spark.util.ClosureCleaner:58)
DEBUG  + outer classes: 0 (org.apache.spark.util.ClosureCleaner:58)
DEBUG  + outer objects: 0 (org.apache.spark.util.ClosureCleaner:58)
DEBUG  + populating accessed fields because this is the starting closure (org.apache.spark.util.ClosureCleaner:58)
DEBUG  + fields accessed by starting closure: 0 (org.apache.spark.util.ClosureCleaner:58)
DEBUG  + there are no enclosing objects! (org.apache.spark.util.ClosureCleaner:58)
DEBUG  +++ closure  (org.apache.spark.streaming.api.java.JavaDStream$$anonfun$filter$1) is now cleaned +++ (org.apache.spark.util.ClosureClean
er:58)
DEBUG Generated 2 jobs for time 1473962877000 ms (org.apache.spark.streaming.DStreamGraph:58)
INFO Added jobs for time 1473962877000 ms (org.apache.spark.streaming.scheduler.JobScheduler:54)

Moreover, DStreamGraph executes other tasks triggered by job generator: RDD cleaning and data checkpoint updates.

DStream examples

Below you can find some examples of DStreams, created from different sources, with different output operations applied on them:

@Test
public void should_correctly_create_dstream_for_queue() throws IOException, InterruptedException {
  boolean oneAtTime = false;
  triggerDataCreation(4);

  // * Be careful - only data defined in the queue before stream declaration is read !
  //   It's because under-the-hood Spark crates its own queue and adds
  //   all RDDs already appended
  //   Since both are different objects, adding new RDD to original queue
  //   will not have any impact on the queue created by Spark
  //   See org.apache.spark.streaming.api.java.JavaStreamingContext.queueStream()
  // * When oneAtTime is true, it means that only 1 RDD will be read in each batch.
  //   By defining it at false, all queue's RDDs will be read in single batch interval
  JavaInputDStream<String> numbersStream = streamingContext.queueStream(testQueue, oneAtTime);

  JavaDStream<String> filteredNumbersStream = numbersStream.filter(number -> number.startsWith("Num"));

  // Create new 4 items to see that they won't be consumed
  triggerDataCreation(4);

  List<String> consumedRDDs = new ArrayList<>();
  filteredNumbersStream.foreachRDD(rdd -> rdd.collect()
    .forEach(label -> consumedRDDs.add(label)));

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(12_000L);

  // Tested queue should have 8 items but only 4, added before queueStream(...) call, was
  // consumed by Spark
  assertThat(testQueue).hasSize(8);
  // But only 4 items (the initial ones) were consumed
  assertThat(consumedRDDs).hasSize(4);
  assertThat(consumedRDDs).containsOnly(makeTestLabel(0), makeTestLabel(1), makeTestLabel(2), makeTestLabel(3));
}

@Test
public void should_correctly_read_new_files_from_socket_stream() throws InterruptedException, IOException {
  int port = 8081;
  createSocketAndTriggerMessaging(port, 5);

  JavaDStream<String> socketDStream = streamingContext.socketTextStream("localhost", port, StorageLevel.MEMORY_ONLY());

  List<String> receivedData = new ArrayList<>();
  socketDStream.foreachRDD(rdd -> receivedData.addAll(rdd.collect()));

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(15_000L);

  assertThat(receivedData).hasSize(5);
  assertThat(receivedData).containsOnly(makeTestLabel(0), makeTestLabel(1), makeTestLabel(2),
    makeTestLabel(3), makeTestLabel(4));
}

@Test
public void should_correctly_register_custom_streaming_listener() throws IOException, InterruptedException {
  boolean oneAtTime = false;
  triggerDataCreation(4);

  CustomStreamingListener listener = new CustomStreamingListener();
  streamingContext.addStreamingListener(listener);
  JavaInputDStream<String> numbersStream = streamingContext.queueStream(testQueue, oneAtTime);
  JavaDStream<String> filteredNumbersStream = numbersStream.filter(number -> number.startsWith("Num"));


  List<String> consumedRDDs = new ArrayList<>();
  filteredNumbersStream.foreachRDD(rdd -> rdd.collect()
    .forEach(label -> consumedRDDs.add(label)));

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(12_000L);

  assertThat(consumedRDDs).hasSize(4);
  assertThat(listener.getEvents()).hasSize(20);
  // The expected number of items depends strictly on created RDDs
  // In our case 4 RDDs are consumed, each one triggers 5 events (in order of triggering):
  // 1) Batch is submitted
  // 2) Batch is started
  // 3) Batch is completed
  // 4) Output operation is started
  // 5) Output operation is completed
  long batchSubmittedEvent =
    listener.getEvents().stream().filter(event -> event instanceof StreamingListenerBatchSubmitted).count();
  long batchStartedEvent =
    listener.getEvents().stream().filter(event -> event instanceof StreamingListenerBatchStarted).count();
  long batchCompletedEvents =
    listener.getEvents().stream().filter(event -> event instanceof StreamingListenerBatchCompleted).count();
  long outputOperationStartedEvents =
    listener.getEvents().stream().filter(event -> event instanceof StreamingListenerOutputOperationStarted).count();
  long outputOperationCompletedEvents =
    listener.getEvents().stream().filter(event -> event instanceof StreamingListenerOutputOperationCompleted).count();
  assertThat(batchSubmittedEvent).isEqualTo(4);
  assertThat(batchStartedEvent).isEqualTo(4);
  assertThat(batchCompletedEvents).isEqualTo(4);
  assertThat(outputOperationStartedEvents).isEqualTo(4);
  assertThat(outputOperationCompletedEvents).isEqualTo(4);
}

@Test
public void should_correctly_work_with_receiver_stream() throws InterruptedException {
  JavaReceiverInputDStream<String> receiverInputDStream =
    streamingContext.receiverStream(new AutoDataMakingReceiver(StorageLevel.MEMORY_ONLY(), 3_000L, 2));

  List<JavaRDD<String>> batches = new ArrayList<>();
  List<String> receivedData = new ArrayList<>();
  receiverInputDStream.foreachRDD(rdd -> {
    List<String> collectedData = rdd.collect();
    if (!collectedData.isEmpty()) {
      batches.add(rdd);
      receivedData.addAll(collectedData);
    }
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(15_000L);

  // Our receiver stores data in 2 sequences with 3 seconds interval
  // This interval corresponds to the batch interval. So we expect
  // to deal with 2 batches
  assertThat(batches).hasSize(2);
  assertThat(receivedData).hasSize(10);
  assertThat(receivedData).containsOnly(makeTestLabel(0), makeTestLabel(1), makeTestLabel(2),
    makeTestLabel(3), makeTestLabel(4));
}

@Test
public void should_correctly_create_dstream_from_another_dstream() throws IOException, InterruptedException {
  boolean oneAtTime = true;
  triggerDataCreation(4);
  JavaInputDStream<String> numbersStream = streamingContext.queueStream(testQueue, oneAtTime);

  // Here we construct one DStream from another one. In occurrence,
  // constructed DStream (output stream) is built from input stream
  JavaDStream<String> filteredNumbersStream = numbersStream.filter(number -> number.startsWith("Num"));
  JavaPairDStream<String, Integer> pairWithCounterDStream =
    filteredNumbersStream.transformToPair(new StringRDDToPairRDDConverter());

  List<Tuple2<String, Integer>> consumedRDDs = new ArrayList<>();
  pairWithCounterDStream.foreachRDD(rdd -> consumedRDDs.addAll(rdd.collect()));

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(10_000L);

  // Because 1 RDD is retrieved at time, the counter inside converter
  // will increment from 0 up to 3
  // To see the difference, make oneAtTime = false
  assertThat(consumedRDDs).hasSize(4);
  assertThat(consumedRDDs).containsOnly(new Tuple2<>(makeTestLabel(0), 0), new Tuple2<>(makeTestLabel(1), 1),
    new Tuple2<>(makeTestLabel(2), 2), new Tuple2<>(makeTestLabel(3), 3));
}

private static class AutoDataMakingReceiver extends Receiver<String> {

  private long sleepingTime;
  private int turns;

  public AutoDataMakingReceiver(StorageLevel storageLevel, long sleepingTime, int turns) {
    super(storageLevel);
    this.sleepingTime = sleepingTime;
    this.turns = turns;
  }

  @Override
  public void onStart() {
    for (int turn = 0; turn < turns; turn++) {
      for (int i = 0; i < 5; i++) {
        store(makeTestLabel(i));
      }
      try {
        Thread.sleep(sleepingTime);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        e.printStackTrace();
      }
    }
  }

  @Override
  public void onStop() {
      // Do nothing but you should clean data here
  }
}

private static class CustomStreamingListener implements StreamingListener {

  private List<StreamingListenerEvent> events = new ArrayList<>();

  @Override
  public void onReceiverStarted(StreamingListenerReceiverStarted started) {
    events.add(started);
  }

  @Override
  public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {
    events.add(receiverStopped);
  }

  @Override
  public void onReceiverError(StreamingListenerReceiverError receiverError) {
    events.add(receiverError);
  }

  @Override
  public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {
    events.add(batchSubmitted);
  }

  @Override
  public void onBatchStarted(StreamingListenerBatchStarted batchStarted) {
    events.add(batchStarted);
  }

  @Override
  public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
    events.add(batchCompleted);
  }
  @Override
  public void onOutputOperationStarted(StreamingListenerOutputOperationStarted started) {
    events.add(started);
  }

  @Override
  public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted completed) {
    events.add(completed);
  }

  public List<StreamingListenerEvent> getEvents() {
    return events;
  }
}

private static class StringRDDToPairRDDConverter implements Function<JavaRDD<String>, JavaPairRDD<String, Integer>>, Serializable {

  private static final LongAdder ADDER = new LongAdder();

  @Override
  public JavaPairRDD<String, Integer> call(JavaRDD<String> inputRDD) throws Exception {
    int newIndex = ADDER.intValue();
    ADDER.increment();
    return inputRDD.mapToPair(entry -> new Tuple2<>(entry, newIndex));
  }
}

private void createSocketAndTriggerMessaging(int port, int labelsLimit) throws InterruptedException {
  CountDownLatch latch = new CountDownLatch(1);
  new Thread(() -> {
    ServerSocket serverSocket = null;
    try {
      serverSocket = new ServerSocket(port);
      Socket socket = serverSocket.accept();

      PrintWriter writer = new PrintWriter(socket.getOutputStream(), true);
      for (int i = 0; i < labelsLimit; i++) {
        writer.println(makeTestLabel(i));
        try {
          Thread.sleep(2_000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          e.printStackTrace();
        }
      }
    } catch (IOException e) {
      Thread.currentThread().interrupt();
      e.printStackTrace();
    } finally {
      try {
        serverSocket.close();
      } catch (IOException e) {
        Thread.currentThread().interrupt();
        e.printStackTrace();
      }
    }
  }).start();
  latch.await(5, TimeUnit.SECONDS);
}


public void triggerDataCreation(int maxRDDs) throws IOException {
  for (int i = 0; i < maxRDDs; i++) {
    JavaRDD<String> newRDD1 = batchContext.parallelize(Arrays.asList(makeTestLabel(i)));
    testQueue.add(newRDD1);
  }
}

private static String makeTestLabel(int number) {
  return "Number " + number;
}

The first part of this post presented DStreams as a sequence of RDDs created within common interval called batch interval. It also listes main types of DStreams defined in Scala's and Java's APIs. In the second part we saw DStreamGraph, responsible for jobs execution, registering of input and output DStreams, and making some maintenance operations, as old RDDs removal or checkpoint updates. The last part shown some use cases of API.


If you liked it, you should read:

📚 Newsletter Get new posts, recommended reading and other exclusive information every week. SPAM free - no 3rd party ads, only the information about waitingforcode!