Spark Streaming is not static: it lets you convert DStreams into new DStreams, possibly of another type. As in batch-oriented processing, this is done through transformations.
Transformations in Spark Streaming
Transformations create a new DStream from an existing one. They are applied to the RDDs held by the given DStream.
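To make the "applied to the RDDs" part more concrete, here is a minimal plain-Java sketch that models a DStream as a list of batches, each batch standing in for one RDD. It does not use the Spark API; the names `BatchModel`, `mapBatches` and `reduceBatches` are hypothetical and only illustrate that a transformation works on every batch independently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical model: a "DStream" is a list of batches and each batch plays the role of one RDD.
final class BatchModel {

    // Applies the mapper to every element of every batch; batch boundaries are preserved.
    static <T, R> List<List<R>> mapBatches(List<List<T>> stream, Function<T, R> mapper) {
        List<List<R>> result = new ArrayList<>();
        for (List<T> batch : stream) {
            List<R> mapped = new ArrayList<>();
            for (T item : batch) {
                mapped.add(mapper.apply(item));
            }
            result.add(mapped);
        }
        return result;
    }

    // Reduces each batch separately, so every non-empty batch becomes a single-item batch.
    static <T> List<List<T>> reduceBatches(List<List<T>> stream, BinaryOperator<T> reducer) {
        List<List<T>> result = new ArrayList<>();
        for (List<T> batch : stream) {
            List<T> reduced = new ArrayList<>();
            batch.stream().reduce(reducer).ifPresent(reduced::add);
            result.add(reduced);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> stream = List.of(List.of("A", "B", "C"), List.of("D", "E"));
        System.out.println(mapBatches(stream, letter -> letter.toLowerCase())); // [[a, b, c], [d, e]]
        System.out.println(reduceBatches(stream, (v1, v2) -> v1 + ", " + v2));  // [[A, B, C], [D, E]]
    }
}
```

In this model the two batches are never mixed: the reduce produces one value per batch, not one value for the whole stream.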
Spark Streaming has 2 types of transformations: basic ones (similar to the RDD transformations used in batch processing) and stateful ones (window- or state-based, applied over time to subsequent data). Some stateful transformations, such as updateStateByKey and reduceByKeyAndWindow (in its variant with an inverse reduce function), are specific because they need checkpointing enabled.
As with RDDs, the available transformations also depend on the DStream type (pair or single).
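The difference between the two families can be sketched in plain Java. Below, a stateless count is computed for each batch in isolation, while a running count per key carries state from one batch to the next, which is the idea behind state-based transformations such as updateStateByKey. The code does not use Spark, and the names `StatefulModel`, `countPerBatch` and `runningCountPerKey` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: a "DStream" is a list of batches of keys.
final class StatefulModel {

    // Stateless: each batch is counted on its own, nothing survives between batches.
    static List<Integer> countPerBatch(List<List<String>> stream) {
        List<Integer> counts = new ArrayList<>();
        for (List<String> batch : stream) {
            counts.add(batch.size());
        }
        return counts;
    }

    // Stateful: per-key counters are kept across batches and a snapshot is emitted after each batch.
    static List<Map<String, Integer>> runningCountPerKey(List<List<String>> stream) {
        Map<String, Integer> state = new TreeMap<>();
        List<Map<String, Integer>> snapshots = new ArrayList<>();
        for (List<String> batch : stream) {
            for (String key : batch) {
                state.merge(key, 1, Integer::sum);
            }
            snapshots.add(new TreeMap<>(state));
        }
        return snapshots;
    }

    public static void main(String[] args) {
        List<List<String>> stream = List.of(List.of("a", "b"), List.of("a"), List.of());
        System.out.println(countPerBatch(stream));      // [2, 1, 0]
        System.out.println(runningCountPerKey(stream)); // [{a=1, b=1}, {a=2, b=1}, {a=2, b=1}]
    }
}
```

Because the state has to outlive a single batch, it must be stored somewhere durable, which is why Spark asks for checkpointing with such transformations.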
Transformations list
As for the transformations on RDDs, this article lists the available transformations with pseudocode examples:
- map - transforms every element of the RDDs, possibly into another type:
    # Goal: write the first letter in upper case
    # Note: [] represents an RDD in all examples
    map(["one_RDD1", "two_RDD1"], ["three_RDD2"]) -> ["One_RDD1", "Two_RDD1"], ["Three_RDD2"]
- flat map - similar to map but the output is flattened, i.e. one collection is returned instead of multiple collections:
    # Goal: translate numbers to words
    ([1], [2], [3], [4]).flatMap() -> ("one", "two", "three", "four")
    # And map() to see the difference
    ([1], [2], [3], [4]).map() -> (["one"], ["two"], ["three"], ["four"])
- filter - returns RDDs composed of the elements satisfying the given predicate:
    # Goal: keep only words with 2 letters
    filter(["A", "aa"], ["bbb", "CC"]) -> ["aa"], ["CC"]
- glom - returns RDDs in which all elements of each partition are coalesced into an array:
    (partition1:(1, 2, 3, 4), partition2:(5, 6, 7)).glom() -> ([1, 2, 3, 4], [5, 6, 7])
- reduce - generates a new DStream whose RDDs each hold a single item, computed by applying the reduce function to the corresponding source RDD:
    # Goal: show all letters of each RDD
    (["A", "B", "C"], ["D", "E"]).reduce((v1, v2) -> v1 + ", " + v2) -> (["A, B, C"], ["D, E"])
- transform - lets you define operations directly on the held RDDs:
    # Goal: keep only the players who scored at least 10 goals and return their ids
    # Each player is a (name, id, goals) tuple
    ([("Player1", 1, 32), ("Player2", 2, 5), ("Player3", 3, 0)]).transform(rdd -> rdd.filter(player -> player.getGoals() >= 10).map(player -> player.getId())) -> ([1])
- union - merges the RDDs of two DStreams having the same slide interval into a single DStream:
    # Goal: group all letters together
    (["A", "B"]).union(["Z"]) -> (["A", "B", "Z"])
- join - strictly joins 2 DStreams (a given key must be present in both of them):
    # Goal: get the students registered both last year and this year
    ([("Student1": "IT"), ("Student2": "Biology")]).join([("Student1": "IT")]) -> (("Student1": ("IT", "IT")))
- left outer join - gets all elements from the left DStream and adds the matching elements from the right DStream, if any:
    # Goal: get the students registered last year, with this year's registration if they continued their studies
    ([("Student1": "IT"), ("Student2": "Biology")]).leftOuterJoin([("Student1": "IT")]) -> (("Student1": ("IT", "IT")), ("Student2": ("Biology", "")))
- right outer join - gets all elements from the right DStream and adds the matching elements from the left DStream, if any:
    # Goal: get the students registered this year; new students have no registration from last year
    ([("Student1": "IT"), ("Student2": "Biology")]).rightOuterJoin([("Student1": "IT"), ("Student3": "Math")]) -> (("Student1": ("IT", "IT")), ("Student3": ("", "Math")))
- full outer join - merges data from both DStreams, regardless of whether a key exists in both of them:
    # Goal: get the students registered last year or this year
    ([("Student1": "IT"), ("Student2": "Biology")]).fullOuterJoin([("Student1": "IT")]) -> (("Student1": ("IT", "IT")), ("Student2": ("Biology", "")))
- repartition - changes the number of partitions of each RDD within the given DStream:
    # Goal: put each RDD on 1 partition
    ({["RDD1_part1"], ["RDD1_part2"]}, {["RDD2_part1"], ["RDD2_part2"]}).repartition(1) -> ({"RDD1_part1_+_part2"}, {"RDD2_part1_+_part2"})
- group by key - generates a pair DStream containing tuples composed of a key and the list of its values:
    # Goal: get all clubs of a given player
    ([("Player1": "Club1"), ("Player1": "Club2"), ("Player2": "Club1")]).groupByKey() -> ([("Player1": ["Club1", "Club2"]), ("Player2": ["Club1"])])
- reduce by key - applies the reduce function to the entries of an RDD sharing the same key:
    # Goal: sum the values of each key
    ([("A": 1), ("B": 2), ("A": 1)]).reduceByKey((v1, v2) -> v1 + v2) -> ([("A": 2), ("B": 2)])
- combine by key - applies the combine functions to the entries of an RDD sharing the same key:
    # Goal: group all players by club
    ([("Club1": "Player1"), ("Club1": "Player2"), ("Club1": "Player3")]).combineByKey(...) -> ([("Club1": ["Player1", "Player2", "Player3"])])
- cogroup - groups data from 2 different DStreams by key:
    # Goal: get the goals scored by each player in two seasons
    ([("Player1": 30), ("Player2": 10)]).cogroup([("Player1": 10)]) -> (("Player1": ([30], [10])), ("Player2": ([10], [])))
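The four join variants from the list differ only in which unmatched keys they keep. The plain-Java sketch below models one batch of each pair DStream as a map and shows a missing side as "", as the pseudocode does. It is not the Spark API (Spark's Java API expresses the missing side with an Optional), and `JoinModel` with its methods is a hypothetical name chosen for the illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of key-based joins applied to one batch of two pair DStreams.
final class JoinModel {

    // Generic join: the two flags decide whether keys present on one side only are kept.
    static Map<String, List<String>> join(Map<String, String> left, Map<String, String> right,
                                          boolean keepLeftOnly, boolean keepRightOnly) {
        Map<String, List<String>> result = new TreeMap<>();
        for (Map.Entry<String, String> entry : left.entrySet()) {
            String rightValue = right.get(entry.getKey());
            if (rightValue != null || keepLeftOnly) {
                result.put(entry.getKey(),
                        Arrays.asList(entry.getValue(), rightValue == null ? "" : rightValue));
            }
        }
        if (keepRightOnly) {
            for (Map.Entry<String, String> entry : right.entrySet()) {
                if (!left.containsKey(entry.getKey())) {
                    result.put(entry.getKey(), Arrays.asList("", entry.getValue()));
                }
            }
        }
        return result;
    }

    static Map<String, List<String>> innerJoin(Map<String, String> left, Map<String, String> right) {
        return join(left, right, false, false);
    }

    static Map<String, List<String>> leftOuterJoin(Map<String, String> left, Map<String, String> right) {
        return join(left, right, true, false);
    }

    static Map<String, List<String>> rightOuterJoin(Map<String, String> left, Map<String, String> right) {
        return join(left, right, false, true);
    }

    static Map<String, List<String>> fullOuterJoin(Map<String, String> left, Map<String, String> right) {
        return join(left, right, true, true);
    }

    public static void main(String[] args) {
        Map<String, String> lastYear = Map.of("Student1", "IT", "Student2", "Biology");
        Map<String, String> thisYear = Map.of("Student1", "IT", "Student3", "Math");
        System.out.println(innerJoin(lastYear, thisYear));
        System.out.println(leftOuterJoin(lastYear, thisYear));
        System.out.println(rightOuterJoin(lastYear, thisYear));
        System.out.println(fullOuterJoin(lastYear, thisYear));
    }
}
```

With this sample data, the inner join keeps only Student1, the left outer join adds Student2, the right outer join adds Student3, and the full outer join returns all three students.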
Examples of Spark Streaming transformations
This section contains tests for the transformations applied to single and pair DStreams. The first snippets concern single DStreams:
private Queue<JavaRDD<String>> testQueue; private static final long BATCH_INTERVAL = 1_500L; @Test public void should_filter_dstream() throws InterruptedException, IOException { triggerDataCreation(4); JavaInputDStream<String> queueDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME); JavaDStream<String> filteredDStream = queueDStream.filter(label -> label.equals(makeTestLabel(0))); List<String> receivedData = new ArrayList<>(); filteredDStream.foreachRDD(rdd -> receivedData.addAll(rdd.collect())); streamingContext.start(); streamingContext.awaitTerminationOrTimeout(15_000L); assertThat(receivedData).hasSize(1); assertThat(receivedData).containsOnly(makeTestLabel(0)); } @Test public void should_map_dstream() throws InterruptedException, IOException { triggerDataCreation(4); JavaInputDStream<String> queueDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME); JavaDStream<Integer> mappedDStream = queueDStream.map(label -> toInt(label)); List<Integer> receivedData = new ArrayList<>(); mappedDStream.foreachRDD(rdd -> receivedData.addAll(rdd.collect())); streamingContext.start(); streamingContext.awaitTerminationOrTimeout(10_000L); assertThat(receivedData).hasSize(4); assertThat(receivedData).containsOnly(0, 1, 2, 3); } @Test public void should_flatmap_dstream() throws IOException, InterruptedException { triggerDataCreation(4); JavaInputDStream<String> queueDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME); JavaDStream<Integer> flatMappedDStream = queueDStream.flatMap(label -> Arrays.asList(toInt(label)).iterator()); JavaDStream<List<Integer>> mappedDStream = queueDStream.map(label -> Arrays.asList(toInt(label))); List<Integer> receivedFlatMappedData = new ArrayList<>(); flatMappedDStream.foreachRDD(rdd -> receivedFlatMappedData.addAll(rdd.collect())); List<List<Integer>> receivedMappedData = new ArrayList<>(); mappedDStream.foreachRDD(listJavaRDD -> receivedMappedData.addAll(listJavaRDD.collect())); streamingContext.start(); 
streamingContext.awaitTerminationOrTimeout(10_000L); // As you can observe below, the difference between flat map and map consists // on format of returned items. For simple map function, when we deal with a collection of // elements (in our case, single item lists), DStream's RDDs will contain // a lists of lists with data ( [ [0], [1], [2], [3] ] ). // In the case of flat map they will have flattened lists with data ( [0, 1, 2, 3] ). assertThat(receivedFlatMappedData).hasSize(4); assertThat(receivedFlatMappedData).containsOnly(0, 1, 2, 3); assertThat(receivedMappedData).hasSize(4); assertThat(receivedMappedData) .containsOnly(Arrays.asList(0), Arrays.asList(1), Arrays.asList(2), Arrays.asList(3)); } @Test public void should_map_partition() throws IOException, InterruptedException { triggerDataCreation(4); JavaInputDStream<String> queueDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME); // For test purposes we set a simple common prefix but you'd imagine here // a slow database or web service connection retrieving a lot // of objects shared by all transformed items String prefix = ">>>> "; JavaDStream<String> partitionMappedDStream = queueDStream.mapPartitions(labelsIterator -> { List<String> prefixedLabels = new ArrayList<>(); // labelsIterator contains all items within treated RDD's partition labelsIterator.forEachRemaining(label -> prefixedLabels.add(prefix + label)); return prefixedLabels.iterator(); }); List<String> receivedData = new ArrayList<>(); partitionMappedDStream.foreachRDD(rdd -> receivedData.addAll(rdd.collect())); streamingContext.start(); streamingContext.awaitTerminationOrTimeout(10_000L); assertThat(receivedData).hasSize(4); assertThat(receivedData).containsOnly(prefix + makeTestLabel(0), prefix + makeTestLabel(1), prefix + makeTestLabel(2), prefix + makeTestLabel(3)); } @Test public void should_glom_dstream() throws IOException, InterruptedException { triggerDataCreation(4); JavaInputDStream<String> queueDStream = 
streamingContext.queueStream(testQueue, ONE_AT_TIME); // It coalesces all elements of given partition to a single array. // Representing RDD as flattened array helps to avoid shuffling in some situations, as for // example the maximum retrieval described in this post: // http://blog.madhukaraphatak.com/glom-in-spark/ JavaDStream<List<String>> glommedDStream = queueDStream.glom(); List<Integer> partitionSizes = new ArrayList<>(); List<List<String>> receivedData = new ArrayList<>(); glommedDStream.foreachRDD(rdd -> { partitionSizes.add(rdd.partitions().size()); receivedData.addAll(rdd.collect()); }); streamingContext.start(); streamingContext.awaitTerminationOrTimeout(10_000L); // We expect to have RDDs of 4 partitions when stream read data and 1 partition when // there were no data to read. // In consequence glom operates on below RDDs: // * 1st data composed by 4 single item partitions ["Number 0", "", "", ""] // * 2nd data composed by 4 single item partitions ["Number 1", "", "", ""] // * 3rd data composed by 4 single item partitions ["Number 2", "", "", ""] // * 4th data composed by 4 single item partitions ["Number 3", "", "", ""] // * the others composed by a single empty partition [""] // After calling RDD.collect(), the result of glom is below: [ // [], [], [], [Number 0], [], [], [], [Number 1], [], [], [], [Number 2], [], [], [], [Number 3], [], [], [] // ] <- common array of all coalesced RDDs // If given partition had more than 1 element, they would be put together into common array. 
assertThat(receivedData).hasSize(partitionSizes.stream().reduce((v1, v2) -> v1 + v2).get()); assertThat(receivedData).containsOnly( singletonList(makeTestLabel(0)), singletonList(makeTestLabel(1)), singletonList(makeTestLabel(2)), singletonList(makeTestLabel(3)), emptyList()); assertThat(partitionSizes.subList(0, 3)).containsOnly(4); assertThat(partitionSizes.subList(4, partitionSizes.size())).containsOnly(1); } @Test public void should_repartition_dstream() throws IOException, InterruptedException { triggerDataCreation(4); boolean notOneAtTime = false; // Start by checking the number of initial partitions JavaDStream<String> queueDStream = streamingContext.queueStream(testQueue, notOneAtTime); Set<Integer> initialPartitionSizes = new HashSet<>(); queueDStream.foreachRDD(rdd -> { if (!rdd.collect().isEmpty()) { initialPartitionSizes.add(rdd.partitions().size()); } }); streamingContext.start(); streamingContext.awaitTerminationOrTimeout(5_000L); streamingContext.stop(); // Restart context and check partitions after repartition JavaStreamingContext localStreamingContext = new JavaStreamingContext(CONFIGURATION, Durations.milliseconds(BATCH_INTERVAL)); queueDStream = localStreamingContext.queueStream(testQueue, notOneAtTime); JavaDStream<String> repartitionedDStream = queueDStream.repartition(2); Set<Integer> repartitionedSizes = new HashSet<>(); repartitionedDStream.foreachRDD(rdd -> { repartitionedSizes.add(rdd.partitions().size()); }); localStreamingContext.start(); localStreamingContext.awaitTerminationOrTimeout(5_000L); assertThat(repartitionedSizes).containsOnly(2); assertThat(initialPartitionSizes).doesNotContain(2); } @Test public void should_reduce_dstream() throws IOException, InterruptedException { triggerDataCreation(4); // Reduce returns a new DStream. It's composed by RDDs constructed by applying // reduce operations on them. So to see reduce() works, we need to ingest more than 1 item // at time to our queue stream. 
JavaInputDStream<String> queueDStream = streamingContext.queueStream(testQueue, !ONE_AT_TIME); JavaDStream<String> reducedDStream = queueDStream.reduce(new LabelsConcatenator()); List<String> receivedData = new ArrayList<>(); reducedDStream.foreachRDD(rdd -> receivedData.addAll(rdd.collect())); streamingContext.start(); streamingContext.awaitTerminationOrTimeout(10_000L); assertThat(receivedData).hasSize(1); assertThat(receivedData.get(0)) .isEqualTo("Labels are: Number 0, Number 1, Number 2, Number 3"); } @Test public void should_count_rdds_in_dstream() throws InterruptedException, IOException { triggerDataCreation(5); JavaInputDStream<String> queueDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME); JavaDStream<Long> countDStream = queueDStream.map(label -> toInt(label)) .filter(number -> number > 2) .count(); List<Long> receivedData = new ArrayList<>(); countDStream.foreachRDD(rdd -> receivedData.addAll(rdd.collect())); streamingContext.start(); streamingContext.awaitTerminationOrTimeout(8_000L); assertThat(receivedData).hasSize(5); // count() is applied per RDD. 
Since queueStream gets 1 RDD at time, // it's normal to have 5 counters assertThat(receivedData).containsOnly(0L, 0L, 0L, 1L, 1L); } @Test public void should_transform_dstream() throws IOException, InterruptedException { triggerDataCreation(5); JavaInputDStream<String> queueDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME); // transform() provides more fine-grained control on // transformations applied on each RDD held by // given DStream JavaDStream<Integer> transformedDStream = queueDStream.transform(stringRDD -> stringRDD .filter(text -> !text.isEmpty()) .map(label -> toInt(label))); List<Integer> receivedData = new ArrayList<>(); transformedDStream.foreachRDD(rdd -> receivedData.addAll(rdd.collect())); streamingContext.start(); streamingContext.awaitTerminationOrTimeout(8_000L); assertThat(receivedData).hasSize(5); assertThat(receivedData).containsOnly(0, 1, 2, 3, 4); } @Test public void should_transform_dstream_to_pair() throws IOException, InterruptedException { triggerDataCreation(4); JavaInputDStream<String> queueDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME); JavaPairDStream<String, Integer> pairDStream = queueDStream.transformToPair(labelRDD -> labelRDD.mapToPair(label -> new Tuple2<>(label, toInt(label)))); List<Tuple2<String, Integer>> receivedData = new ArrayList<>(); pairDStream.foreachRDD(rdd -> receivedData.addAll(rdd.collect())); streamingContext.start(); streamingContext.awaitTerminationOrTimeout(10_000L); assertThat(receivedData).hasSize(4); assertThat(receivedData).contains(new Tuple2<>(makeTestLabel(0), 0), new Tuple2<>(makeTestLabel(1), 1), new Tuple2<>(makeTestLabel(2), 2), new Tuple2<>(makeTestLabel(3), 3)); } @Test public void should_join_two_dstreams_with_union_transformation() throws IOException, InterruptedException { triggerDataCreation(5); boolean oneAtTime = false; JavaInputDStream<String> queueDStreamFirst5 = streamingContext.queueStream(testQueue, oneAtTime); testQueue.clear(); triggerDataCreation(5, 
15); JavaInputDStream<String> queueDStreamFirst10 = streamingContext.queueStream(testQueue, oneAtTime); JavaDStream<String> jointDStream = queueDStreamFirst5.union(queueDStreamFirst10); List<String> receivedData = new ArrayList<>(); jointDStream.foreachRDD(rdd -> receivedData.addAll(rdd.collect())); streamingContext.start(); streamingContext.awaitTerminationOrTimeout(12_000L); assertThat(receivedData).hasSize(15); assertThat(receivedData).containsOnly(makeTestLabel(0), makeTestLabel(1), makeTestLabel(2), makeTestLabel(3), makeTestLabel(4), makeTestLabel(5), makeTestLabel(6), makeTestLabel(7), makeTestLabel(8), makeTestLabel(9), makeTestLabel(10), makeTestLabel(11), makeTestLabel(12), makeTestLabel(13), makeTestLabel(14)); } @Test public void should_get_sliced_data_from_dstream() throws IOException, InterruptedException { // Purely speaking, slice is not a transformation but it deserves // its place in this examples list because it's widely used // by other transformations, as window-based ones triggerDataCreation(5); long startTime = System.currentTimeMillis(); JavaInputDStream<String> queueDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME); queueDStream.filter(t -> true).foreachRDD(rdd -> {}); streamingContext.start(); // The first received RDD should be returned here List<JavaRDD<String>> slicedDStream = queueDStream.slice(new Time(startTime), new Time(startTime+BATCH_INTERVAL)); assertThat(slicedDStream).hasSize(1); assertThat(slicedDStream.get(0).collect()).containsOnly(makeTestLabel(0)); // Here 2 first RDDs should be returned slicedDStream = queueDStream.slice(new Time(startTime), new Time(startTime+BATCH_INTERVAL*2)); assertThat(slicedDStream).hasSize(2); assertThat(slicedDStream.get(0).collect()).containsOnly(makeTestLabel(0)); assertThat(slicedDStream.get(1).collect()).containsOnly(makeTestLabel(1)); streamingContext.awaitTerminationOrTimeout(8_000L); } @Test public void should_apply_count_by_value_when_all_rdds_are_in_dstream() throws 
IOException, InterruptedException { boolean notOneAtTime = false; triggerDataCreation(4); triggerDataCreation(4); JavaInputDStream<String> queueDStream = streamingContext.queueStream(testQueue, notOneAtTime); JavaPairDStream<String, Long> counterByValueDStream = queueDStream.countByValue(); List<Tuple2<String, Long>> receivedData = new ArrayList<>(); counterByValueDStream.foreachRDD(rdd -> receivedData.addAll(rdd.collect())); streamingContext.start(); streamingContext.awaitTerminationOrTimeout(12_000L); assertThat(receivedData).hasSize(4); assertThat(receivedData).containsOnly(new Tuple2<>(makeTestLabel(0), 2L), new Tuple2<>(makeTestLabel(1), 2L), new Tuple2<>(makeTestLabel(2), 2L), new Tuple2<>(makeTestLabel(3), 2L)); } @Test public void should_apply_count_by_value_when_there_are_1_rdd_per_dstream() throws IOException, InterruptedException { triggerDataCreation(4); triggerDataCreation(4); JavaInputDStream<String> queueDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME); JavaPairDStream<String, Long> counterByValueDStream = queueDStream.countByValue(); List<Tuple2<String, Long>> receivedData = new ArrayList<>(); counterByValueDStream.foreachRDD(rdd -> receivedData.addAll(rdd.collect())); streamingContext.start(); streamingContext.awaitTerminationOrTimeout(12_000L); // countByValue(...) applies individually on RDDs contained in given DStream. In our case // we include 1 RDD each time. So, the RDD#1 has this format: ["Number 0"], RDD#2 this one: ["Number 1"], // RDD#3 this one: ["Number 2"] and so on. // It's the reason why, unlike in should_apply_count_by_value_when_all_RDDs_are_in_DStream, here // we receive a list of 8 elements. 
assertThat(receivedData).hasSize(8); assertThat(receivedData).containsOnly(new Tuple2<>(makeTestLabel(0), 1L), new Tuple2<>(makeTestLabel(1), 1L), new Tuple2<>(makeTestLabel(2), 1L), new Tuple2<>(makeTestLabel(3), 1L)); } private static class LabelsConcatenator implements Function2<String, String, String>, Serializable { private static final String PREFIX = "Labels are: "; @Override public String call(String baseLabel, String labelToConcatenate) throws Exception { String prefix = baseLabel.contains(PREFIX) ? "" : PREFIX; return prefix + baseLabel + ", " + labelToConcatenate; } } private void triggerDataCreation(int start, int maxRDDs) throws IOException { for (int i = start; i < maxRDDs; i++) { JavaRDD<String> newRDD1 = batchContext.parallelize(Arrays.asList(makeTestLabel(i))); testQueue.add(newRDD1); } } private void triggerDataCreation(int maxRDDs) throws IOException { triggerDataCreation(0, maxRDDs); } private static String makeTestLabel(int number) { return "Number " + number; } private static Integer toInt(String label) { return Integer.valueOf(label.replace("Number ", "").trim()); }
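Several tests above switch the second argument of queueStream between ONE_AT_TIME and !ONE_AT_TIME, and this flag explains most of the expected sizes. With the flag set, one queued RDD is consumed per batch interval; without it, all queued RDDs end up in the same batch. The plain-Java sketch below models only that behavior. It does not use Spark, it ignores the empty batches generated once the queue is drained, and the names `QueueStreamModel`, `toBatches` and `labels` are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of how a queue of RDDs is turned into batches.
final class QueueStreamModel {

    // Drains the queue into batches: one queued RDD per batch, or every queued RDD merged into one batch.
    static <T> List<List<T>> toBatches(Queue<List<T>> queue, boolean oneAtATime) {
        List<List<T>> batches = new ArrayList<>();
        while (!queue.isEmpty()) {
            List<T> batch = new ArrayList<>();
            if (oneAtATime) {
                batch.addAll(queue.poll());
            } else {
                while (!queue.isEmpty()) {
                    batch.addAll(queue.poll());
                }
            }
            batches.add(batch);
        }
        return batches;
    }

    // Builds a queue of single-item RDDs, similar to what the tests' triggerDataCreation does.
    static Queue<List<String>> labels(int count) {
        Queue<List<String>> queue = new ArrayDeque<>();
        for (int i = 0; i < count; i++) {
            queue.add(List.of("Number " + i));
        }
        return queue;
    }

    public static void main(String[] args) {
        System.out.println(toBatches(labels(4), true));  // [[Number 0], [Number 1], [Number 2], [Number 3]]
        System.out.println(toBatches(labels(4), false)); // [[Number 0, Number 1, Number 2, Number 3]]
    }
}
```

This is why count() yields one counter per queued RDD in should_count_rdds_in_dstream, while reduce() in should_reduce_dstream can concatenate all four labels into a single value.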
The tests below cover the pair DStream transformations:
private static final boolean ONE_AT_TIME = true; private static final long BATCH_INTERVAL = 1_500L; private Queue<JavaRDD<YellowPagesEntry>> testQueue; @Test public void should_cogroup_pair_dstream() throws IOException, InterruptedException { triggerDataCreation(0, 2, 10, 11); triggerDataCreation(0, 2, 12, 13); JavaPairDStream<String, Integer> pairEntriesDStream = streamingContext.queueStream(testQueue, !ONE_AT_TIME) .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber))); testQueue.clear(); triggerDataCreation(0, 3, 20, 21, 22); JavaPairDStream<String, Integer> pairEntriesDStream2 = streamingContext.queueStream(testQueue, !ONE_AT_TIME) .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber))); // As you will see, cogroup groups data sharing the same key from // both DStreams JavaPairDStream<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> cogroupedPairDStream = pairEntriesDStream.cogroup(pairEntriesDStream2); Map<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> receivedData = new HashMap<>(); cogroupedPairDStream.foreachRDD(rdd -> { List<Tuple2<String, Tuple2<Iterable<Integer>, Iterable<Integer>>>> collectedData = rdd.collect(); if (!collectedData.isEmpty()) { collectedData.forEach(entry -> receivedData.put(entry._1(), entry._2())); } }); streamingContext.start(); streamingContext.awaitTerminationOrTimeout(10_000L); assertThat(receivedData).hasSize(3); assertThat(receivedData.get("Person 0")).isNotNull(); assertThat(receivedData.get("Person 0")._1()).containsOnly(10, 12); assertThat(receivedData.get("Person 0")._2()).containsOnly(20); assertThat(receivedData.get("Person 1")).isNotNull(); assertThat(receivedData.get("Person 1")._1()).containsOnly(11, 13); assertThat(receivedData.get("Person 1")._2()).containsOnly(21); assertThat(receivedData.get("Person 2")).isNotNull(); assertThat(receivedData.get("Person 2")._1()).isEmpty(); assertThat(receivedData.get("Person 
2")._2()).containsOnly(22); } @Test public void should_union_pair_dstream() throws InterruptedException, IOException { triggerDataCreation(0, 5, 10, 11, 12, 13, 14); JavaPairDStream<String, Integer> pairEntriesDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME) .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber))); testQueue.clear(); triggerDataCreation(0, 10, 210, 211, 212, 213, 214, 15, 16, 17, 18, 19); JavaPairDStream<String, Integer> pairEntriesDStream2 = streamingContext.queueStream(testQueue, ONE_AT_TIME) .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber))); JavaPairDStream<String, Integer> unionPairDStream = pairEntriesDStream.union(pairEntriesDStream2); List<List<Tuple2<String, Integer>>> dataPerRDD = new ArrayList<>(); unionPairDStream.foreachRDD(rdd -> dataPerRDD.add(rdd.collect())); streamingContext.start(); streamingContext.awaitTerminationOrTimeout(15_000L); assertThat(dataPerRDD).hasSize(10); assertThat(dataPerRDD.get(0)).hasSize(2); assertThat(dataPerRDD.get(0)) .containsOnly(new Tuple2<>("Person 0", 10), new Tuple2<>("Person 0", 210)); assertThat(dataPerRDD.get(1)).hasSize(2); assertThat(dataPerRDD.get(1)) .containsOnly(new Tuple2<>("Person 1", 11), new Tuple2<>("Person 1", 211)); assertThat(dataPerRDD.get(2)).hasSize(2); assertThat(dataPerRDD.get(2)) .containsOnly(new Tuple2<>("Person 2", 12), new Tuple2<>("Person 2", 212)); assertThat(dataPerRDD.get(3)).hasSize(2); assertThat(dataPerRDD.get(3)) .containsOnly(new Tuple2<>("Person 3", 13), new Tuple2<>("Person 3", 213)); assertThat(dataPerRDD.get(4)).hasSize(2); assertThat(dataPerRDD.get(4)) .containsOnly(new Tuple2<>("Person 4", 14), new Tuple2<>("Person 4", 214)); assertThat(dataPerRDD.get(5)).hasSize(1); assertThat(dataPerRDD.get(5)).containsOnly(new Tuple2<>("Person 5", 15)); assertThat(dataPerRDD.get(6)).hasSize(1); assertThat(dataPerRDD.get(6)).containsOnly(new Tuple2<>("Person 6", 16)); 
assertThat(dataPerRDD.get(7)).hasSize(1); assertThat(dataPerRDD.get(7)).containsOnly(new Tuple2<>("Person 7", 17)); assertThat(dataPerRDD.get(8)).hasSize(1); assertThat(dataPerRDD.get(8)).containsOnly(new Tuple2<>("Person 8", 18)); assertThat(dataPerRDD.get(9)).hasSize(1); assertThat(dataPerRDD.get(9)).containsOnly(new Tuple2<>("Person 9", 19)); } @Test public void should_map_values_for_pair_dstream() throws IOException, InterruptedException { triggerDataCreation(0, 5, 10, 11, 12, 13, 14); triggerDataCreation(0, 3, 20, 21, 22); JavaPairDStream<String, Integer> pairEntriesDStream = streamingContext.queueStream(testQueue, !ONE_AT_TIME) .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber))); JavaPairDStream<String, String> mappedPairDStream = pairEntriesDStream.mapValues(telNumber -> makePersonIntro(telNumber)); List<Tuple2<String, String>> receivedData = new ArrayList<>(); mappedPairDStream.foreachRDD(rdd -> { List<Tuple2<String, String>> collectedData = rdd.collect(); if (!collectedData.isEmpty()) { receivedData.addAll(collectedData); } }); streamingContext.start(); streamingContext.awaitTerminationOrTimeout(15_000L); assertThat(receivedData).hasSize(8); assertThat(receivedData.get(0)).isEqualTo(new Tuple2<>("Person 0", makePersonIntro(10))); assertThat(receivedData.get(1)).isEqualTo(new Tuple2<>("Person 1", makePersonIntro(11))); assertThat(receivedData.get(2)).isEqualTo(new Tuple2<>("Person 2", makePersonIntro(12))); assertThat(receivedData.get(3)).isEqualTo(new Tuple2<>("Person 3", makePersonIntro(13))); assertThat(receivedData.get(4)).isEqualTo(new Tuple2<>("Person 4", makePersonIntro(14))); assertThat(receivedData.get(5)).isEqualTo(new Tuple2<>("Person 0", makePersonIntro(20))); assertThat(receivedData.get(6)).isEqualTo(new Tuple2<>("Person 1", makePersonIntro(21))); assertThat(receivedData.get(7)).isEqualTo(new Tuple2<>("Person 2", makePersonIntro(22))); } @Test public void should_flat_map_values_for_pair_dstream() throws 
IOException, InterruptedException { triggerDataCreation(0, 2, 10, 11); JavaPairDStream<String, Integer> pairEntriesDStream = streamingContext.queueStream(testQueue, !ONE_AT_TIME) .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber))); JavaPairDStream<String, String> flatMappedPairDStream = pairEntriesDStream.flatMapValues(telNumber -> singletonList(makePersonIntro(telNumber))); JavaPairDStream<String, Iterable<String>> mappedPairDStream = pairEntriesDStream.mapValues(telNumber -> singletonList(makePersonIntro(telNumber))); List<Tuple2<String, String>> receivedDataFromFlatMap = new ArrayList<>(); flatMappedPairDStream.foreachRDD(rdd -> { List<Tuple2<String, String>> collectedData = rdd.collect(); if (!collectedData.isEmpty()) { receivedDataFromFlatMap.addAll(collectedData); } }); List<Tuple2<String, Iterable<String>>> receivedDataFromMap = new ArrayList<>(); mappedPairDStream.foreachRDD(rdd -> { List<Tuple2<String, Iterable<String>>> collectedData = rdd.collect(); if (!collectedData.isEmpty()) { receivedDataFromMap.addAll(collectedData); } }); streamingContext.start(); streamingContext.awaitTerminationOrTimeout(5_000L); // The difference between map and flat map is, once again, the fact of // flattening entries in the second. For the same function, map // returns a collection of data. 
assertThat(receivedDataFromFlatMap).hasSize(2); assertThat(receivedDataFromFlatMap.get(0)).isEqualTo(new Tuple2<>("Person 0", makePersonIntro(10))); assertThat(receivedDataFromFlatMap.get(1)).isEqualTo(new Tuple2<>("Person 1", makePersonIntro(11))); assertThat(receivedDataFromMap).hasSize(2); assertThat(receivedDataFromMap.get(0)).isEqualTo(new Tuple2<>("Person 0", singletonList(makePersonIntro(10)))); assertThat(receivedDataFromMap.get(1)).isEqualTo(new Tuple2<>("Person 1", singletonList(makePersonIntro(11)))); } @Test public void should_combine_by_key_for_pair_dstream() throws IOException, InterruptedException { triggerDataCreation(0, 3, 10, 11, 12); triggerDataCreation(0, 3, 20, 21, 22); triggerDataCreation(0, 2, 30, 31); JavaPairDStream<String, Integer> pairEntriesDStream = streamingContext.queueStream(testQueue, !ONE_AT_TIME) .mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)); // DStream's combineByKey(...) is similar to RDD's combineByKey(...). First, // we create initialization function. After we create a function working on initialized // element with subsequent treated items. Finally we merge combined results. // The last parameter is partitioner to use. 
    JavaPairDStream<String, List<Integer>> combinedPairDStream = pairEntriesDStream.combineByKey(
      // createCombiner: builds the initial list from the first value seen for a key
      Lists::newArrayList,
      // mergeValue: appends a new value to the list already built for the key
      (telNumbersList, telNumber) -> {
        telNumbersList.add(telNumber);
        return telNumbersList;
      },
      // mergeCombiners: merges two lists built for the same key
      (telNumbersList1, telNumbersList2) -> {
        telNumbersList1.addAll(telNumbersList2);
        return telNumbersList1;
      },
      new SinglePartitionPartitioner()
    );
    Map<String, List<Integer>> receivedData = new HashMap<>();
    combinedPairDStream.foreachRDD(rdd -> {
      rdd.collectAsMap().forEach((person, telNumbers) -> receivedData.put(person, telNumbers));
    });

    streamingContext.start();
    streamingContext.awaitTerminationOrTimeout(15_000L);

    assertThat(receivedData).hasSize(3);
    assertThat(receivedData.get("Person 0")).isNotNull();
    assertThat(receivedData.get("Person 0")).hasSize(3).containsOnly(10, 20, 30);
    assertThat(receivedData.get("Person 1")).isNotNull();
    assertThat(receivedData.get("Person 1")).hasSize(3).containsOnly(11, 21, 31);
    assertThat(receivedData.get("Person 2")).isNotNull();
    assertThat(receivedData.get("Person 2")).hasSize(2).containsOnly(12, 22);
  }

  @Test
  public void should_reduce_by_key_for_pair_dstream() throws IOException, InterruptedException {
    triggerDataCreation(0, 5, 10, 11, 12, 13, 14);
    triggerDataCreation(0, 3, 20, 21, 22);

    // 'One at time' is disabled, so all queued RDDs are processed in a single batch
    JavaPairDStream<String, Integer> pairEntriesDStream =
      streamingContext.queueStream(testQueue, !ONE_AT_TIME)
        .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));

    JavaPairDStream<String, Integer> reducedByKeyPairDStream =
      pairEntriesDStream.reduceByKey((v1, v2) -> v1 + v2);
    Map<String, Integer> receivedData = new HashMap<>();
    reducedByKeyPairDStream.foreachRDD(rdd -> {
      Map<String, Integer> collectedData = rdd.collectAsMap();
      if (!collectedData.isEmpty()) {
        collectedData.forEach((person, sum) -> receivedData.put(person, sum));
      }
    });

    streamingContext.start();
    streamingContext.awaitTerminationOrTimeout(5_000L);

    assertThat(receivedData).hasSize(5);
    assertThat(receivedData.get("Person 0")).isNotNull();
    assertThat(receivedData.get("Person 0")).isEqualTo(30);
    assertThat(receivedData.get("Person 1")).isNotNull();
    assertThat(receivedData.get("Person 1")).isEqualTo(32);
    assertThat(receivedData.get("Person 2")).isNotNull();
    assertThat(receivedData.get("Person 2")).isEqualTo(34);
    assertThat(receivedData.get("Person 3")).isNotNull();
    assertThat(receivedData.get("Person 3")).isEqualTo(13);
    assertThat(receivedData.get("Person 4")).isNotNull();
    assertThat(receivedData.get("Person 4")).isEqualTo(14);
  }

  @Test
  public void should_group_by_key_for_pair_dstream() throws IOException, InterruptedException {
    triggerDataCreation(0, 5, 10, 11, 12, 13, 14);
    triggerDataCreation(0, 3, 20, 21, 22);

    // If 'one at time' were true, each batch would hold a single RDD with a single entry,
    // so nothing would be grouped. It is disabled here so that all queued RDDs
    // are processed in the same batch.
    JavaPairDStream<String, Integer> pairEntriesDStream =
      streamingContext.queueStream(testQueue, !ONE_AT_TIME)
        .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));

    JavaPairDStream<String, Iterable<Integer>> groupedByKeyPairDStream = pairEntriesDStream.groupByKey();
    List<Tuple2<String, Iterable<Integer>>> receivedData = new ArrayList<>();
    groupedByKeyPairDStream.foreachRDD(rdd -> {
      List<Tuple2<String, Iterable<Integer>>> collectedData = rdd.collect();
      if (!collectedData.isEmpty()) {
        receivedData.addAll(collectedData);
      }
    });

    streamingContext.start();
    streamingContext.awaitTerminationOrTimeout(15_000L);

    assertThat(receivedData).hasSize(5);
    Tuple2<String, Iterable<Integer>> person0 = findPerson(receivedData, "Person 0");
    assertThat(person0._2()).containsOnly(10, 20);
    Tuple2<String, Iterable<Integer>> person1 = findPerson(receivedData, "Person 1");
    assertThat(person1._2()).containsOnly(11, 21);
    Tuple2<String, Iterable<Integer>> person2 = findPerson(receivedData, "Person 2");
    assertThat(person2._2()).containsOnly(12, 22);
    Tuple2<String, Iterable<Integer>> person3 = findPerson(receivedData, "Person 3");
    assertThat(person3._2()).containsOnly(13);
    Tuple2<String, Iterable<Integer>> person4 = findPerson(receivedData, "Person 4");
    assertThat(person4._2()).containsOnly(14);
  }

  @Test
  public void should_join_for_pair_dstream() throws IOException, InterruptedException {
    triggerDataCreation(0, 5, 10, 11, 12, 13, 14);
    JavaPairDStream<String, Integer> pairEntriesDStream =
      streamingContext.queueStream(testQueue, ONE_AT_TIME)
        .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));
    testQueue.clear();
    triggerDataCreation(0, 3, 20, 21, 22);
    JavaPairDStream<String, Integer> pairEntriesDStream2 =
      streamingContext.queueStream(testQueue, ONE_AT_TIME)
        .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));

    JavaPairDStream<String, Tuple2<Integer, Integer>> joinedPairDStream =
      pairEntriesDStream.join(pairEntriesDStream2);
    Map<String, Tuple2<Integer, Integer>> receivedData = new HashMap<>();
    joinedPairDStream.foreachRDD(rdd -> {
      rdd.collectAsMap().forEach((person, joinedTelNumbers) -> receivedData.put(person, joinedTelNumbers));
    });

    streamingContext.start();
    streamingContext.awaitTerminationOrTimeout(15_000L);

    // Only 3 entries are expected because join is strict (inner join): a key
    // present in the RDDs of only one of the two DStreams is dropped
    assertThat(receivedData).hasSize(3);
    assertThat(receivedData.get("Person 0")).isNotNull();
    assertThat(receivedData.get("Person 0")).isEqualTo(new Tuple2<>(10, 20));
    assertThat(receivedData.get("Person 1")).isNotNull();
    assertThat(receivedData.get("Person 1")).isEqualTo(new Tuple2<>(11, 21));
    assertThat(receivedData.get("Person 2")).isNotNull();
    assertThat(receivedData.get("Person 2")).isEqualTo(new Tuple2<>(12, 22));
  }

  @Test
  public void should_left_outer_join_for_pair_dstream() throws IOException, InterruptedException {
    triggerDataCreation(0, 3, 10, 11, 12);
    JavaPairDStream<String, Integer> pairEntriesDStream =
      streamingContext.queueStream(testQueue, ONE_AT_TIME)
        .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));
    testQueue.clear();
    triggerDataCreation(0, 2, 20, 21);
    JavaPairDStream<String, Integer> pairEntriesDStream2 =
      streamingContext.queueStream(testQueue, ONE_AT_TIME)
        .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));

    JavaPairDStream<String, Tuple2<Integer, Optional<Integer>>> joinedPairDStream =
      pairEntriesDStream.leftOuterJoin(pairEntriesDStream2);
    Map<String, Tuple2<Integer, Optional<Integer>>> receivedData = new HashMap<>();
    joinedPairDStream.foreachRDD(rdd -> {
      rdd.collectAsMap().forEach((person, joinedTelNumbers) -> receivedData.put(person, joinedTelNumbers));
    });

    streamingContext.start();
    streamingContext.awaitTerminationOrTimeout(10_000L);

    // Left outer join keeps every entry from the left pair DStream
    // and adds the right value only when the key matches
    assertThat(receivedData).hasSize(3);
    assertThat(receivedData.get("Person 0")).isNotNull();
    assertThat(receivedData.get("Person 0")).isEqualTo(new Tuple2<>(10, Optional.of(20)));
    assertThat(receivedData.get("Person 1")).isNotNull();
    assertThat(receivedData.get("Person 1")).isEqualTo(new Tuple2<>(11, Optional.of(21)));
    assertThat(receivedData.get("Person 2")).isNotNull();
    assertThat(receivedData.get("Person 2")).isEqualTo(new Tuple2<>(12, Optional.absent()));
  }

  @Test
  public void should_right_outer_join_for_pair_dstream() throws InterruptedException, IOException {
    triggerDataCreation(0, 2, 10, 11);
    JavaPairDStream<String, Integer> pairEntriesDStream =
      streamingContext.queueStream(testQueue, ONE_AT_TIME)
        .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));
    testQueue.clear();
    triggerDataCreation(0, 3, 20, 21, 22);
    JavaPairDStream<String, Integer> pairEntriesDStream2 =
      streamingContext.queueStream(testQueue, ONE_AT_TIME)
        .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));

    JavaPairDStream<String, Tuple2<Optional<Integer>, Integer>> joinedPairDStream =
      pairEntriesDStream.rightOuterJoin(pairEntriesDStream2);
    Map<String, Tuple2<Optional<Integer>, Integer>> receivedData = new HashMap<>();
    joinedPairDStream.foreachRDD(rdd -> {
      rdd.collectAsMap().forEach((person, joinedTelNumbers) -> receivedData.put(person, joinedTelNumbers));
    });

    streamingContext.start();
    streamingContext.awaitTerminationOrTimeout(10_000L);

    // Right outer join is the mirror of the left outer join: it keeps every entry from
    // the right pair DStream and adds the left value only when the key matches
    assertThat(receivedData).hasSize(3);
    assertThat(receivedData.get("Person 0")).isNotNull();
    assertThat(receivedData.get("Person 0")).isEqualTo(new Tuple2<>(Optional.of(10), 20));
    assertThat(receivedData.get("Person 1")).isNotNull();
    assertThat(receivedData.get("Person 1")).isEqualTo(new Tuple2<>(Optional.of(11), 21));
    assertThat(receivedData.get("Person 2")).isNotNull();
    assertThat(receivedData.get("Person 2")).isEqualTo(new Tuple2<>(Optional.absent(), 22));
  }

  @Test
  public void should_full_outer_join_for_pair_dstream() throws IOException, InterruptedException {
    triggerDataCreation(0, 2, 10, 11);
    JavaPairDStream<String, Integer> pairEntriesDStream =
      streamingContext.queueStream(testQueue, ONE_AT_TIME)
        .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));
    testQueue.clear();
    triggerDataCreation(2, 4, 0, 0, 20, 21);
    JavaPairDStream<String, Integer> pairEntriesDStream2 =
      streamingContext.queueStream(testQueue, ONE_AT_TIME)
        .transformToPair(rdd -> rdd.mapToPair(entry -> new Tuple2<>(entry.person, entry.telNumber)));

    JavaPairDStream<String, Tuple2<Optional<Integer>, Optional<Integer>>> joinedPairDStream =
      pairEntriesDStream.fullOuterJoin(pairEntriesDStream2);
    Map<String, Tuple2<Optional<Integer>, Optional<Integer>>> receivedData = new HashMap<>();
    joinedPairDStream.foreachRDD(rdd -> {
      rdd.collectAsMap().forEach((person, joinedTelNumbers) -> receivedData.put(person, joinedTelNumbers));
    });

    streamingContext.start();
    streamingContext.awaitTerminationOrTimeout(10_000L);

    // Full outer join keeps the entries of both DStreams, matched or not
    assertThat(receivedData).hasSize(4);
    assertThat(receivedData.get("Person 0")).isNotNull();
    assertThat(receivedData.get("Person 0")).isEqualTo(new Tuple2<>(Optional.of(10), Optional.absent()));
    assertThat(receivedData.get("Person 1")).isNotNull();
    assertThat(receivedData.get("Person 1")).isEqualTo(new Tuple2<>(Optional.of(11), Optional.absent()));
    assertThat(receivedData.get("Person 2")).isNotNull();
    assertThat(receivedData.get("Person 2")).isEqualTo(new Tuple2<>(Optional.absent(), Optional.of(20)));
    assertThat(receivedData.get("Person 3")).isNotNull();
    assertThat(receivedData.get("Person 3")).isEqualTo(new Tuple2<>(Optional.absent(), Optional.of(21)));
  }

  private static String makePersonIntro(int telNumber) {
    int modulo = telNumber % 10;
    return "This is person " + modulo + " with tel number " + telNumber;
  }

  private static <T> Tuple2<String, T> findPerson(List<Tuple2<String, T>> receivedData, String personLabel) {
    return receivedData.stream().filter(tuple -> tuple._1().equals(personLabel)).findAny().get();
  }

  // Adds one single-entry RDD per index in [startIndex, maxRDDs) to the test queue;
  // the entry for index i is ("Person i", telNumbers[i])
  private void triggerDataCreation(int startIndex, int maxRDDs, int... telNumbers) throws IOException {
    for (int i = startIndex; i < maxRDDs; i++) {
      JavaRDD<YellowPagesEntry> newRDD1 =
        batchContext.parallelize(Arrays.asList(new YellowPagesEntry("Person " + i, telNumbers[i])));
      testQueue.add(newRDD1);
    }
  }

  private static class YellowPagesEntry implements Serializable {
    private String person;
    private int telNumber;

    public YellowPagesEntry(String person, int telNumber) {
      this.person = person;
      this.telNumber = telNumber;
    }
  }

  // Sends every key to the same, unique partition
  private static class SinglePartitionPartitioner extends Partitioner implements Serializable {

    @Override
    public int numPartitions() {
      return 1;
    }

    @Override
    public int getPartition(Object key) {
      System.out.println("Partition for key " + key);
      return 0;
    }
  }
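The join tests above differ only in which keys survive the join. As a rough illustration, the sketch below reproduces these four rules on plain Java maps, without Spark. It is not how Spark implements joins: the class `JoinSemanticsSketch` and its methods are hypothetical names, batching is ignored, each key is assumed to appear at most once per side, and `java.util.Optional` stands in for the `Optional` type used in the tests. Each joined value is a two-element list holding the left and the right value.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, Spark-free illustration of the join rules checked by the tests.
final class JoinSemanticsSketch {

    // Full outer join: every key of either side; a missing side becomes Optional.empty().
    static Map<String, List<Optional<Integer>>> fullOuterJoin(
            Map<String, Integer> left, Map<String, Integer> right) {
        Set<String> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());
        Map<String, List<Optional<Integer>>> joined = new LinkedHashMap<>();
        for (String key : keys) {
            joined.put(key, Arrays.asList(
                    Optional.ofNullable(left.get(key)),
                    Optional.ofNullable(right.get(key))));
        }
        return joined;
    }

    // Left outer join: drops the keys that are missing on the left side.
    static Map<String, List<Optional<Integer>>> leftOuterJoin(
            Map<String, Integer> left, Map<String, Integer> right) {
        Map<String, List<Optional<Integer>>> joined = fullOuterJoin(left, right);
        joined.values().removeIf(pair -> !pair.get(0).isPresent());
        return joined;
    }

    // Right outer join: drops the keys that are missing on the right side.
    static Map<String, List<Optional<Integer>>> rightOuterJoin(
            Map<String, Integer> left, Map<String, Integer> right) {
        Map<String, List<Optional<Integer>>> joined = fullOuterJoin(left, right);
        joined.values().removeIf(pair -> !pair.get(1).isPresent());
        return joined;
    }

    // Inner ("strict") join: keeps only the keys present on both sides.
    static Map<String, List<Optional<Integer>>> innerJoin(
            Map<String, Integer> left, Map<String, Integer> right) {
        Map<String, List<Optional<Integer>>> joined = fullOuterJoin(left, right);
        joined.values().removeIf(pair -> !pair.get(0).isPresent() || !pair.get(1).isPresent());
        return joined;
    }

    public static void main(String[] args) {
        // Same data as in the left outer join test
        Map<String, Integer> left = new LinkedHashMap<>();
        left.put("Person 0", 10);
        left.put("Person 1", 11);
        left.put("Person 2", 12);
        Map<String, Integer> right = new LinkedHashMap<>();
        right.put("Person 0", 20);
        right.put("Person 1", 21);

        System.out.println("inner:       " + innerJoin(left, right));
        System.out.println("left outer:  " + leftOuterJoin(left, right));
        System.out.println("right outer: " + rightOuterJoin(left, right));
        System.out.println("full outer:  " + fullOuterJoin(left, right));
    }
}
```

With this data, "Person 2" exists only on the left side, so it is kept by the left and full outer joins and dropped by the inner and right outer joins. Deriving the three other joins from the full outer join is only a readability choice for this sketch.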
This post introduced the transformations available in Spark Streaming. The first part explained what transformations are in the context of stream processing. The second part listed some of the major transformations and showed how they work through pseudocode snippets. The last part gave a more concrete view by presenting their use in unit tests.