Transformations in Spark

on waitingforcode.com

Transformations in Spark

One of methods generating new RDD consists on applying transformations on already existent RDDs. But transformations not only makes new RDDs but also gives a sense to all data processing.

Without transformations we wouldn't be able to filter data in one RDD or joining two RDD together. This article shows some of available RDD transformations. The first section lists and explains through no programming examples what given transformations do. The second section implements these transformations through Spark's Java API.

Spark transformations explained

In Spark we can find several transformations:

  • filtering - as Java's 8 filter() method, the Spark's one applies one predicate to keep only valid RDD entries.
    # Goal: keep only words with 2 letters
    filter(["A", "aa", "bbb", "CC"])
      -> ["aa", "CC"]
    
  • sorting - the name also explains the operation very clearly. It helps to order RDD data:
    # Goal: order letters in descending order
    sortBy(["a", "b", "d", "o", "e"])
      -> ["o", "e", "d", "b", "a"]
    
  • grouping - allows to group data by one of RDD data property or by a property derived from data:
    # Goal: group words by their first letters
    groupBy(["cat", "dog", "chicken"])
      -> {"c" -> ["cat", "chicken"],
          "d" -> ["dog"]}
    
  • map - it could seem similar to grouping but in fact it more like transformation of object from one format to another:
    # Goal: transform words to the number of letters
    map(["cat", "dog", "chicken"])
      -> [[3], [3], [7]]
    
  • flatten map - similar to map. The difference is that flatten map is able to transform collections to single one collection:
    # Goal: transform words from lists to the number of letters
    flatMap(["cat", "dog", "fish", "chicken"])
      -> [3, 3, 4, 7]
    
    # While map should give this output
    map(["cat", "dog", "fish", "chicken"])
      -> {[3], [3], [4], [7]}
    
  • distinct - generated RDD keeps only distinct values:
    # Goal: keep only distinct family names
    distinct(["Nowak", "James", "Nowak"]) 
      -> ["Nowak", "James"]
    
  • union - as SQL's union, Spark's one also joins two RDDs. It's important that both joined RDD must be of the same type:
    # Goal: get all letters defined in both RDD
    ["A", "B", "C"].union["D", "E", "F", "A"]
      -> ["A", "B", "C", "D", "E", "F", "A"]
    
  • substract - the result of this transformation keeps only elements defined in the 1st RDD which are not defined in the 2nd:
    # Goal: keep players didn't play in both games (1 game = 1 RDD)
    ["P1", "P2", "P3", "P4"].substract(
      ["P1, "P2", "P5", "P6"]) -> ["P3", "P4"] 
    
  • cartesian - very useful to make data combinations, ie. cartesian product of 2 RDDs:
    # Goal: generate all possible strategies
    ["S1", "S2"].cartesian(["S3", "S4"])
      -> ["S1_S3", "S1_S4", "S2_S3", "S2_S4"]
    
  • interesection - returns common elements for both RDDs:
    # Goal: keep people who visited both sites
    ["P1", "P2", "P3"].interesection(["P3"])
      -> ["P3"]
    
  • sample - takes and returns a subset of RDD (a sample):
    # Goal : take the randomly chosen winner of a contest
    ["P1", "P2", "P3", "P4"].sample(1) -> ["P5"]
    
  • coalesce - can be used to change the size of partitions held RDD data. If the number of partition decreases, data held by extra partitions will be moved. In this way, this operation is more performant than repartition() one which moves all data across cluster's nodes:
    Partition#1 = [1, 2, 3]
    Partition#2 = [4, 5, 6]
    Partition#3 = [7, 8, 9]
    
    # New number of partitions 2
    # Coalesce result - possible result
    Partition#1 = [1, 2, 3] + [7]
    Partition#2 = [4, 5, 6] + [8, 9]
    
    # Repartition result
    Partition#1 = [1, 4, 5, 8]
    Partition#2 = [2, 3, 6, 7, 9]
    
  • joins - as SQL, Spark defines several types of joins, all creating a pair RDD. We can distinguish:
    # join() - returns all values from the first RDD which exist in the second RDD
    [1: "a", 2: "b"].join([1: "a", 3: "c") -> [1: (["a"], ["a"])]
    
    # leftOuterJoin() - returns all values from the first RDD and only matching ones
    #                   from the second
    [1: "a", 2: "b"].join([1: "a", 3: "c")
      -> [1: (["a"], ["a"]), 2: (["b"], [])]
    
    # rightOuterJoin() - returns all values from the second RDD and only matching ones
    #                    from the first
    [1: "a", 2: "b"].join([1: "a", 3: "c")
      -> [1: (["a"], ["a"]), 3: ([], ["c"])]
    
    # fullOuterJoin() - returns all values, from both RDDs, even if there are some values
    #                   from the 1st missing in the 2nd and inversely 
    [1: "a", 2: "b"].join([1: "a", 3: "c")
      -> [1: (["a"], ["a"]), 2: (["b"], []), 3: ([], ["c"])]
    
  • cogroup - groups data from 2 RDDs by key and constructs a pair of values in final RDD:
    # Goal: keep average salary by year 
    [("programmer", 1000), ("nurse", 1000)].cogroup(
      [("programmer", 2000), ("nurse", 2000), ("driver", 1500)] -> {
        "programmer": (1000, 2000),
        "nurse": (1000, 2000),
        "driver": (null, 1500)
    }
    
  • reduceByKey - merges values for each key:
    # Goal: get the oldest person for each gender
    {"M": [10, 20, 30], "W": [15, 25]}.reduceByKey(
      (v1, v2) -> v1 > v2 ? v1 : v2) -> {
     "M": 30, "W": 25
    }
    
  • groupByKey - groups values by key for a pair RDD:
    # Goal: group players by teams
    [("Team1", "P1"), ("Team1", "P2"), ("Team2", "P3"), ("Team1", "P4")].groupByKey()
      -> ["Team1": ["P1", "P2", "P4"], "Team2": ["P3"]]
    
  • aggregateByKey - works almost as groupsByKey. One subtle differente consists on a possibility to return different type than the one defined in transformed RDD:
    # Goal: translate int age to middle age (simplified definition of aggregateByKey)
    [(39, "Person 1"), (10, "Child 1"), (3, "Child 2"), (89, "Person 2")]
      .aggregateByKey(age -> resolveTextFromAge(age)) -> [39: ("Not so old", "Person 1"),
        10: ("Young", "Child 1"), 3: ("Like a new born", "Child 2"), 89: ("A little bit old", "Person 2")]
    
  • combineByKey - takes values of each key and combines them together. It can return a result of different type that the one stored in original RDD:
    # Goal: group people by ages
    [(39, "Person 1"), (10, "Child 1"), (10, "Child 2"), (89, "Person 2")]
      .combineByKey() -> [39: ["Person 1"], 10: ["Child 1", "Child 2"], 89: ["Person 2"]]
    
  • partitionBy - with this transformation we can redistribute data among partitions with the use of customized partitioner:
    # Goal: move good clients to one partition and bad to another
    ["g_Customer1", "b_Customer2", "b_Customer3"].partitionBy(CustomerPartitioner)
      -> partition#1 should only contain "g_Customer1", partition#2 should store remaining two
    
  • sortByKey - can be used to sort data by key with, possibly, customized comparator:
    # Goal: sort car from the most expensive to the least expensive
    [(300, "Car1"), (500, "Car2"), (100, "Car3")].sortByKey(ascending)
      -> [(100, "Car3"), (300, "Car1"), (500, "Car2")]
    
  • keys - from a pair RDD, it gets all defined keys, even duplicated:
    # Goal: get countries represented in the tournament
    [{"PL": ["p1"], "FR": ["P2"], "FR": ["P3", "P4"]}].keys() 
      -> ["PL", "FR", "FR"]
    
  • values - from a pair RDD, it gets all defined values, even duplicated:
    # Goal: keep all players represented in the tournament
    # P5 has double Polish-UK nationality
    [{"PL": ["P1", "P5"], "UK": ["P5"], "FR": ["P2"], "FR": ["P3", "P4"]}].keys() 
      -> ["P1", "P2", "P3", "P4", "P5", "P53"]
    

Spark Java API transformations

Below tests show some of main Spark transformations:

@Test
public void should_retain_players_having_at_least_20_points() {
  Player player1 = new Player("Player1", "Team1", FR, 10, 0);
  Player player2 = new Player("Player2", "Team1", FR, 5, 3);
  Player player3 = new Player("Player3", "Team1", FR, 7, 10);
  List<Player> players = Lists.newArrayList(player1, player2, player3);

  JavaRDD<Player> efficientPlayersRDD = CONTEXT.parallelize(players)
    .filter(player -> player.getPoints() >= 20);

  Collection<Player> efficientPlayers = efficientPlayersRDD.collect();

  assertThat(efficientPlayers).hasSize(2);
  assertThat(efficientPlayers).extracting("name").containsOnly("Player1", "Player3");
}

@Test
public void should_sort_players_by_team_and_name() {
  Player player1 = new Player("Player1", "Team1", FR, 10, 0);
  Player player2 = new Player("Player2", "Team1", FR, 5, 3);
  Player player3 = new Player("Player3", "Team2", FR, 7, 10);
  Player player4 = new Player("Player4", "Team1", DE, 17, 10);
  Player player5 = new Player("Player5", "Team2", PL, 3, 1);
  List<Player> players = Lists.newArrayList(player1, player4,   player3, player2, player5);

  JavaRDD<Player> playersRDD = CONTEXT.parallelize(players)
    .sortBy(player -> player.getTeam()+"_"+player.getName(), true, 1);

  List<Player> sortedPlayers = playersRDD.collect();

  assertThat(sortedPlayers).hasSize(5);
  assertThat(sortedPlayers).extracting("team").containsExactly("Team1", "Team1", "Team1", "Team2", "Team2");
  assertThat(sortedPlayers).extracting("name").containsExactly("Player1", "Player2", "Player4", "Player3", "Player5");
}

@Test
public void should_group_players_by_teams() {
  Player player1 = new Player("Player1", "Team1", FR, 10, 0);
  Player player2 = new Player("Player2", "Team1", FR, 5, 3);
  Player player3 = new Player("Player3", "Team2", FR, 7, 10);
  Player player4 = new Player("Player4", "Team1", DE, 17, 10);
  Player player5 = new Player("Player5", "Team2", PL, 3, 1);
  List<Player> players = Lists.newArrayList(player1, player2, player3, player4, player5);

  JavaPairRDD<String, Iterable<Player>> teamPlayersRDD = CONTEXT.parallelize(players)
    .groupBy(player -> player.getTeam());

  Map<String, Iterable<Player>> playersByTeam = teamPlayersRDD.collectAsMap();

  assertThat(playersByTeam).hasSize(2);
  assertThat(playersByTeam).containsOnlyKeys("Team1", "Team2");
  assertThat(playersByTeam.get("Team1").iterator()).extracting("name").containsOnly("Player1", "Player2", "Player4");
  assertThat(playersByTeam.get("Team2").iterator()).extracting("name").containsOnly("Player3", "Player5");
}

@Test
public void should_correctly_map_players_for_teams() {
  Player player1 = new Player("Player1", "Team1", FR, 10, 0);
  Player player2 = new Player("Player2", "Team1", FR, 5, 3);
  Player player3 = new Player("Player3", "Team2", FR, 7, 10);
  Player player4 = new Player("Player4", "Team1", DE, 17, 10);
  Player player5 = new Player("Player5", "Team2", PL, 3, 1);
  List<Player> players = Lists.newArrayList(player1, player4,   player3, player2, player5);

  // map() is different than groupBy() because map() makes a kind of
  // object transformation while the groupBy() takes already existent
  // objects and put them under a common "address"
  JavaRDD<String> playerNamesRDD = CONTEXT.parallelize(players)
    .map(player -> player.getName());

  List<String> playerNames = playerNamesRDD.collect();

  assertThat(playerNames).hasSize(5);
  assertThat(playerNames).containsOnly("Player1", "Player2", "Player3", "Player4", "Player5");
}

@Test
public void should_correctly_flat_a_map() {
  List<String> animals = Arrays.asList("cat", "dog", "fish", "chicken");
  JavaRDD<Integer> flattenKeywordsRDD = CONTEXT.parallelize(animals)
    .flatMap(animalName -> Arrays.asList(animalName.length()));
  JavaRDD<List<Integer>> notFlattenKeywordsRDD = CONTEXT.parallelize(animals)
    .map(animalName -> Arrays.asList(animalName.length()));
  List<Integer> flattenLengths = flattenKeywordsRDD.collect();
  List<List<Integer>> notFlattenLengths = notFlattenKeywordsRDD.collect();

  // The difference between map() and flattenMap() is visible thanks to the type
  // returned by collect() method
  // In the case of map(), we apply a function to each entry of List<String> keywords object
  // and the returned type applies for each of entries
  // In the other side, flatMap() applies a function to each entry of List<String> keywords object
  // but the final output is a concatenation of all separate ones
  assertThat(notFlattenLengths).hasSize(4);
  assertThat(notFlattenLengths).contains(Arrays.asList(3), Arrays.asList(3),  Arrays.asList(4), Arrays.asList(7));
  assertThat(flattenLengths).hasSize(4);
  assertThat(flattenLengths).containsOnly(3, 3, 4, 7);
}

@Test
public void should_return_distinct_letters() {
  List<String> letters = Lists.newArrayList("A", "B", "C", "D", "E", "E", "E", "E");

  JavaRDD<String> lettersRDD = CONTEXT.parallelize(letters).distinct();

  List<String> distinctLetters = lettersRDD.collect();

  assertThat(distinctLetters).hasSize(5);
  assertThat(distinctLetters).containsOnly("A", "B", "C", "D", "E");
}

@Test
public void should_combine_two_different_RDDs() {
  List<String> letters1 = Lists.newArrayList("A", "E", "O", "I", "A", "A", "E");
  List<String> letters2 = Lists.newArrayList("D", "K", "K", "L", "D", "L", "K", "N", "D");

  JavaRDD<String> letters1RDD = CONTEXT.parallelize(letters1);
  JavaRDD<String> letters2RDD = CONTEXT.parallelize(letters2);
  JavaRDD<String> allLettersRDD = letters1RDD.union(letters2RDD).distinct();

  List<String> allLetters = allLettersRDD.collect();

  assertThat(allLetters).hasSize(8);
  assertThat(allLetters).containsOnly("A", "E", "O", "I", "D", "K", "L", "N");
}

@Test
public void should_correctly_subtract_letters() {
  List<String> letters1 = Lists.newArrayList("A", "B", "C", "D", "E", "F", "F");
  List<String> letters2 = Lists.newArrayList("B", "C", "D", "F");

  // subtract() transformation returns a new RDD having elements of 'current' RDD which
  // are not defined in the 'other' RDD
  // 'current' means here the RDD on which we apply the transformation
  // 'other' is the RDD applying in transformation
  // The subtract is not made 1 by 1. It's the reason why we don't see
  // "F" in subtracted RDD, even if it's defined twice in 'current' RDD
  // and only once in 'other' RDD
  JavaRDD<String> letters1RDD = CONTEXT.parallelize(letters1);
  JavaRDD<String> letters2RDD = CONTEXT.parallelize(letters2);
  JavaRDD<String> subtractedRDD = letters1RDD.subtract(letters2RDD);

  List<String> collectAfterSubtract = subtractedRDD.collect();

  assertThat(collectAfterSubtract).hasSize(2);
  assertThat(collectAfterSubtract).containsOnly("A", "E");
}

@Test
public void should_correctly_compute_cartesian_product() {
  List<String> vowels = Lists.newArrayList("A","E");
  List<String> consonants = Lists.newArrayList("B", "C", "D", "F");

  JavaRDD<String> vowelsRDD = CONTEXT.parallelize(vowels);
  JavaRDD<String> consonantsRDD = CONTEXT.parallelize(consonants);
  JavaPairRDD<String, String> cartesianLettersRDD = vowelsRDD.cartesian(consonantsRDD);

  List<Tuple2<String, String>> cartesianLetters = cartesianLettersRDD.collect();
  List<String> normalizedCartesianLetters =
          cartesianLetters.stream().map(tuple -> tuple._1()+"_"+tuple._2()).collect(Collectors.toList());

  assertThat(normalizedCartesianLetters).hasSize(8);
  assertThat(normalizedCartesianLetters)
    .containsOnly("A_B", "A_C", "A_D", "A_F", "E_B", "E_C", "E_D", "E_F");
}

@Test
public void should_correctly_apply_intersection_transformation() {
  List<String> letters1 = Lists.newArrayList("A","E", "D", "F", "X", "Y", "Z", "D", "F");
  List<String> letters2 = Lists.newArrayList("B", "C", "D", "F");

  // intersection() simply returns common elements in both RDD
  // Returned elements aren't duplicated. So even if the first RDD contains
  // "D" and "F" defined twice, they will be returned only once in the
  // intersected RDD
  JavaRDD<String> letters1RDD = CONTEXT.parallelize(letters1);
  JavaRDD<String> letters2RDD = CONTEXT.parallelize(letters2);
  JavaRDD<String> intersectionRDD = letters1RDD.intersection(letters2RDD);

  List<String> commonLetters = intersectionRDD.collect();

  assertThat(commonLetters).hasSize(2);
  assertThat(commonLetters).containsOnly("D", "F");
}

@Test
public void should_sample_RDD() {
  List<String> letters = Lists.newArrayList("A","E", "A", "E");

  JavaRDD<String> allLettersRDD = CONTEXT.parallelize(letters);

  // Sampling is made either through Bernoulli sampling method or through
  // Poisson distribution. Used method depends on boolean flag set
  // as the first parameter of the method.
  // Since we specify a seed, we expect to get the same results on each test
  JavaRDD<String> sampledRDD = allLettersRDD.sample(true, 0.33, 2);

  List<String> sampledLetters = sampledRDD.collect();

  assertThat(sampledLetters).hasSize(5);
  assertThat(sampledLetters).containsExactly("A", "E", "A", "A", "E");
}

@Test
public void should_correctly_apply_coalesce_transformation() {
  List<Integer> numbers = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

  // coalesce simply reduce data to the number of partitions defined
  // as method parameter
  // If the number of partitions decreases, whole data won't be moved
  // across network. Spark will keep the number of specified partitions
  // and move data remained on the rest of partitions to them.
  JavaRDD<Integer> sampledRDD = CONTEXT.parallelize(numbers, 2);
  JavaRDD<Integer> coalescedRDD = sampledRDD.coalesce(1);

  assertThat(sampledRDD.partitions()).hasSize(2);
  assertThat(coalescedRDD.partitions()).hasSize(1);
}

@Test(expected = SparkException.class)
public void should_fail_on_transforming_not_serializable_object() {
  List<NotSerializablePlayer> players = Lists.newArrayList(
    new NotSerializablePlayer("Player1", "Team", 0, 0, 0),
    new NotSerializablePlayer("Player2", "Team", 0, 0, 0)
  );

  JavaRDD<NotSerializablePlayer> sampledRDD = CONTEXT.parallelize(players);
  JavaRDD<NotSerializablePlayer> onlyScorersRDD = sampledRDD.filter(player -> player.getGoals() > 0);
  onlyScorersRDD.collect(); 
}

// JavaPairRDD
@Test
public void should_correctly_join_RDDs() {
  List<Integer> numbers1 = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  List<Integer> numbers2 = Lists.newArrayList(20, 19, 18, 17, 16, 15);

  JavaPairRDD<Integer, Iterable<Integer>> numbers1RDD = 
    CONTEXT.parallelize(numbers1, 2).groupBy(number -> number%2);
  JavaPairRDD<Integer, Iterable<Integer>> numbers2RDD = 
    CONTEXT.parallelize(numbers2, 2).groupBy(number -> number%2);

  // join() groups entries for two RDDs by key
  Map<Integer, Tuple2<Iterable<Integer>, Iterable<Integer>>> joinedRDDMap =
    numbers1RDD.join(numbers2RDD).collectAsMap();

  assertThat(joinedRDDMap).hasSize(2);
  assertThat(joinedRDDMap.get(0)._1()).containsOnly(2, 4, 6, 8, 10);
  assertThat(joinedRDDMap.get(0)._2()).containsOnly(16, 18, 20);
  assertThat(joinedRDDMap.get(1)._1()).containsOnly(1, 3, 5, 7, 9);
  assertThat(joinedRDDMap.get(1)._2()).containsOnly(15, 17, 19);

  List<Tuple2<Integer, String>> tuples1 = Lists.newArrayList(
      new Tuple2<>(1, "a"), new Tuple2<>(2, "b"), new Tuple2<>(3, "c"), new Tuple2<>(4, "d")
  );
  List<Tuple2<Integer, String>> tuples2 = Lists.newArrayList(
      new Tuple2<>(1, "a"), new Tuple2<>(3, "c"), new Tuple2<>(3, "c"), new Tuple2<>(5, "e")
  );
  CONTEXT.parallelize(tuples1);
  JavaPairRDD<Integer, String> numbers3RDD = CONTEXT.parallelizePairs(tuples1);
  JavaPairRDD<Integer, String> numbers4RDD = CONTEXT.parallelizePairs(tuples2);
}

@Test
public void should_produce_empty_join_when_the_second_RDD_doesn_t_contain_correspondances() {
  List<Integer> numbers1 = Lists.newArrayList(2, 4);
  List<Integer> numbers2 = Lists.newArrayList(19);

  JavaPairRDD<Integer, Iterable<Integer>> numbers1RDD = CONTEXT.parallelize(numbers1, 2).groupBy(number -> number%2);
  JavaPairRDD<Integer, Iterable<Integer>> numbers2RDD = CONTEXT.parallelize(numbers2, 2).groupBy(number -> number%2);

  Map<Integer, Tuple2<Iterable<Integer>, Iterable<Integer>>> joinedRDDMap =
    numbers1RDD.join(numbers2RDD).collectAsMap();

  assertThat(joinedRDDMap).isEmpty();
}

@Test
public void should_correctly_apply_left_outer_join() {
  List<Integer> numbers1 = Lists.newArrayList(2, 4, 6, 8, 10);
  List<Integer> numbers2 = Lists.newArrayList(3, 5, 7);

  JavaPairRDD<Integer, Iterable<Integer>> numbers1RDD = CONTEXT.parallelize(numbers1, 2).groupBy(number -> number%2);
  JavaPairRDD<Integer, Iterable<Integer>> numbers2RDD = CONTEXT.parallelize(numbers2, 2).groupBy(number -> number%2);

  // join() groups entries for two RDDs by key
  Map<Integer, Tuple2<Iterable<Integer>, Optional<Iterable<Integer>>>> joinedRDDMap =
      numbers1RDD.leftOuterJoin(numbers2RDD).collectAsMap();

  assertThat(joinedRDDMap).hasSize(1);
  assertThat(joinedRDDMap.get(0)._1()).containsOnly(2, 4, 6, 8, 10);
  assertThat(joinedRDDMap.get(0)._2().asSet()).isEmpty();
}

@Test
public void should_correctly_apply_right_outer_join() {
  List<Integer> numbers1 = Lists.newArrayList(2, 4, 6, 8, 10);
  List<Integer> numbers2 = Lists.newArrayList(3, 5, 7);

  JavaPairRDD<Integer, Iterable<Integer>> numbers1RDD = CONTEXT.parallelize(numbers1, 2).groupBy(number -> number%2);
  JavaPairRDD<Integer, Iterable<Integer>> numbers2RDD = CONTEXT.parallelize(numbers2, 2).groupBy(number -> number%2);

  Map<Integer, Tuple2<Optional<Iterable<Integer>>, Iterable<Integer>>> joinedRDDMap =
    numbers1RDD.rightOuterJoin(numbers2RDD).collectAsMap();

  assertThat(joinedRDDMap).hasSize(1);
  assertThat(joinedRDDMap.get(1)._1().isPresent()).isFalse();
  assertThat(joinedRDDMap.get(1)._2()).containsOnly(3, 5, 7);
  Iterable<Integer> entries1 = Iterables.concat(joinedRDDMap.get(1)._1().or(new ArrayList<>()), joinedRDDMap.get(1)._2());
  assertThat(entries1).hasSize(3);
  assertThat(entries1).containsOnly(3, 5, 7);
}

@Test
public void should_correctly_apply_full_outer_join() {
  List<Integer> numbers1 = Lists.newArrayList(2, 4, 6, 8, 10);
  List<Integer> numbers2 = Lists.newArrayList(3, 5, 7);

  JavaPairRDD<Integer, Iterable<Integer>> numbers1RDD = CONTEXT.parallelize(numbers1, 2).groupBy(number -> number%2);
  JavaPairRDD<Integer, Iterable<Integer>> numbers2RDD = CONTEXT.parallelize(numbers2, 2).groupBy(number -> number%2);

  Map<Integer, Tuple2<Optional<Iterable<Integer>>, Optional<Iterable<Integer>>>> joinedRDDMap =
    numbers1RDD.fullOuterJoin(numbers2RDD).collectAsMap();

  assertThat(joinedRDDMap).hasSize(2);
  assertThat(joinedRDDMap.get(0)._1().get()).containsOnly(2, 4, 6, 8, 10);
  assertThat(joinedRDDMap.get(0)._2().isPresent()).isFalse();
  assertThat(joinedRDDMap.get(1)._1().isPresent()).isFalse();
  assertThat(joinedRDDMap.get(1)._2().get()).containsOnly(3, 5, 7);
}

@Test
public void should_correctly_reduce_values_by_key() {
  List<Tuple2<String, Integer>> owner1Animals = Lists.newArrayList(
    new Tuple2<>("cat", 2), new Tuple2<>("dog", 7), new Tuple2<>("duck", 11)
  );
  List<Tuple2<String, Integer>> owner2Animals = Lists.newArrayList(
    new Tuple2<>("cat", 2), new Tuple2<>("dog", 7), new Tuple2<>("rabbit", 1)
  );

  JavaPairRDD<String, Integer> owner1AnimalsRDD = CONTEXT.parallelizePairs(owner1Animals);
  JavaPairRDD<String, Integer> owner2AnimalsRDD = CONTEXT.parallelizePairs(owner2Animals);

  JavaPairRDD<String, Integer> subtractedAnimalsRDD = owner1AnimalsRDD.subtractByKey(owner2AnimalsRDD);

  Map<String, Integer> subtractedAnimalsMap = subtractedAnimalsRDD.collectAsMap();

  assertThat(subtractedAnimalsMap).hasSize(1);
  assertThat(subtractedAnimalsMap.get("duck")).isEqualTo(11);
}

@Test
public void should_correctly_group_data_by_the_key() {
  List<Tuple2<String, Integer>> owner1Animals = Lists.newArrayList(
    new Tuple2<>("cat", 2), new Tuple2<>("dog", 7), new Tuple2<>("duck", 11)
  );
  List<Tuple2<String, Integer>> owner2Animals = Lists.newArrayList(
    new Tuple2<>("cat", 3), new Tuple2<>("dog", 4), new Tuple2<>("rabbit", 1)
  );

  JavaPairRDD<String, Integer> owner1AnimalsRDD = CONTEXT.parallelizePairs(owner1Animals);
  JavaPairRDD<String, Integer> owner2AnimalsRDD = CONTEXT.parallelizePairs(owner2Animals);

  JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> groupedAnimalsRDD =
    owner1AnimalsRDD.cogroup(owner2AnimalsRDD);

  Map<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> resultMap = groupedAnimalsRDD.collectAsMap();

  assertThat(resultMap).hasSize(4);
  assertThat(resultMap).containsOnlyKeys("cat", "dog", "rabbit", "duck");
  assertThat(resultMap.get("cat")._1()).containsOnly(2);
  assertThat(resultMap.get("cat")._2()).containsOnly(3);
  assertThat(resultMap.get("dog")._1()).containsOnly(7);
  assertThat(resultMap.get("dog")._2()).containsOnly(4);
  assertThat(resultMap.get("rabbit")._1()).isEmpty();
  assertThat(resultMap.get("rabbit")._2()).containsOnly(1);
  assertThat(resultMap.get("duck")._1()).containsOnly(11);
  assertThat(resultMap.get("duck")._2()).isEmpty();
}

@Test
public void should_correctly_count_values_by_key() {
  List<Integer> numbers = Lists.newArrayList(2, 4, 6, 8, 10, 65, 52, 26, 11, 15, 25, 35, 45, 55);

  JavaPairRDD<Boolean, Integer> numbersDivisibleBy2RDD =
    CONTEXT.parallelize(numbers).mapToPair(n -> new Tuple2<>(n%2 == 0, n));

  JavaPairRDD<Boolean, Integer> theBiggestNumbers =
    numbersDivisibleBy2RDD.reduceByKey((value1, value2) -> value1 > value2 ? value1 : value2);

  Map<Boolean, Integer> theBiggestNumbersMap = theBiggestNumbers.collectAsMap();
  assertThat(theBiggestNumbersMap.get(true)).isEqualTo(52);
  assertThat(theBiggestNumbersMap.get(false)).isEqualTo(65);
}

@Test
public void should_get_values_fo_one_rdd() {
  List<Integer> numbers = Lists.newArrayList(2, 4, 6, 8, 10, 65, 52, 26, 11, 15, 25, 35, 45, 55, 55);

  JavaPairRDD<Boolean, Integer> numbersDivisibleBy2RDD =
    CONTEXT.parallelize(numbers).mapToPair(n -> new Tuple2<>(n%2 == 0, n));

  JavaRDD<Integer> allNumbersRDD = numbersDivisibleBy2RDD.values();

  List<Integer> allNumbers = allNumbersRDD.collect();
  assertThat(allNumbers).hasSize(numbers.size());
  assertThat(allNumbers).containsAll(numbers);
}

@Test
public void should_get_keys_fo_one_rdd() {
  List<Integer> numbers = Lists.newArrayList(2, 4, 6, 8, 10, 65, 52, 26, 11, 15, 25, 35, 45, 55);

  JavaPairRDD<Boolean, Integer> numbersDivisibleBy2RDD =
    CONTEXT.parallelize(numbers).mapToPair(n -> new Tuple2<>(n%2 == 0, n));

  JavaRDD<Boolean> allKeysRDD = numbersDivisibleBy2RDD.keys();

  List<Boolean> allKeys = allKeysRDD.collect();
  // keys can be duplicated
  assertThat(allKeys).hasSize(14);
  long trueValues = allKeys.stream().filter(value -> value).count();
  long falseValues = allKeys.stream().filter(value -> !value).count();
  assertThat(trueValues).isEqualTo(7);
  assertThat(falseValues).isEqualTo(7);
}

@Test
public void should_correctly_group_values_by_key() {
  List<Tuple2<Boolean, Integer>> primeNumbers = Lists.newArrayList(
    new Tuple2<>(true, 2), new Tuple2<>(true, 7), new Tuple2<>(false, 4), new Tuple2<>(true, 11)
  );

  JavaPairRDD<Boolean, Integer> primeNumbersRDD = CONTEXT.parallelizePairs(primeNumbers);

  JavaPairRDD<Boolean, Iterable<Integer>> groupedPrimeNumbersRDD = primeNumbersRDD.groupByKey();

  Map<Boolean, Iterable<Integer>> groupedRDD = groupedPrimeNumbersRDD.collectAsMap();
  assertThat(groupedRDD).hasSize(2);
  assertThat(groupedRDD.get(true)).containsOnly(2, 7, 11);
  assertThat(groupedRDD.get(false)).containsOnly(4);
}

@Test
public void should_correctly_aggregate_values_by_key() {
  List<Tuple2<Boolean, Integer>> primeNumbers = Lists.newArrayList(
    new Tuple2<>(true, 2), new Tuple2<>(true, 7), new Tuple2<>(false, 4), new Tuple2<>(true, 11)
  );

  // aggregateByKey aggregates values on each key. The first argument is the "zero" value,
  // the second one corresponds to make an operation on data held by RDD on 1 partition.
  // The last method is a combine function which merges data generated by the previous method on different
  // partitions.
  // Note that the third function is applied only when data is stored in 2 or more partitions and
  // only if at least 2 partitions aggregated results for given key.
  // Aggregate's advantage regarding to grouping by key is that it can return
  // different type than the one stored initially in transformed RDD.
  JavaPairRDD<Boolean, Integer> primeNumbersRDD = CONTEXT.parallelizePairs(primeNumbers, 2);
  JavaPairRDD<Boolean, String> aggregatedToPrintFormatRDD = primeNumbersRDD.aggregateByKey("",
          (String previousValue, Integer currentValue) -> {
              if (previousValue.equals("")) {
                  return "["+currentValue+"]";
              }
              return previousValue+", ["+currentValue+"]";
          },
          (String v1, String v2) -> v1 + ", " + v2);

  Map<Boolean, String> mappedPrimeNumbers = aggregatedToPrintFormatRDD.collectAsMap();
  assertThat(mappedPrimeNumbers).hasSize(2);
  assertThat(mappedPrimeNumbers.get(false)).isEqualTo("[4]");
  assertThat(mappedPrimeNumbers.get(true)).isEqualTo("[2], [7], [11]");
}

@Test
public void should_correctly_partition_data() throws InterruptedException {
  List<Integer> numbers = Lists.newArrayList(2, 4, 6, 8, 10, 65, 52, 26, 11, 15, 25, 35, 45, 55);

  JavaPairRDD<Boolean, Integer> numbersDivisibleBy2RDD =
    CONTEXT.parallelize(numbers).mapToPair(n -> new Tuple2<>(n%2 == 0, n));

  PartitionerTest partitioner = new PartitionerTest();
  JavaPairRDD<Boolean, Integer> partitionedRDD = numbersDivisibleBy2RDD.partitionBy(partitioner);

  partitionedRDD.count();
  assertThat(partitionedRDD.partitions()).hasSize(2);
  assertThat(partitionedRDD.partitions().get(0).index()).isEqualTo(0);
  assertThat(partitionedRDD.partitions().get(1).index()).isEqualTo(1);
}

@Test
public void should_correctly_sort_data_by_key() {
  List<Tuple2<Integer, String>> goalsAndPlayers = Arrays.asList(
    new Tuple2<>(30, "Player1"),
    new Tuple2<>(21, "Player2"),
    new Tuple2<>(35, "Player3"),
    new Tuple2<>(15, "Player4")
  );
  JavaPairRDD<Integer, String> goalsAndPlayersRDD =
    CONTEXT.parallelizePairs(goalsAndPlayers);

  JavaPairRDD<Integer, String> sortedGoalsAndPlayersRDD = goalsAndPlayersRDD.sortByKey(false);

  // Note that sorted results are collected only on collect or save call. For example, if you call
  // collectAsMap(), the output won't be sorted as expected
  List<Tuple2<Integer, String>> goalsAndPlayersMap = sortedGoalsAndPlayersRDD.collect();
  assertThat(goalsAndPlayersMap).hasSize(4);
  List<String> sorted = new ArrayList<>();
  goalsAndPlayersMap.forEach((t) -> sorted.add(t._1()+"_"+t._2()));
  assertThat(sorted).containsExactly("35_Player3", "30_Player1", "21_Player2", "15_Player4");
}

@Test
public void should_correctly_combine_by_key() {
  List<Integer> numbers = Lists.newArrayList(2, 4, 6, 8, 10, 65, 52, 26, 11, 15, 25, 35, 45, 55);

  JavaPairRDD<Boolean, Integer> numbersDivisibleBy2RDD =
      CONTEXT.parallelize(numbers, 2).mapToPair(n -> new Tuple2<>(n%2 == 0, n));

  // combine is similar operation to previously discovered (aggregation, reduce). It takes values
  // identified by keys and applies on them, first, combiner function. It can, as in our test,
  // change the type of original object from RDD to another type. The second function feeds element
  // crated by the previous function. The last function is applied when data is stored on
  // different partitions and we need to merge them.
  // Note that combineByKey works almost like groupByKey with some subtle and important
  // differences. combineByKey will combine data locally, on given partition and
  // shuffle across the network onl the result of this operation. In the other side,
  // groupyBy will send not grouped entries to one specific partition where grouping
  // will occur. By the way, it's the reason why merge combiner (the 3rd) function
  // exists.
  JavaPairRDD<Boolean, List<String>> combinedRDD = numbersDivisibleBy2RDD.combineByKey(
      number -> Lists.newArrayList(number.toString()),
      (List<String> numbersList, Integer number) -> {
        numbersList.add(number.toString());
        return numbersList;
      },
      (List<String> partition1Data, List<String> partition2Data) -> {
        partition1Data.addAll(partition2Data);
        return partition1Data;
      }
    );

  Map<Boolean, List<String>> result = combinedRDD.collectAsMap();

  assertThat(result).hasSize(2);
  assertThat(result.get(true)).containsOnly("2","4","6","8","10","52","26");
  assertThat(result.get(false)).containsOnly("65","11","15","25","35","45","55");
}

private static class PartitionerTest extends Partitioner implements Serializable {

  @Override
  public int numPartitions() {
    return 2;
  }

  @Override
  public int getPartition(Object key) {
    // True goes to the 1st partition, False to the 2nd
    return (boolean) key ? 0 : 1;
  }
}

The post shows main Spark's transformations. The first part describes transformations and shows, through non programming examples, what they do. The second part is constructed by a succession of test cases illustrating transformations use through Spark's Java API.

Share on: