DStream transformations

Versions: Spark 2.0.0

Spark Streaming is not static: it allows DStreams to be converted into new types. Exactly as in batch-oriented processing, this is done through transformations.

Transformations in Spark Streaming

Transformations create a new DStream from an existing one. They are applied to the RDDs held by the given DStream.

In Spark Streaming, two types of transformations exist: basic ones (similar to those used for RDD manipulation in batch processing) and stateful ones (window- or state-based, potentially applied over time to subsequent data). Some of the stateful transformations (updateStateByKey and reduceByKeyAndWindow) are special because they require checkpointing to be enabled.

As with RDDs, the available transformations also differ according to the DStream type (pair or single).

Transformations list

As in the article about RDD transformations, this article also lists the available transformations with pseudocode examples:

Examples of Spark Streaming transformations

This section contains tests for transformations applied to single and pair DStreams. The first snippets concern single DStreams:

// Queue of RDDs filled by triggerDataCreation(...) and handed to
// streamingContext.queueStream(...) by the tests below.
private Queue<JavaRDD<String>> testQueue;
// Batch interval expressed in milliseconds (it is passed to Durations.milliseconds(...)).
private static final long BATCH_INTERVAL = 1_500L;

/**
 * filter(...) keeps only the items matching the predicate: out of the 4 queued labels,
 * only the first one is expected to reach the output.
 */
@Test
public void should_filter_dstream() throws InterruptedException, IOException {
  triggerDataCreation(4);
  JavaInputDStream<String> inputStream = streamingContext.queueStream(testQueue, ONE_AT_TIME);

  List<String> keptLabels = new ArrayList<>();
  JavaDStream<String> onlyFirstLabelStream =
    inputStream.filter(label -> label.equals(makeTestLabel(0)));
  onlyFirstLabelStream.foreachRDD(rdd -> {
    keptLabels.addAll(rdd.collect());
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(15_000L);

  assertThat(keptLabels).hasSize(1).containsOnly(makeTestLabel(0));
}

/**
 * map(...) converts every label of the stream into the number it carries.
 */
@Test
public void should_map_dstream() throws InterruptedException, IOException {
  triggerDataCreation(4);
  JavaInputDStream<String> inputStream = streamingContext.queueStream(testQueue, ONE_AT_TIME);

  List<Integer> numbers = new ArrayList<>();
  JavaDStream<Integer> numbersStream = inputStream.map(label -> toInt(label));
  numbersStream.foreachRDD(rdd -> {
    numbers.addAll(rdd.collect());
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(10_000L);

  assertThat(numbers).hasSize(4).containsOnly(0, 1, 2, 3);
}

/**
 * Contrasts flatMap(...) with map(...) when the applied function produces a collection.
 */
@Test
public void should_flatmap_dstream() throws IOException, InterruptedException {
  triggerDataCreation(4);
  JavaInputDStream<String> inputStream = streamingContext.queueStream(testQueue, ONE_AT_TIME);

  // Both transformations build the same single-item list for every label.
  JavaDStream<Integer> flatStream =
    inputStream.flatMap(label -> Arrays.asList(toInt(label)).iterator());
  JavaDStream<List<Integer>> nestedStream =
    inputStream.map(label -> Arrays.asList(toInt(label)));

  List<Integer> flatNumbers = new ArrayList<>();
  flatStream.foreachRDD(rdd -> {
    flatNumbers.addAll(rdd.collect());
  });
  List<List<Integer>> nestedNumbers = new ArrayList<>();
  nestedStream.foreachRDD(rdd -> {
    nestedNumbers.addAll(rdd.collect());
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(10_000L);

  // The two transformations differ in the shape of what they emit:
  // * map keeps every produced collection as one item, so the collected result
  //   is a list of lists ( [ [0], [1], [2], [3] ] )
  // * flatMap unwraps the produced collections, so the collected result
  //   is a flat list ( [0, 1, 2, 3] )
  assertThat(flatNumbers).hasSize(4).containsOnly(0, 1, 2, 3);
  assertThat(nestedNumbers).hasSize(4)
    .containsOnly(Arrays.asList(0), Arrays.asList(1), Arrays.asList(2), Arrays.asList(3));
}

/**
 * mapPartitions(...) receives all items of a partition at once, which makes it possible
 * to share one resource among all of them.
 */
@Test
public void should_map_partition() throws IOException, InterruptedException {
  triggerDataCreation(4);
  JavaInputDStream<String> inputStream = streamingContext.queueStream(testQueue, ONE_AT_TIME);
  // A plain prefix is enough for the test. In real life it could be replaced by
  // something expensive to get (slow database or web service connection) that
  // is shared by all transformed items.
  String prefix = ">>>> ";

  JavaDStream<String> prefixedStream = inputStream.mapPartitions(partitionLabels -> {
    // partitionLabels iterates over every item of the processed RDD's partition
    List<String> prefixed = new ArrayList<>();
    while (partitionLabels.hasNext()) {
      prefixed.add(prefix + partitionLabels.next());
    }
    return prefixed.iterator();
  });
  List<String> collectedLabels = new ArrayList<>();
  prefixedStream.foreachRDD(rdd -> {
    collectedLabels.addAll(rdd.collect());
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(10_000L);

  assertThat(collectedLabels).hasSize(4).containsOnly(
    prefix + makeTestLabel(0),
    prefix + makeTestLabel(1),
    prefix + makeTestLabel(2),
    prefix + makeTestLabel(3));
}

/**
 * glom() coalesces the elements of every partition into a single list, so each RDD of the
 * resulting DStream holds one list per partition of the source RDD.
 */
@Test
public void should_glom_dstream() throws IOException, InterruptedException {
  triggerDataCreation(4);
  JavaInputDStream<String> queueDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME);

  // It coalesces all elements of a given partition into a single array.
  // Representing an RDD as a flattened array helps to avoid shuffling in some situations,
  // for example the maximum retrieval described in this post:
  // http://blog.madhukaraphatak.com/glom-in-spark/
  JavaDStream<List<String>> glommedDStream = queueDStream.glom();

  List<Integer> partitionSizes = new ArrayList<>();
  List<List<String>> receivedData = new ArrayList<>();
  glommedDStream.foreachRDD(rdd -> {
    partitionSizes.add(rdd.partitions().size());
    receivedData.addAll(rdd.collect());
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(10_000L);

  // We expect RDDs of 4 partitions while the stream reads data and of 1 partition when
  // there is no more data to read.
  // In consequence glom operates on the RDDs below:
  // * 1st RDD made of 4 partitions, only one of them holding "Number 0"
  // * 2nd RDD made of 4 partitions, only one of them holding "Number 1"
  // * 3rd RDD made of 4 partitions, only one of them holding "Number 2"
  // * 4th RDD made of 4 partitions, only one of them holding "Number 3"
  // * the following ones made of a single empty partition
  // After calling RDD.collect(), the result of glom is: [
  //   [], [], [], [Number 0], [], [], [], [Number 1], [], [], [], [Number 2], [], [], [], [Number 3], [], [], []
  // ] <- common list of all coalesced RDDs
  // If a partition had more than 1 element, they would be put together into a common list.
  assertThat(receivedData).hasSize(partitionSizes.stream().reduce((v1, v2) -> v1 + v2).get());
  assertThat(receivedData).containsOnly(
    singletonList(makeTestLabel(0)), singletonList(makeTestLabel(1)),
    singletonList(makeTestLabel(2)), singletonList(makeTestLabel(3)),
    emptyList());
  // subList's upper bound is exclusive: (0, 4) covers the 4 RDDs carrying data (indexes 0..3),
  // and the next assertion picks up from index 4, so no RDD is left unchecked.
  assertThat(partitionSizes.subList(0, 4)).containsOnly(4);
  assertThat(partitionSizes.subList(4, partitionSizes.size())).containsOnly(1);
}

/**
 * repartition(n) changes the number of partitions of the RDDs held by the DStream.
 * The test first records the partition counts without repartitioning, then restarts
 * with a fresh context and checks that every RDD ends up with exactly 2 partitions.
 */
@Test
public void should_repartition_dstream() throws IOException, InterruptedException {
  triggerDataCreation(4);
  boolean notOneAtTime = false;
  // Start by checking the number of initial partitions
  JavaDStream<String> queueDStream = streamingContext.queueStream(testQueue, notOneAtTime);
  Set<Integer> initialPartitionSizes = new HashSet<>();
  queueDStream.foreachRDD(rdd -> {
    if (!rdd.collect().isEmpty()) {
      initialPartitionSizes.add(rdd.partitions().size());
    }
  });
  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(5_000L);
  streamingContext.stop();

  // Restart with a new context and check partitions after repartition
  JavaStreamingContext localStreamingContext =
    new JavaStreamingContext(CONFIGURATION, Durations.milliseconds(BATCH_INTERVAL));
  Set<Integer> repartitionedSizes = new HashSet<>();
  try {
    queueDStream = localStreamingContext.queueStream(testQueue, notOneAtTime);
    JavaDStream<String> repartitionedDStream = queueDStream.repartition(2);
    repartitionedDStream.foreachRDD(rdd -> {
      repartitionedSizes.add(rdd.partitions().size());
    });
    localStreamingContext.start();
    localStreamingContext.awaitTerminationOrTimeout(5_000L);
  } finally {
    // This context is created by the test itself, so the test must release it;
    // otherwise it stays alive after the test finishes.
    localStreamingContext.stop();
  }

  assertThat(repartitionedSizes).containsOnly(2);
  assertThat(initialPartitionSizes).doesNotContain(2);
}

/**
 * reduce(...) yields a new DStream whose RDDs hold the result of reducing the items
 * of the corresponding source RDD.
 */
@Test
public void should_reduce_dstream() throws IOException, InterruptedException {
  triggerDataCreation(4);
  // The reduction happens inside every RDD, so the effect is only visible when
  // more than 1 item is consumed from the queue at a time - hence !ONE_AT_TIME.
  JavaInputDStream<String> inputStream = streamingContext.queueStream(testQueue, !ONE_AT_TIME);

  List<String> reducedLabels = new ArrayList<>();
  JavaDStream<String> reducedStream = inputStream.reduce(new LabelsConcatenator());
  reducedStream.foreachRDD(rdd -> {
    reducedLabels.addAll(rdd.collect());
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(10_000L);

  assertThat(reducedLabels).hasSize(1);
  String concatenatedLabels = reducedLabels.get(0);
  assertThat(concatenatedLabels)
    .isEqualTo("Labels are: Number 0, Number 1, Number 2, Number 3");
}

/**
 * count() produces, for every RDD of the DStream, the number of items it holds.
 */
@Test
public void should_count_rdds_in_dstream() throws InterruptedException, IOException {
  triggerDataCreation(5);
  JavaInputDStream<String> inputStream = streamingContext.queueStream(testQueue, ONE_AT_TIME);

  JavaDStream<Integer> numbersAbove2Stream = inputStream
    .map(label -> toInt(label))
    .filter(number -> number > 2);
  JavaDStream<Long> countersStream = numbersAbove2Stream.count();

  List<Long> counters = new ArrayList<>();
  countersStream.foreachRDD(rdd -> {
    counters.addAll(rdd.collect());
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(8_000L);

  // The counting is done RDD by RDD and the queue stream delivers
  // 1 RDD at a time - that's why 5 counters are expected
  assertThat(counters).hasSize(5).containsOnly(0L, 0L, 0L, 1L, 1L);
}

/**
 * transform(...) exposes each RDD of the DStream, so any RDD-to-RDD
 * operations can be applied on it.
 */
@Test
public void should_transform_dstream() throws IOException, InterruptedException {
  triggerDataCreation(5);
  JavaInputDStream<String> inputStream = streamingContext.queueStream(testQueue, ONE_AT_TIME);

  // With transform() the whole RDD is in our hands: here empty texts
  // are dropped first and the remaining labels converted to numbers
  JavaDStream<Integer> numbersStream = inputStream.transform(labelsRDD ->
    labelsRDD.filter(text -> !text.isEmpty()).map(label -> toInt(label)));

  List<Integer> numbers = new ArrayList<>();
  numbersStream.foreachRDD(rdd -> {
    numbers.addAll(rdd.collect());
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(8_000L);

  assertThat(numbers).hasSize(5).containsOnly(0, 1, 2, 3, 4);
}

/**
 * transformToPair(...) turns a single DStream into a pair DStream; here every
 * label becomes a (label, number) tuple.
 */
@Test
public void should_transform_dstream_to_pair() throws IOException, InterruptedException {
  triggerDataCreation(4);
  JavaInputDStream<String> inputStream = streamingContext.queueStream(testQueue, ONE_AT_TIME);

  JavaPairDStream<String, Integer> labelToNumberStream = inputStream.transformToPair(labelsRDD ->
    labelsRDD.mapToPair(label -> new Tuple2<>(label, toInt(label))));

  List<Tuple2<String, Integer>> labelNumberPairs = new ArrayList<>();
  labelToNumberStream.foreachRDD(rdd -> {
    labelNumberPairs.addAll(rdd.collect());
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(10_000L);

  assertThat(labelNumberPairs).hasSize(4).contains(
    new Tuple2<>(makeTestLabel(0), 0),
    new Tuple2<>(makeTestLabel(1), 1),
    new Tuple2<>(makeTestLabel(2), 2),
    new Tuple2<>(makeTestLabel(3), 3));
}

/**
 * union(...) produces a DStream holding the items of both source DStreams.
 */
@Test
public void should_join_two_dstreams_with_union_transformation() throws IOException, InterruptedException {
  boolean readOneRddPerBatch = false;

  // 1st stream: labels 0..4
  triggerDataCreation(5);
  JavaInputDStream<String> labels0To4Stream =
    streamingContext.queueStream(testQueue, readOneRddPerBatch);

  // 2nd stream: the queue is emptied and refilled with labels 5..14
  testQueue.clear();
  triggerDataCreation(5, 15);
  JavaInputDStream<String> labels5To14Stream =
    streamingContext.queueStream(testQueue, readOneRddPerBatch);

  List<String> unionLabels = new ArrayList<>();
  JavaDStream<String> unionStream = labels0To4Stream.union(labels5To14Stream);
  unionStream.foreachRDD(rdd -> {
    unionLabels.addAll(rdd.collect());
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(12_000L);

  assertThat(unionLabels).hasSize(15).containsOnly(
    makeTestLabel(0), makeTestLabel(1), makeTestLabel(2), makeTestLabel(3), makeTestLabel(4),
    makeTestLabel(5), makeTestLabel(6), makeTestLabel(7), makeTestLabel(8), makeTestLabel(9),
    makeTestLabel(10), makeTestLabel(11), makeTestLabel(12), makeTestLabel(13), makeTestLabel(14));
}

/**
 * slice(from, to) returns the RDDs of the DStream belonging to the given time range.
 */
@Test
public void should_get_sliced_data_from_dstream() throws IOException, InterruptedException {
  // Strictly speaking slice is not a transformation. It's listed
  // among the examples because other transformations,
  // such as the window-based ones, rely on it a lot
  triggerDataCreation(5);
  long startTime = System.currentTimeMillis();
  JavaInputDStream<String> inputStream = streamingContext.queueStream(testQueue, ONE_AT_TIME);

  inputStream.filter(t -> true).foreachRDD(rdd -> {});
  streamingContext.start();

  Time sliceStart = new Time(startTime);

  // A range of 1 batch interval: only the first received RDD is expected
  List<JavaRDD<String>> oneIntervalSlice =
    inputStream.slice(sliceStart, new Time(startTime + BATCH_INTERVAL));
  assertThat(oneIntervalSlice).hasSize(1);
  assertThat(oneIntervalSlice.get(0).collect()).containsOnly(makeTestLabel(0));

  // A range of 2 batch intervals: the 2 first RDDs are expected
  List<JavaRDD<String>> twoIntervalsSlice =
    inputStream.slice(sliceStart, new Time(startTime + BATCH_INTERVAL * 2));
  assertThat(twoIntervalsSlice).hasSize(2);
  assertThat(twoIntervalsSlice.get(0).collect()).containsOnly(makeTestLabel(0));
  assertThat(twoIntervalsSlice.get(1).collect()).containsOnly(makeTestLabel(1));

  streamingContext.awaitTerminationOrTimeout(8_000L);
}

/**
 * countByValue() counts the occurrences of every distinct value. All queued RDDs are
 * consumed together here, so each label, queued twice, is counted 2 times.
 */
@Test
public void should_apply_count_by_value_when_all_rdds_are_in_dstream() throws IOException, InterruptedException {
  boolean readOneRddPerBatch = false;
  // The same 4 labels are queued twice
  for (int round = 0; round < 2; round++) {
    triggerDataCreation(4);
  }
  JavaInputDStream<String> inputStream =
    streamingContext.queueStream(testQueue, readOneRddPerBatch);

  List<Tuple2<String, Long>> labelCounters = new ArrayList<>();
  JavaPairDStream<String, Long> countersStream = inputStream.countByValue();
  countersStream.foreachRDD(rdd -> {
    labelCounters.addAll(rdd.collect());
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(12_000L);

  assertThat(labelCounters).hasSize(4).containsOnly(
    new Tuple2<>(makeTestLabel(0), 2L),
    new Tuple2<>(makeTestLabel(1), 2L),
    new Tuple2<>(makeTestLabel(2), 2L),
    new Tuple2<>(makeTestLabel(3), 2L));
}

/**
 * countByValue() with 1 RDD consumed per batch: every label is counted alone.
 */
@Test
public void should_apply_count_by_value_when_there_are_1_rdd_per_dstream() throws IOException, InterruptedException {
  // The same 4 labels are queued twice
  for (int round = 0; round < 2; round++) {
    triggerDataCreation(4);
  }
  JavaInputDStream<String> inputStream = streamingContext.queueStream(testQueue, ONE_AT_TIME);

  List<Tuple2<String, Long>> labelCounters = new ArrayList<>();
  JavaPairDStream<String, Long> countersStream = inputStream.countByValue();
  countersStream.foreachRDD(rdd -> {
    labelCounters.addAll(rdd.collect());
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(12_000L);

  // countByValue(...) works on each RDD of the DStream separately. Here only 1 RDD is
  // consumed at a time: RDD#1 is ["Number 0"], RDD#2 is ["Number 1"], RDD#3 is ["Number 2"]
  // and so on.
  // That's why, contrary to should_apply_count_by_value_when_all_rdds_are_in_dstream,
  // 8 elements are received, each one with a counter equal to 1.
  assertThat(labelCounters).hasSize(8).containsOnly(
    new Tuple2<>(makeTestLabel(0), 1L),
    new Tuple2<>(makeTestLabel(1), 1L),
    new Tuple2<>(makeTestLabel(2), 1L),
    new Tuple2<>(makeTestLabel(3), 1L));
}

/**
 * Reduce function joining labels into one comma-separated text starting with
 * {@code PREFIX}.
 *
 * <p>The prefix is removed from both operands before they are joined, so it appears
 * exactly once in the result whichever operand is an already reduced value. This keeps
 * the function usable when two partial results are merged together.
 */
private static class LabelsConcatenator implements Function2<String, String, String>, Serializable {

  private static final String PREFIX = "Labels are: ";

  /**
   * @param baseLabel left operand: a raw label or an already concatenated text
   * @param labelToConcatenate right operand: a raw label or an already concatenated text
   * @return {@code PREFIX} followed by both operands separated by ", "
   */
  @Override
  public String call(String baseLabel, String labelToConcatenate) throws Exception {
    return PREFIX + withoutPrefix(baseLabel) + ", " + withoutPrefix(labelToConcatenate);
  }

  /** Returns the label without its leading {@code PREFIX}, or unchanged when it has none. */
  private static String withoutPrefix(String label) {
    return label.startsWith(PREFIX) ? label.substring(PREFIX.length()) : label;
  }
}

/**
 * Adds to the test queue one single-label RDD for every number of [start, maxRDDs).
 *
 * @param start number carried by the first created label (inclusive)
 * @param maxRDDs number at which the creation stops (exclusive)
 */
private void triggerDataCreation(int start, int maxRDDs) throws IOException {
  int labelNumber = start;
  while (labelNumber < maxRDDs) {
    JavaRDD<String> singleLabelRDD =
      batchContext.parallelize(Arrays.asList(makeTestLabel(labelNumber)));
    testQueue.add(singleLabelRDD);
    labelNumber++;
  }
}

/**
 * Shortcut queuing single-label RDDs for the numbers of [0, maxRDDs).
 *
 * @param maxRDDs number at which the creation stops (exclusive)
 */
private void triggerDataCreation(int maxRDDs) throws IOException {
  final int firstLabelNumber = 0;
  triggerDataCreation(firstLabelNumber, maxRDDs);
}

/**
 * Builds the label used as test data, e.g. "Number 3" for 3.
 *
 * @param number number put after the "Number " text
 * @return the label
 */
private static String makeTestLabel(int number) {
  StringBuilder label = new StringBuilder("Number ");
  label.append(number);
  return label.toString();
}

/**
 * Extracts the number carried by a test label, e.g. 3 for "Number 3".
 *
 * @param label label in the format produced by makeTestLabel
 * @return the number found once the "Number " text and the surrounding whitespace are removed
 * @throws NumberFormatException when the remaining text is not an integer
 */
private static Integer toInt(String label) {
  String digits = label.replace("Number ", "");
  digits = digits.trim();
  return Integer.parseInt(digits);
}

The tests for pair DStream transformations are shown below:

// Flag passed as the second argument of streamingContext.queueStream(...); the tests
// use !ONE_AT_TIME when all queued RDDs have to be processed together.
private static final boolean ONE_AT_TIME = true;
// Batch interval - presumably in milliseconds, as in the single DStream tests; confirm
// against the context setup, which is not shown here.
private static final long BATCH_INTERVAL = 1_500L;
// Queue of yellow pages entries RDDs handed to streamingContext.queueStream(...) by the tests below.
private Queue<JavaRDD<YellowPagesEntry>> testQueue;


/**
 * cogroup(...) gathers, for every key, the values found in each of the two pair DStreams.
 */
@Test
public void should_cogroup_pair_dstream() throws IOException, InterruptedException {
  // 1st stream
  triggerDataCreation(0, 2, 10, 11);
  triggerDataCreation(0, 2, 12, 13);
  JavaPairDStream<String, Integer> leftPairs = streamingContext.queueStream(testQueue, !ONE_AT_TIME)
    .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));

  // 2nd stream - the queue is emptied and refilled before the stream is created
  testQueue.clear();
  triggerDataCreation(0, 3, 20, 21, 22);
  JavaPairDStream<String, Integer> rightPairs = streamingContext.queueStream(testQueue, !ONE_AT_TIME)
    .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));

  // The data of both DStreams sharing the same key ends up
  // in the same cogrouped entry
  JavaPairDStream<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> cogroupedStream =
    leftPairs.cogroup(rightPairs);

  Map<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> telNumbersPerPerson = new HashMap<>();
  cogroupedStream.foreachRDD(rdd -> {
    for (Tuple2<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> cogrouped : rdd.collect()) {
      telNumbersPerPerson.put(cogrouped._1(), cogrouped._2());
    }
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(10_000L);

  assertThat(telNumbersPerPerson).hasSize(3);

  Tuple2<Iterable<Integer>, Iterable<Integer>> person0 = telNumbersPerPerson.get("Person 0");
  assertThat(person0).isNotNull();
  assertThat(person0._1()).containsOnly(10, 12);
  assertThat(person0._2()).containsOnly(20);

  Tuple2<Iterable<Integer>, Iterable<Integer>> person1 = telNumbersPerPerson.get("Person 1");
  assertThat(person1).isNotNull();
  assertThat(person1._1()).containsOnly(11, 13);
  assertThat(person1._2()).containsOnly(21);

  // "Person 2" exists only in the 2nd stream
  Tuple2<Iterable<Integer>, Iterable<Integer>> person2 = telNumbersPerPerson.get("Person 2");
  assertThat(person2).isNotNull();
  assertThat(person2._1()).isEmpty();
  assertThat(person2._2()).containsOnly(22);
}

/**
 * union(...) on pair DStreams: the RDDs of both streams are put together batch by batch.
 */
@Test
public void should_union_pair_dstream() throws InterruptedException, IOException {
  // 1st stream: 5 persons
  triggerDataCreation(0, 5, 10, 11, 12, 13, 14);
  JavaPairDStream<String, Integer> shortStream = streamingContext.queueStream(testQueue, ONE_AT_TIME)
    .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));

  // 2nd stream: 10 persons
  testQueue.clear();
  triggerDataCreation(0, 10, 210, 211, 212, 213, 214, 15, 16, 17, 18, 19);
  JavaPairDStream<String, Integer> longStream = streamingContext.queueStream(testQueue, ONE_AT_TIME)
    .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));

  List<List<Tuple2<String, Integer>>> entriesPerBatch = new ArrayList<>();
  JavaPairDStream<String, Integer> unionStream = shortStream.union(longStream);
  unionStream.foreachRDD(rdd -> {
    entriesPerBatch.add(rdd.collect());
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(15_000L);

  assertThat(entriesPerBatch).hasSize(10);
  // The 5 first batches hold 1 entry coming from each stream...
  for (int batch = 0; batch < 5; batch++) {
    String person = "Person " + batch;
    assertThat(entriesPerBatch.get(batch)).hasSize(2);
    assertThat(entriesPerBatch.get(batch))
      .containsOnly(new Tuple2<>(person, 10 + batch), new Tuple2<>(person, 210 + batch));
  }
  // ...while the 5 next ones are fed only by the 2nd stream
  for (int batch = 5; batch < 10; batch++) {
    assertThat(entriesPerBatch.get(batch)).hasSize(1);
    assertThat(entriesPerBatch.get(batch)).containsOnly(new Tuple2<>("Person " + batch, 10 + batch));
  }
}

/**
 * mapValues(...) transforms only the value of every pair; the key is left untouched.
 */
@Test
public void should_map_values_for_pair_dstream() throws IOException, InterruptedException {
  triggerDataCreation(0, 5, 10, 11, 12, 13, 14);
  triggerDataCreation(0, 3, 20, 21, 22);

  JavaPairDStream<String, Integer> personToTelNumber = streamingContext.queueStream(testQueue, !ONE_AT_TIME)
    .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));

  JavaPairDStream<String, String> personToIntro =
    personToTelNumber.mapValues(telNumber -> makePersonIntro(telNumber));

  List<Tuple2<String, String>> intros = new ArrayList<>();
  personToIntro.foreachRDD(rdd -> {
    // adding an empty list changes nothing, so no emptiness check is needed
    intros.addAll(rdd.collect());
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(15_000L);

  // Expected entries, in the order of reception
  int[] expectedPersons = {0, 1, 2, 3, 4, 0, 1, 2};
  int[] expectedTelNumbers = {10, 11, 12, 13, 14, 20, 21, 22};
  assertThat(intros).hasSize(8);
  for (int i = 0; i < expectedPersons.length; i++) {
    assertThat(intros.get(i))
      .isEqualTo(new Tuple2<>("Person " + expectedPersons[i], makePersonIntro(expectedTelNumbers[i])));
  }
}

/**
 * Contrasts flatMapValues(...) with mapValues(...) when the applied function
 * produces a collection.
 */
@Test
public void should_flat_map_values_for_pair_dstream() throws IOException, InterruptedException {
  triggerDataCreation(0, 2, 10, 11);

  JavaPairDStream<String, Integer> personToTelNumber = streamingContext.queueStream(testQueue, !ONE_AT_TIME)
    .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));

  // The same single-item list is built in both cases
  JavaPairDStream<String, String> flattenedStream =
    personToTelNumber.flatMapValues(telNumber -> singletonList(makePersonIntro(telNumber)));
  JavaPairDStream<String, Iterable<String>> nestedStream =
    personToTelNumber.mapValues(telNumber -> singletonList(makePersonIntro(telNumber)));

  List<Tuple2<String, String>> flattenedIntros = new ArrayList<>();
  flattenedStream.foreachRDD(rdd -> {
    flattenedIntros.addAll(rdd.collect());
  });
  List<Tuple2<String, Iterable<String>>> nestedIntros = new ArrayList<>();
  nestedStream.foreachRDD(rdd -> {
    nestedIntros.addAll(rdd.collect());
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(5_000L);

  // As for single DStreams, flat map flattens the produced entries
  // while map, given the same function, keeps them wrapped
  // in a collection.
  assertThat(flattenedIntros).hasSize(2);
  assertThat(flattenedIntros.get(0)).isEqualTo(new Tuple2<>("Person 0", makePersonIntro(10)));
  assertThat(flattenedIntros.get(1)).isEqualTo(new Tuple2<>("Person 1", makePersonIntro(11)));
  assertThat(nestedIntros).hasSize(2);
  assertThat(nestedIntros.get(0)).isEqualTo(new Tuple2<>("Person 0", singletonList(makePersonIntro(10))));
  assertThat(nestedIntros.get(1)).isEqualTo(new Tuple2<>("Person 1", singletonList(makePersonIntro(11))));
}

/**
 * combineByKey(...) builds, for every key, a combined value out of all the values
 * of this key; here the telephone numbers of each person are gathered in a list.
 */
@Test
public void should_combine_by_key_for_pair_dstream() throws IOException, InterruptedException {
  triggerDataCreation(0, 3, 10, 11, 12);
  triggerDataCreation(0, 3, 20, 21, 22);
  triggerDataCreation(0, 2, 30, 31);

  JavaPairDStream<String, Integer> personToTelNumber = streamingContext.queueStream(testQueue, !ONE_AT_TIME)
    .mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber));

  // Like its RDD counterpart, DStream's combineByKey(...) takes:
  // 1. a function initializing the combined value from the first met element
  // 2. a function adding a subsequent element to an already initialized value
  // 3. a function merging 2 combined values
  // 4. the partitioner to use
  JavaPairDStream<String, List<Integer>> personToTelNumbers = personToTelNumber.combineByKey(
    Lists::newArrayList,
    (accumulated, telNumber) -> {
      accumulated.add(telNumber);
      return accumulated;
    },
    (accumulated, otherAccumulated) -> {
      accumulated.addAll(otherAccumulated);
      return accumulated;
    },
    new SinglePartitionPartitioner()
  );
  Map<String, List<Integer>> telNumbersPerPerson = new HashMap<>();
  personToTelNumbers.foreachRDD(rdd -> {
    telNumbersPerPerson.putAll(rdd.collectAsMap());
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(15_000L);

  assertThat(telNumbersPerPerson).hasSize(3);
  assertThat(telNumbersPerPerson.get("Person 0")).isNotNull().hasSize(3).containsOnly(10, 20, 30);
  assertThat(telNumbersPerPerson.get("Person 1")).isNotNull().hasSize(3).containsOnly(11, 21, 31);
  assertThat(telNumbersPerPerson.get("Person 2")).isNotNull().hasSize(2).containsOnly(12, 22);
}

/**
 * reduceByKey(...) merges all values of a key with the given function; here the
 * telephone numbers of each person are summed.
 */
@Test
public void should_reduce_by_key_for_pair_dstream() throws IOException, InterruptedException {
  triggerDataCreation(0, 5, 10, 11, 12, 13, 14);
  triggerDataCreation(0, 3, 20, 21, 22);

  JavaPairDStream<String, Integer> personToTelNumber = streamingContext.queueStream(testQueue, !ONE_AT_TIME)
    .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));

  JavaPairDStream<String, Integer> personToSum =
    personToTelNumber.reduceByKey((sum, telNumber) -> sum + telNumber);
  Map<String, Integer> sumPerPerson = new HashMap<>();
  personToSum.foreachRDD(rdd -> {
    // putting an empty map changes nothing, so no emptiness check is needed
    sumPerPerson.putAll(rdd.collectAsMap());
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(5_000L);

  assertThat(sumPerPerson).hasSize(5);
  // Persons 0, 1 and 2 were created twice, so their numbers are summed
  assertThat(sumPerPerson.get("Person 0")).isNotNull().isEqualTo(30);
  assertThat(sumPerPerson.get("Person 1")).isNotNull().isEqualTo(32);
  assertThat(sumPerPerson.get("Person 2")).isNotNull().isEqualTo(34);
  // Persons 3 and 4 were created only once
  assertThat(sumPerPerson.get("Person 3")).isNotNull().isEqualTo(13);
  assertThat(sumPerPerson.get("Person 4")).isNotNull().isEqualTo(14);
}

/**
 * groupByKey() gathers all values of a key; here every person gets
 * the collection of their telephone numbers.
 */
@Test
public void should_group_by_key_for_pair_dstream() throws IOException, InterruptedException {
  triggerDataCreation(0, 5, 10, 11, 12, 13, 14);
  triggerDataCreation(0, 3, 20, 21, 22);

  // 'One at time' is disabled on purpose: with it enabled, every RDD would
  // hold only 1 entry and there would be nothing to group.
  JavaPairDStream<String, Integer> personToTelNumber = streamingContext.queueStream(testQueue, !ONE_AT_TIME)
    .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));
  JavaPairDStream<String, Iterable<Integer>> personToTelNumbers = personToTelNumber.groupByKey();
  List<Tuple2<String, Iterable<Integer>>> groupedEntries = new ArrayList<>();
  personToTelNumbers.foreachRDD(rdd -> {
    // adding an empty list changes nothing, so no emptiness check is needed
    groupedEntries.addAll(rdd.collect());
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(15_000L);

  assertThat(groupedEntries).hasSize(5);
  // Persons 0, 1 and 2 were created twice...
  Tuple2<String, Iterable<Integer>> firstPerson = findPerson(groupedEntries, "Person 0");
  assertThat(firstPerson._2()).containsOnly(10, 20);
  Tuple2<String, Iterable<Integer>> secondPerson = findPerson(groupedEntries, "Person 1");
  assertThat(secondPerson._2()).containsOnly(11, 21);
  Tuple2<String, Iterable<Integer>> thirdPerson = findPerson(groupedEntries, "Person 2");
  assertThat(thirdPerson._2()).containsOnly(12, 22);
  // ...while persons 3 and 4 only once
  Tuple2<String, Iterable<Integer>> fourthPerson = findPerson(groupedEntries, "Person 3");
  assertThat(fourthPerson._2()).containsOnly(13);
  Tuple2<String, Iterable<Integer>> fifthPerson = findPerson(groupedEntries, "Person 4");
  assertThat(fifthPerson._2()).containsOnly(14);
}

/**
 * join(...) keeps only the keys found in both pair DStreams.
 */
@Test
public void should_join_for_pair_dstream() throws IOException, InterruptedException {
  // Left stream: 5 persons
  triggerDataCreation(0, 5, 10, 11, 12, 13, 14);
  JavaPairDStream<String, Integer> leftPairs = streamingContext.queueStream(testQueue, ONE_AT_TIME)
    .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));

  // Right stream: 3 persons
  testQueue.clear();
  triggerDataCreation(0, 3, 20, 21, 22);
  JavaPairDStream<String, Integer> rightPairs = streamingContext.queueStream(testQueue, ONE_AT_TIME)
    .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));

  JavaPairDStream<String, Tuple2<Integer, Integer>> joinedStream = leftPairs.join(rightPairs);
  Map<String, Tuple2<Integer, Integer>> joinedTelNumbersPerPerson = new HashMap<>();
  joinedStream.foreachRDD(rdd -> {
    joinedTelNumbersPerPerson.putAll(rdd.collectAsMap());
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(15_000L);

  // join is strict: an entry without a matching key in the RDD of the other
  // DStream is not merged, hence only 3 values here
  assertThat(joinedTelNumbersPerPerson).hasSize(3);
  assertThat(joinedTelNumbersPerPerson.get("Person 0")).isNotNull().isEqualTo(new Tuple2<>(10, 20));
  assertThat(joinedTelNumbersPerPerson.get("Person 1")).isNotNull().isEqualTo(new Tuple2<>(11, 21));
  assertThat(joinedTelNumbersPerPerson.get("Person 2")).isNotNull().isEqualTo(new Tuple2<>(12, 22));
}

@Test
public void should_left_outer_join_for_pair_dstream() throws IOException, InterruptedException {
  // Left stream: "Person 0".."Person 2" with tel numbers 10..12, one RDD per person
  triggerDataCreation(0, 3, 10, 11, 12);
  JavaPairDStream<String, Integer> leftDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME)
    .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));
  // The same queue is reused to prepare the entries of the second stream
  testQueue.clear();

  // Right stream: only "Person 0" and "Person 1", with tel numbers 20 and 21
  triggerDataCreation(0, 2, 20, 21);
  JavaPairDStream<String, Integer> rightDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME)
    .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));

  JavaPairDStream<String, Tuple2<Integer, Optional<Integer>>> joinedDStream =
    leftDStream.leftOuterJoin(rightDStream);
  Map<String, Tuple2<Integer, Optional<Integer>>> joinedByPerson = new HashMap<>();
  joinedDStream.foreachRDD(rdd -> joinedByPerson.putAll(rdd.collectAsMap()));

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(10_000L);

  // Every key of the left DStream is kept; the right-side value is present
  // only when the right DStream holds the same key
  assertThat(joinedByPerson).hasSize(3);
  assertThat(joinedByPerson.get("Person 0")).isNotNull().isEqualTo(new Tuple2<>(10, Optional.of(20)));
  assertThat(joinedByPerson.get("Person 1")).isNotNull().isEqualTo(new Tuple2<>(11, Optional.of(21)));
  assertThat(joinedByPerson.get("Person 2")).isNotNull().isEqualTo(new Tuple2<>(12, Optional.absent()));
}

@Test
public void should_right_outer_join_for_pair_dstream() throws InterruptedException, IOException {
  // Left stream: only "Person 0" and "Person 1", with tel numbers 10 and 11
  triggerDataCreation(0, 2, 10, 11);
  JavaPairDStream<String, Integer> leftDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME)
    .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));
  // The same queue is reused to prepare the entries of the second stream
  testQueue.clear();

  // Right stream: "Person 0".."Person 2" with tel numbers 20..22, one RDD per person
  triggerDataCreation(0, 3, 20, 21, 22);
  JavaPairDStream<String, Integer> rightDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME)
    .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));

  JavaPairDStream<String, Tuple2<Optional<Integer>, Integer>> joinedDStream =
    leftDStream.rightOuterJoin(rightDStream);
  Map<String, Tuple2<Optional<Integer>, Integer>> joinedByPerson = new HashMap<>();
  joinedDStream.foreachRDD(rdd -> joinedByPerson.putAll(rdd.collectAsMap()));

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(10_000L);

  // Mirror of the left outer join: every key of the right DStream is kept and
  // the left-side value is present only when the left DStream holds the same key
  assertThat(joinedByPerson).hasSize(3);
  assertThat(joinedByPerson.get("Person 0")).isNotNull().isEqualTo(new Tuple2<>(Optional.of(10), 20));
  assertThat(joinedByPerson.get("Person 1")).isNotNull().isEqualTo(new Tuple2<>(Optional.of(11), 21));
  assertThat(joinedByPerson.get("Person 2")).isNotNull().isEqualTo(new Tuple2<>(Optional.absent(), 22));
}

@Test
public void should_full_outer_join_for_pair_dstream() throws IOException, InterruptedException {
  // Left stream: "Person 0" and "Person 1", with tel numbers 10 and 11
  triggerDataCreation(0, 2, 10, 11);
  JavaPairDStream<String, Integer> leftDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME)
    .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));
  // The same queue is reused to prepare the entries of the second stream
  testQueue.clear();

  // Right stream: "Person 2" and "Person 3", with tel numbers 20 and 21.
  // Tel numbers are looked up by person index, so the two leading 0s are
  // only placeholders for the unused indexes 0 and 1.
  triggerDataCreation(2, 4, 0, 0, 20, 21);
  JavaPairDStream<String, Integer> rightDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME)
    .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));

  JavaPairDStream<String, Tuple2<Optional<Integer>, Optional<Integer>>> joinedDStream =
    leftDStream.fullOuterJoin(rightDStream);
  Map<String, Tuple2<Optional<Integer>, Optional<Integer>>> joinedByPerson = new HashMap<>();
  joinedDStream.foreachRDD(rdd -> joinedByPerson.putAll(rdd.collectAsMap()));

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(10_000L);

  // Keys of both DStreams are kept; since the two streams share no key here,
  // exactly one side of each joined tuple is present
  assertThat(joinedByPerson).hasSize(4);
  assertThat(joinedByPerson.get("Person 0")).isNotNull()
    .isEqualTo(new Tuple2<>(Optional.of(10), Optional.absent()));
  assertThat(joinedByPerson.get("Person 1")).isNotNull()
    .isEqualTo(new Tuple2<>(Optional.of(11), Optional.absent()));
  assertThat(joinedByPerson.get("Person 2")).isNotNull()
    .isEqualTo(new Tuple2<>(Optional.absent(), Optional.of(20)));
  assertThat(joinedByPerson.get("Person 3")).isNotNull()
    .isEqualTo(new Tuple2<>(Optional.absent(), Optional.of(21)));
}

// Builds the introduction sentence for a tel number; the person index used in
// the sentence is the remainder of the tel number divided by 10.
private static String makePersonIntro(int telNumber) {
  StringBuilder intro = new StringBuilder("This is person ");
  intro.append(telNumber % 10);
  intro.append(" with tel number ").append(telNumber);
  return intro.toString();
}

// Returns a tuple of receivedData whose key (_1) equals personLabel.
// Throws NoSuchElementException, naming the missing label and the data that
// was searched, when no tuple matches - the bare Optional.get() used before
// only reported "No value present", which hid the person that was missing.
private static <T> Tuple2<String, T> findPerson(List<Tuple2<String, T>> receivedData, String personLabel) {
  return receivedData.stream()
    .filter(tuple -> tuple._1().equals(personLabel))
    .findAny()
    .orElseThrow(() -> new java.util.NoSuchElementException(
      "No tuple found for '" + personLabel + "' in " + receivedData));
}

// Adds to testQueue one single-entry RDD per index in [startIndex, maxRDDs).
// The entry of index i is named "Person i" and takes telNumbers[i], so
// telNumbers must hold a value at every index that is used.
private void triggerDataCreation(int startIndex, int maxRDDs, int...telNumbers) throws IOException {
  int personIndex = startIndex;
  while (personIndex < maxRDDs) {
    YellowPagesEntry entry = new YellowPagesEntry("Person " + personIndex, telNumbers[personIndex]);
    testQueue.add(batchContext.parallelize(Arrays.asList(entry)));
    personIndex++;
  }
}

// Serializable holder pairing a person label with a tel number.
private static class YellowPagesEntry implements Serializable {
  // Label of the person
  private String person;
  // Tel number attached to the person
  private int telNumber;

  public YellowPagesEntry(String person, int telNumber) {
    this.telNumber = telNumber;
    this.person = person;
  }
}

// Partitioner declaring a single partition: whatever the key, partition 0 is returned.
private static class SinglePartitionPartitioner extends Partitioner implements Serializable {

  @Override
  public int getPartition(Object key) {
    // Prints every key the partitioner is asked about
    System.out.println("Partition for key " + key);
    return 0;
  }

  @Override
  public int numPartitions() {
    return 1;
  }
}

This post introduces the transformations available in Spark Streaming. Its first part explains what these transformations are in the context of stream processing. The second part lists some of the major transformations and shows how they work through pseudocode snippets. The last part gives a less abstract idea of how transformations are used, by presenting them through unit tests.


If you liked it, you should read:

📚 Newsletter Get new posts, recommended reading and other exclusive information every week. SPAM free - no 3rd party ads, only the information about waitingforcode!