Data transformations in Apache Beam

on waitingforcode.com

Data transformations in Apache Beam

Transformation are intrinsic part of each data processing framework. Apache Beam is not an exception and it also provides some of build-in transformations that can be freely extended with appropriated structures.

After the first post explaining PCollection in Apache Beam, this one focuses on operations we can do with this data abstraction. The first section describes the API of data transformations in Apache Beam. The next one lists main available transformations and illustrates each of them with one or more code samples.

Transforming PCollections

In order to understand the transformation API we'd first start by recall the previous data processing framework developed at Google - Apache Flume, internal successor of MapReduce. Its transformation API was based on expicitly called operations doing different processing stuff: mapping, filtering, counting and so on. However this approach had some drawbacks: code readibility, modulability, extendability or type safety. It's the reasons why the Apache Beam's programmers decided to adopt more universal approach - expose a single apply(PTransform<? super PCollection<T>, OutputT> t) method from PCollection class.

This method takes the executed transformation (map, filter, count....) as parameter. The transformation is defined as an instance of PTransform<InputT, OutputT> implementation. Simply speaking, PTransform takes some data of certain type and returns other data that may be of different type. Among these transformations we can distinguish:

  • input transformations - e.g. reading data from input file
  • processing transformation - e.g. mapping or filtering
  • output transformations - e.g. saving data to output sink

The transformations are identified by an unique name that can either be assigned explicitly through apply function call or randomly by the system. Some of data processing popular transformations (count, sum, min, max, map, filter...) are already included in Apache Beam. The missing ones can be simply provided as the implementations of PTransform.

Transformations examples

The following list contains some of built-in transformations provided by Apache Beam. Some of them are quite obvious, the others not and require more explaination:

  • any custom transformation - Beam's users can freely invoke any processing custom function applied on each of elements included in PCollection. To define so function a transformation represented by ParDo class can be used. Since the ParDo is a little bit less obvious to use as other transformations, it'll be described in separated post.
  • combine - this reduce transformation combines all elements in given window into a single value. For instance, it could be used to compute a sum of PCollection's values (if the sum transformation wasn't defined):
      @Test
      public void should_concat_all_words() {
        Pipeline pipeline = BeamFunctions.createPipeline("Combine transformation");
        PCollection<String> dataCollection = pipeline.apply(Create.of(Arrays.asList("aa", "aa", "aaa")));
    
        PCollection<String> concatenatedWords = dataCollection.apply(Combine.globally(words -> {
          String concatenatedWord = "";
          for (String word : words) {
            concatenatedWord += word;
          }
          return concatenatedWord;
        }));
    
        // Concatenation order is not guaranteed. It depends on dataset splitting strategy.
        // It's why the same letters are concatenated.
        PAssert.that(concatenatedWords).containsInAnyOrder("aaaaaaa");
        pipeline.run().waitUntilFinish();
      }
    
      @Test
      public void should_combine_all_orders_by_customer() {
        Pipeline pipeline = BeamFunctions.createPipeline("Combine per key transformation");
        PCollection<KV<String, Integer>> customerOrders = pipeline.apply(Create.of(Arrays.asList(
          KV.of("C#1", 100), KV.of("C#2", 108), KV.of("C#3", 120), KV.of("C#1", 209), KV.of("C#1", 210),
          KV.of("C#1", 200), KV.of("C#2", 450))));
    
        PCollection<KV<String, Integer>> ordersSumsPerCustomer = customerOrders.apply(Combine.perKey(amounts -> {
          int sum = 0;
          for (int amount : amounts) {
            sum += amount;
          }
          return sum;
        }));
    
        PAssert.that(ordersSumsPerCustomer).containsInAnyOrder(KV.of("C#1", 719), KV.of("C#2", 558), KV.of("C#3", 120));
        pipeline.run().waitUntilFinish();
      }
      
  • count - counts the number of occurrences in given dataset. It contains also the variations for key-value pairs. The global count is defined through Count.globally() method while key-based through Count.perElement() one:
      @Test
      public void should_count_the_number_of_elements_in_collection() {
        Pipeline pipeline = BeamFunctions.createPipeline("Count transformation");
        PCollection<Integer> numbersCollection = pipeline.apply(Create.of(Arrays.asList(1, 2, 3)));
    
        PCollection<Long> allItemsCount = numbersCollection.apply(Count.globally());
    
        PAssert.that(allItemsCount).containsInAnyOrder(3L);
        pipeline.run().waitUntilFinish();
      }
    
      @Test
      public void should_count_the_occurrences_per_element_in_key_value_collection() {
        Pipeline pipeline = BeamFunctions.createPipeline("Count per element transformation");
        PCollection<String> dataCollection = pipeline.apply(Create.of(Arrays.asList("", "a", "", "", "ab",
          "ab", "abc")));
    
        PCollection<KV<String, Long>> perElementCount = dataCollection.apply(Count.perElement());
    
        PAssert.that(perElementCount).containsInAnyOrder(KV.of("", 3L),
          KV.of("a", 1L), KV.of("ab", 2L), KV.of("abc", 1L));
        pipeline.run().waitUntilFinish();
      }
      
  • distinct - this operations helps to dedupe repeated entries in the processed dataset. The comparison is made not with Java's equals(Object object) method but on byte representation of the value encoded with provided coder instance.
    However in some cases the equality is based on 1 particular value. For instance it can be the case of key-value pairs. In such situation Beam provides a Distinct.withRepresentativeValueFn(SerializableFunction<T, IdT> fn) method that helps to define the logic of differentiation rule:
      @Test
      public void should_get_distinct_words() {
        Pipeline pipeline = BeamFunctions.createPipeline("Distinct transformation");
        PCollection<String> dataCollection = pipeline.apply(Create.of(Arrays.asList("", "a", "", "", "ab",
          "ab", "abc")));
    
        PCollection<String> dataCollectionWithoutDuplicates = dataCollection.apply(Distinct.create());
    
        PAssert.that(dataCollectionWithoutDuplicates).containsInAnyOrder("", "a", "ab", "abc");
        pipeline.run().waitUntilFinish();
      }
    
      @Test
      public void should_keep_only_one_pair() {
        Pipeline pipeline = BeamFunctions.createPipeline("Distinct with representative values for key-value pairs");
        PCollection<KV<Integer, String>> dataCollection = pipeline.apply(Create.of(Arrays.asList(
          KV.of(1, "a"), KV.of(2, "b"), KV.of(1, "a"), KV.of(10, "a")
        )));
    
        PCollection<KV<Integer, String>> distinctPairs = dataCollection.apply(
          Distinct.withRepresentativeValueFn(new SerializableFunction<KV<Integer, String>, String>() {
            @Override
            public String apply(KV<Integer, String> input) {
              return input.getValue();
            }
          }));
    
        PAssert.that(distinctPairs).containsInAnyOrder(KV.of(1, "a"), KV.of(2, "b"));
        pipeline.run().waitUntilFinish();
      }
      
  • filter - helps to remove not matching elements of PCollection. The filtering method can be activated with one of org.apache.beam.sdk.transforms.Filter factory methods: by (custom filter being an implementation of SerializableFunction), equal (equality filter), less/greater than or equal:
      @Test
      public void should_filter_empty_words() {
        Pipeline pipeline = BeamFunctions.createPipeline("Empty words filter");
        PCollection<String> dataCollection = pipeline.apply(Create.of(Arrays.asList("", "a", "", "", "ab",
          "ab", "abc")));
    
        PCollection<String> notEmptyWords =
          dataCollection.apply(Filter.by(Filters.NOT_EMPTY));
    
        PAssert.that(notEmptyWords).containsInAnyOrder(Arrays.asList("a", "ab", "ab", "abc"));
        pipeline.run().waitUntilFinish();
      }
    
      @Test
      public void should_take_only_a_letters()  {
        Pipeline pipeline = BeamFunctions.createPipeline("'a' letters filter");
        PCollection<String> dataCollection = pipeline.apply(Create.of(Arrays.asList("a", "b", "c", "a", "d", "a")));
    
        PCollection<String> aLetters = dataCollection.apply(Filter.equal("a"));
    
        PAssert.that(aLetters).containsInAnyOrder(Arrays.asList("a", "a", "a"));
        pipeline.run().waitUntilFinish();
      }
    
      @Test
      public void should_keep_numbers_greater_or_equal_to_2() {
        Pipeline pipeline = BeamFunctions.createPipeline("Numbers greater or equal to 2 filter");
        PCollection<Integer> dataCollection = pipeline.apply(Create.of(Arrays.asList(1, 2, 3)));
    
        PCollection<Integer> numbersGreaterOrEqual2 = dataCollection.apply(Filter.greaterThanEq(2));
    
        PAssert.that(numbersGreaterOrEqual2).containsInAnyOrder(Arrays.asList(2, 3));
        pipeline.run().waitUntilFinish();
      }
      
      enum Filters implements SerializableFunction<String, Boolean> {
        NOT_EMPTY {
          @Override
          public Boolean apply(String input) {
            return !input.isEmpty();
          }
        }
      }
      
  • flat map - it's a mapping function that converts input to one or more elements merged at the end into a single PCollection type:
      @Test
      public void should_flat_map_numbers() {
        Pipeline pipeline = BeamFunctions.createPipeline("FlatMap transformation");
        PCollection<Integer> numbers = pipeline.apply(Create.of(1, 10, 100));
    
        PCollection<Integer> flattenNumbers = numbers.apply(FlatMapElements.into(TypeDescriptors.integers())
          .via(new Multiplicator(2)));
    
        PAssert.that(flattenNumbers).containsInAnyOrder(1, 2, 10, 20, 100, 200);
        pipeline.run().waitUntilFinish();
      }
      
      class Multiplicator implements SerializableFunction<Integer, Collection<Integer>> {
    
        private int factor;
    
        public Multiplicator(int factor) {
          this.factor = factor;
        }
    
        @Override
        public Collection apply(Integer number) {
          return Arrays.asList(number, factor*number);
        }
      }
      
  • flatten - here the transformations takes an iterable input and transforms it to its flatten representation, i.e. instead of returning a collection of iterable items it returns a collection of simple items. As shown below it applies as well to collection classes as for special kind of chained PCollection (PCollectionList):
      @Test
      public void should_flatten_numbers() {
        Pipeline pipeline = BeamFunctions.createPipeline("Flatten transformation");
        PCollection<List<Integer>> numbersFromList = pipeline.apply(Create.of(
            Arrays.asList(1, 2, 3, 4), Arrays.asList(10, 11, 12, 13), Arrays.asList(20, 21, 22, 23)
        ));
    
        PCollection<Integer> flattenNumbers = numbersFromList.apply(Flatten.iterables());
    
        PAssert.that(flattenNumbers).containsInAnyOrder(1, 2, 3, 4, 10, 11, 12, 13, 20, 21, 22, 23);
        pipeline.run().waitUntilFinish();
      }
    
      @Test
      public void should_flatten_numbers_from_list_pcollection() {
        Pipeline pipeline = BeamFunctions.createPipeline("Flatten transformation");
        PCollection<Integer> numbers1 = pipeline.apply(Create.of(1, 2, 3, 4));
        PCollection<Integer> numbers2 = pipeline.apply(Create.of(10, 11, 12, 13));
        PCollection<Integer> numbers3 = pipeline.apply(Create.of(20, 21, 22, 23));
        PCollectionList<Integer> numbersList = PCollectionList.of(numbers1).and(numbers2).and(numbers3);
    
        PCollection<Integer> flattenNumbers = numbersList.apply(Flatten.pCollections());
    
        PAssert.that(flattenNumbers).containsInAnyOrder(1, 2, 3, 4, 10, 11, 12, 13, 20, 21, 22, 23);
        pipeline.run().waitUntilFinish();
      }
      
  • grouping - here all key-value pairs for the same key in one window are grouped together. It's an equivalent of shuffle operation in MapReduce. Thus if the elements of one key aren't located in the same node, all values are moved to a single place. As for distinct, the comparison is not made with Java's equals method but with encoded representation.
    A variation of classical grouping is the join represented by CoGroupByKey transformation. It simply groups the elements of different PCollections together, per key (= as SQL join):
      @Test
      public void should_group_orders_by_customer() {
        Pipeline pipeline = BeamFunctions.createPipeline("Group by key transformation");
        PCollection<KV<String, Integer>> customerOrders = pipeline.apply(Create.of(Arrays.asList(
          KV.of("C#1", 100), KV.of("C#2", 108), KV.of("C#3", 120), KV.of("C#1", 209), KV.of("C#1", 210),
          KV.of("C#1", 200), KV.of("C#2", 450))));
    
        PCollection<KV<String, Iterable<Integer>>> groupedOrders = customerOrders.apply(GroupByKey.create());
    
        PAssert.that(groupedOrders).satisfies(input -> {
          Map<String, List<Integer>> expected = new HashMap<>();
          expected.put("C#1", Arrays.asList(210, 200, 209, 100));
          expected.put("C#2", Arrays.asList(108, 450));
          expected.put("C#3", Arrays.asList(120));
          for (KV<String, Iterable<Integer>> keyValues : input) {
            List<Integer> expectedOrderAmounts = expected.get(keyValues.getKey());
            assertThat(keyValues.getValue()).containsOnlyElementsOf(expectedOrderAmounts);
          }
          return null;
        });
        pipeline.run().waitUntilFinish();
      }
    
      @Test
      public void should_join_the_elements_of_2_collections() {
        Pipeline pipeline = BeamFunctions.createPipeline("Group by key transformation");
        PCollection<KV<String, Integer>> elements1 = pipeline.apply(Create.of(
          KV.of("A", 1), KV.of("B", 10), KV.of("A", 5), KV.of("A", 3), KV.of("B", 11)
        ));
        PCollection<KV<String, Integer>> elements2 = pipeline.apply(Create.of(
          KV.of("A", 6), KV.of("B", 12), KV.of("A", 4), KV.of("A", 2), KV.of("C", 20)
        ));
        TupleTag<Integer> tupleTag1 = new TupleTag<>();
        TupleTag<Integer> tupleTag2 = new TupleTag<>();
    
        PCollection<KV<String, CoGbkResult>> coGroupedElements =
          KeyedPCollectionTuple.of(tupleTag1, elements1).and(tupleTag2, elements2).apply(CoGroupByKey.create());
    
    
        PAssert.that(coGroupedElements).satisfies(input -> {
          Map<String, List<Integer>> expected = new HashMap<>();
          expected.put("A", Arrays.asList(1, 2, 3, 4, 5, 6));
          expected.put("B", Arrays.asList(10, 11, 12));
          expected.put("C", Arrays.asList(20));
          for (KV<String, CoGbkResult> result : input) {
            Iterable<Integer> allFrom1 = result.getValue().getAll(tupleTag1);
            Iterable<Integer> allFrom2 = result.getValue().getAll(tupleTag2);
            Iterable<Integer> groupedValues = Iterables.concat(allFrom1, allFrom2);
            assertThat(groupedValues).containsOnlyElementsOf(expected.get(result.getKey()));
          }
          return null;
        });
        pipeline.run().waitUntilFinish();
      }
      
  • keys - as for values, except it returns the keys:
      @Test
      public void should_retrieve_keys_of_key_value_pairs() {
        Pipeline pipeline = BeamFunctions.createPipeline("Amounts keys");
        PCollection<KV<String, Integer>> customerOrders = pipeline.apply(Create.of(Arrays.asList(
          KV.of("C#1", 100), KV.of("C#2", 108), KV.of("C#3", 120), KV.of("C#1", 209), KV.of("C#1", 210),
          KV.of("C#1", 200), KV.of("C#2", 450))));
    
        PCollection<String> customerCodes = customerOrders.apply(Keys.create());
    
        PAssert.that(customerCodes).containsInAnyOrder("C#1", "C#1", "C#1", "C#1", "C#2", "C#2", "C#3");
        pipeline.run().waitUntilFinish();
      }
      
  • map - converts the input elements to output elements by keeping or not the type of elements. The mapping can be defined with org.apache.beam.sdk.transforms.MapElements transformation. It defines 2 useful methods: into (defines the type of output values) and via (SerializableFunction generating the output values):
      @Test
      public void should_map_words_into_their_length() {
        Pipeline pipeline = BeamFunctions.createPipeline("Mapping transformation");
        PCollection<String> dataCollection = pipeline.apply(Create.of(Arrays.asList("", "a", "", "", "ab",
          "ab", "abc")));
    
        PCollection<Integer> wordsLengths =
          dataCollection.apply(MapElements.into(TypeDescriptors.integers()).via(word -> word.length()));
    
        PAssert.that(wordsLengths).containsInAnyOrder(Arrays.asList(0, 0, 0, 1, 2, 2, 3));
        pipeline.run().waitUntilFinish();
      }
      
  • mean - this transformation is used to compute a mean of some values. As min/max transformations, the mean also offers 2 options: global or per key:
      @Test
      public void should_get_mean_value_of_numeric_values() {
        Pipeline pipeline = BeamFunctions.createPipeline("Mean value transformation");
        PCollection<Integer> numbersCollection = pipeline.apply(Create.of(Arrays.asList(1, 2, 3, 4, 5,
          6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)));
    
        PCollection<Double> meanValue = numbersCollection.apply(Mean.globally());
    
        PAssert.that(meanValue).containsInAnyOrder(10.5d);
        pipeline.run().waitUntilFinish();
      }
    
      @Test
      public void should_compute_mean_per_key() {
        Pipeline pipeline = BeamFunctions.createPipeline("Mean value transformation");
        PCollection<KV<String, Integer>> customerOrders = pipeline.apply(Create.of(Arrays.asList(
          KV.of("C#1", 100), KV.of("C#2", 108), KV.of("C#3", 120), KV.of("C#1", 209), KV.of("C#1", 210),
          KV.of("C#1", 200), KV.of("C#2", 450))));
    
        PCollection<KV<String, Double>> meanAmountPerCustomer = customerOrders.apply(Mean.perKey());
    
        PAssert.that(meanAmountPerCustomer).containsInAnyOrder(KV.of("C#1", 179.75d), KV.of("C#2", 279d),
          KV.of("C#3", 120d));
        pipeline.run().waitUntilFinish();
      }
      
  • min/max - these 2 transformations are very similar since they help to obtain minimal or maximal value in given dataset. Both of them have the same variants: global (min/max value in whole dataset), per key (min/max value per key) or custom (min/max value computed with the help of defined Comparator):
      @Test
      public void should_get_min_value() {
        Pipeline pipeline = BeamFunctions.createPipeline("Min value transformation");
        PCollection<Integer> numbersCollection = pipeline.apply(Create.of(Arrays.asList(1, 2, 3)));
    
        PCollection<Integer> minValue = numbersCollection.apply(Min.globally());
    
        PAssert.that(minValue).containsInAnyOrder(1);
        pipeline.run().waitUntilFinish();
      }
    
      @Test
      public void should_get_min_value_per_customer_key() {
        Pipeline pipeline = BeamFunctions.createPipeline("Min value transformation");
        PCollection<KV<String, Integer>> customerOrders = pipeline.apply(Create.of(Arrays.asList(
          KV.of("C#1", 100), KV.of("C#2", 108), KV.of("C#3", 120), KV.of("C#1", 209), KV.of("C#1", 210),
          KV.of("C#1", 200), KV.of("C#2", 450))));
    
        PCollection<KV<String, Integer>> minCustomersAmount = customerOrders.apply(Min.perKey());
    
        PAssert.that(minCustomersAmount).containsInAnyOrder(KV.of("C#1", 100), KV.of("C#2", 108), KV.of("C#3", 120));
        pipeline.run().waitUntilFinish();
      }
    
      @Test
      public void should_get_min_value_with_custom_comparator() {
        Pipeline pipeline = BeamFunctions.createPipeline("Min value with custom comparator transformation");
        PCollection<Integer> numbers = pipeline.apply(Create.of(Arrays.asList(1, 2, 3, 4, 5, 6)));
    
        PCollection<Integer> minAndEven = numbers.apply(Min.globally(Comparators.LOWER_AND_EVEN));
    
        PAssert.that(minAndEven).containsInAnyOrder(2);
        pipeline.run().waitUntilFinish();
      }
    
      @Test
      public void should_get_max_value() {
        Pipeline pipeline = BeamFunctions.createPipeline("Max value transformation");
        PCollection<Integer> numbersCollection = pipeline.apply(Create.of(Arrays.asList(1, 2, 3, 4, 5, 6)));
    
        PCollection<Integer> maxValue = numbersCollection.apply(Max.globally());
    
        PAssert.that(maxValue).containsInAnyOrder(6);
        pipeline.run().waitUntilFinish();
      }
    
      @Test
      public void should_get_max_value_per_customer_key() {
        Pipeline pipeline = BeamFunctions.createPipeline("Max value transformation");
        PCollection<KV<String, Integer>> customerOrders = pipeline.apply(Create.of(Arrays.asList(
          KV.of("C#1", 100), KV.of("C#2", 108), KV.of("C#3", 120), KV.of("C#1", 209), KV.of("C#1", 210),
          KV.of("C#1", 200), KV.of("C#2", 450))));
    
        PCollection<KV<String, Integer>> maxCustomerAmounts = customerOrders.apply(Max.perKey());
    
        PAssert.that(maxCustomerAmounts).containsInAnyOrder(KV.of("C#1", 210), KV.of("C#2", 450), KV.of("C#3", 120));
        pipeline.run().waitUntilFinish();
      }
    
      @Test
      public void should_get_max_value_with_custom_comparator() {
        Pipeline pipeline = BeamFunctions.createPipeline("Max value with custom comparator transformation");
        PCollection<Integer> numbers = pipeline.apply(Create.of(Arrays.asList(1, 2, 3, 4, 5, 6, 7)));
    
        PCollection<Integer> maxAndEven = numbers.apply(Max.globally(Comparators.BIGGER_AND_EVEN));
    
        PAssert.that(maxAndEven).containsInAnyOrder(6);
        pipeline.run().waitUntilFinish();
      }
      
      enum Comparators implements Comparator<Integer> {
        BIGGER_AND_EVEN {
          @Override
          public int compare(Integer comparedNumber, Integer toCompare) {
            boolean isEvenCompared = comparedNumber%2 == 0;
            boolean isEvenToCompare = toCompare%2 == 0;
            if (isEvenCompared && !isEvenToCompare) {
              return 1;
            } else if (!isEvenCompared) {
              return -1;
            } else {
              return comparedNumber > toCompare ? 1 : -1;
            }
          }
        },
        LOWER_AND_EVEN {
          @Override
          public int compare(Integer comparedNumber, Integer toCompare) {
            boolean isEvenCompared = comparedNumber%2 == 0;
            boolean isEvenToCompare = toCompare%2 == 0;
            if (isEvenCompared && !isEvenToCompare) {
              return -1;
            } else if (!isEvenCompared) {
              return 1;
            } else {
              return comparedNumber > toCompare ? 1 : -1;
            }
          }
        }
      }
      
  • partition - defines the function used to split the items of one PCollection to different partitions:
      @Test
      public void should_partition_numbers_to_10_equal_partitions() {
        Pipeline pipeline = BeamFunctions.createPipeline("Partitioning transformation");
        PCollection<Integer> numbersCollection = pipeline.apply(Create.of(
          IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList())));
    
        PCollectionList<Integer> repartitionedNumbers = numbersCollection.apply(Partition.of(4,
          (Partition.PartitionFn<Integer>) (element, numPartitions) -> element % numPartitions));
    
        PAssert.that(repartitionedNumbers.get(0)).containsInAnyOrder(4, 8, 12, 16, 20);
        PAssert.that(repartitionedNumbers.get(1)).containsInAnyOrder(1, 5, 9, 13, 17);
        PAssert.that(repartitionedNumbers.get(2)).containsInAnyOrder(2, 6, 10, 14, 18);
        PAssert.that(repartitionedNumbers.get(3)).containsInAnyOrder(3, 7, 11, 15, 19);
    
        pipeline.run().waitUntilFinish();
      }
      
  • RegEx - Beam introduces Regular Expressions to the group of transformations. Thanks to built-in RegEx transformations we can easily: split some text, replace all or only first characters in given input or extract some pattern from the data (find method). The transformations apply on single values but its variant, suffixed by *KV, is able convert extracted 2 values (RegEx groups) to key-value pairs:
      @Test
      public void should_find_1_occurrence_of_regex_expression() {
        Pipeline pipeline = BeamFunctions.createPipeline("RegEx find transformation");
        PCollection<String> dataCollection = pipeline.apply(Create.of(Arrays.asList("", "a", "", "", "ab",
          "ab", "abc")));
    
        PCollection<String> foundWords = dataCollection.apply(Regex.find("(ab)"));
    
        PAssert.that(foundWords).containsInAnyOrder("ab", "ab", "ab");
        pipeline.run().waitUntilFinish();
      }
    
      @Test
      public void should_find_1_occurrence_of_regex_expression_per_group() {
        Pipeline pipeline = BeamFunctions.createPipeline("RegEx find in group transformation");
        PCollection<String> dataCollection = pipeline.apply(Create.of(
          "aa ab c", "ab bb c", "ab cc d", "dada"
        ));
    
        PCollection<KV<String, String>> foundWords = dataCollection.apply(Regex.findKV("(ab) (c)", 1, 2));
    
        PAssert.that(foundWords).containsInAnyOrder(KV.of("ab", "c"), KV.of("ab", "c"));
        pipeline.run().waitUntilFinish();
      }
    
      @Test
      public void should_replace_all_a_by_1() {
        Pipeline pipeline = BeamFunctions.createPipeline("RegEx replaceAll transformation");
        PCollection<String> dataCollection = pipeline.apply(Create.of(Arrays.asList("aa", "aba", "baba")));
    
        PCollection<String> replacedWords = dataCollection.apply(Regex.replaceAll("a", "1"));
    
        PAssert.that(replacedWords).containsInAnyOrder("11", "1b1", "b1b1");
        pipeline.run().waitUntilFinish();
      }
    
      @Test
      public void should_replace_first_match() {
        Pipeline pipeline = BeamFunctions.createPipeline("RegEx replaceAll transformation");
        PCollection<String> dataCollection = pipeline.apply(Create.of(Arrays.asList("aa", "aaaa", "aba")));
    
        PCollection<String> replacedWords = dataCollection.apply(Regex.replaceFirst("a", "1"));
    
        PAssert.that(replacedWords).containsInAnyOrder("1a", "1aaa", "1ba");
        pipeline.run().waitUntilFinish();
      }
    
    
      @Test
      public void should_split_sentences_by_whitespace() {
        Pipeline pipeline = BeamFunctions.createPipeline("RegEx replaceAll transformation");
        PCollection<String> dataCollection = pipeline.apply(Create.of(Arrays.asList("aa bb cc", "aaaa aa cc", "aba bab")));
    
        PCollection<String> splittedWords = dataCollection.apply(Regex.split("\\s"));
    
        PAssert.that(splittedWords).containsInAnyOrder("aa", "bb", "cc", "aaaa", "aa", "cc", "aba", "bab");
        pipeline.run().waitUntilFinish();
      }
      
  • sample - randomly and uniformly selects n elements. The operation doesn't run in parallel and all sampled elements should fit into the memory of a single worker node:
      @Test
      public void should_sample_3_numbers() {
        Pipeline pipeline = BeamFunctions.createPipeline("Sample transformation");
        PCollection<Integer> numbersCollection = pipeline.apply(Create.of(IntStream.rangeClosed(1, 20)
          .boxed().collect(Collectors.toList())));
    
        PCollection<Iterable<Integer>> sampledNumbers = numbersCollection.apply(Sample.fixedSizeGlobally(3));
    
        PAssert.that(sampledNumbers).satisfies(input -> {
          Set<Integer> distinctNumbers = new HashSet<>();
          for (Iterable<Integer> ints : input) {
            for (int number : ints) {
              distinctNumbers.add(number);
            }
          }
          assertThat(distinctNumbers).hasSize(3);
          return null;
        });
        pipeline.run().waitUntilFinish();
      }
      
  • sum - as the name indicates, it computes the sum of all elements either globally or per key:
      @Test
      public void should_get_sum_value_of_numeric_values() {
        Pipeline pipeline = BeamFunctions.createPipeline("Sum value transformation");
        PCollection<Integer> numbersCollection = pipeline.apply(Create.of(Arrays.asList(1, 2, 3));
    
        PCollection<Integer> integersSum = numbersCollection.apply(Sum.integersGlobally());
    
        PAssert.that(integersSum).containsInAnyOrder(6);
        pipeline.run().waitUntilFinish();
      }
    
      @Test
      public void should_get_sum_value_per_key() {
        Pipeline pipeline = BeamFunctions.createPipeline("Sum value per key");
        PCollection<KV<String, Integer>> numbersCollection = pipeline.apply(Create.of(
                KV.of("A", 100), KV.of("A", 200), KV.of("B", 150), KV.of("A", 100)
        ));
    
        PCollection<KV<String, Integer>> keyedSum = numbersCollection.apply(Sum.integersPerKey());
    
        PAssert.that(keyedSum).containsInAnyOrder(KV.of("A", 400), KV.of("B", 150));
        pipeline.run().waitUntilFinish();
      }
      
  • timestamping - the elements included in PCollection are timestamped with the reading time. However, this value specified by Reader object can be overridden with org.apache.beam.sdk.transforms.WithTimestamps transformation:
      @Test
      public void should_apply_timestamp_to_input_elements() {
        Pipeline pipeline = BeamFunctions.createPipeline("With timestamp transformation");
        PCollection<String> dataCollection = pipeline.apply(Create.of(Arrays.asList("a", "b")));
    
        Instant timestampToApply = Instant.now().minus(2 * 60 * 1000);
        PCollection<String> itemsWithNewTimestamp = dataCollection
          .apply(WithTimestamps.of(input -> timestampToApply));
        PCollection<Long> elementsTimestamps = itemsWithNewTimestamp.apply(ParDo.of(new DoFn<String, Long>() {
          @ProcessElement
          public void processElement(ProcessContext processContext) {
            processContext.output(processContext.timestamp().getMillis());
          }
        }));
    
        PAssert.that(elementsTimestamps).containsInAnyOrder(timestampToApply.getMillis(), timestampToApply.getMillis());
        pipeline.run().waitUntilFinish();
      }
      
  • top - gets the n items from the head of the tail of PCollection. It allows to get these objects globally or per key. Both operations use comparators to sort the elements. Custom comparators are also allowed in this processing:
      @Test
      public void should_get_the_first_2_elements() {
        Pipeline pipeline = BeamFunctions.createPipeline("Top 2 transformation");
        PCollection<Integer> numbersCollection = pipeline.apply(Create.of(IntStream.rangeClosed(1, 20)
          .boxed().collect(Collectors.toList())));
    
        PCollection<List<Integer>> top2Items = numbersCollection.apply(Top.largest(2));
    
        PAssert.that(top2Items).containsInAnyOrder(Arrays.asList(20, 19));
        pipeline.run().waitUntilFinish();
      }
    
      @Test
      public void should_get_the_last_2_elements() {
        Pipeline pipeline = BeamFunctions.createPipeline("Top 2 reversed transformation");
        PCollection<Integer> numbersCollection = pipeline.apply(Create.of(IntStream.rangeClosed(1, 20)
          .boxed().collect(Collectors.toList())));
    
        PCollection<List<Integer>> top2Items = numbersCollection.apply(Top.smallest(2));
    
        PAssert.that(top2Items).containsInAnyOrder(Arrays.asList(1, 2));
        pipeline.run().waitUntilFinish();
      }
    
      @Test
      public void should_get_2_first_values_per_key() {
        Pipeline pipeline = BeamFunctions.createPipeline("Top 2 per key transformation");
        PCollection<KV<String, Integer>> customerOrders = pipeline.apply(Create.of(Arrays.asList(
          KV.of("C#1", 100), KV.of("C#2", 108), KV.of("C#3", 120), KV.of("C#1", 209), KV.of("C#1", 210),
          KV.of("C#1", 200), KV.of("C#2", 450))));
    
        PCollection<KV<String, List<Integer>>> top2Orders = customerOrders.apply(Top.largestPerKey(2));
    
        PAssert.that(top2Orders).containsInAnyOrder(KV.of("C#1", Arrays.asList(210, 209)),
          KV.of("C#2", Arrays.asList(450, 108)),  KV.of("C#3", Collections.singletonList(120))
        );
        pipeline.run().waitUntilFinish();
      }
    
      @Test
      public void should_get_the_first_value_with_custom_comparator() {
        Pipeline pipeline = BeamFunctions.createPipeline("Top 1 custom comparator");
        PCollection<KV<String, Integer>> customerOrders = pipeline.apply(Create.of(Arrays.asList(
          KV.of("C#1", 100), KV.of("C#2", 108), KV.of("C#3", 120), KV.of("C#1", 209), KV.of("C#1", 210),
          KV.of("C#1", 200), KV.of("C#2", 450))));
    
        PCollection<KV<String, List<Integer>>> topEvenAmounts =
          customerOrders.apply(Top.perKey(1, Comparators.BIGGER_AND_EVEN));
    
        PAssert.that(topEvenAmounts).containsInAnyOrder(KV.of("C#1", Arrays.asList(210)),
          KV.of("C#2", Arrays.asList(450)),  KV.of("C#3", Collections.singletonList(120))
        );
        pipeline.run().waitUntilFinish();
      }
      
  • values - used only for key-value PCollections, it gets the valuess of the collection's items:
      @Test
      public void should_retrieve_values_of_key_value_pairs() {
        Pipeline pipeline = BeamFunctions.createPipeline("Amounts values");
        PCollection<KV<String, Integer>> customerOrders = pipeline.apply(Create.of(Arrays.asList(
          KV.of("C#1", 100), KV.of("C#2", 108), KV.of("C#3", 120), KV.of("C#1", 209), KV.of("C#1", 210),
          KV.of("C#1", 200), KV.of("C#2", 450))));
    
        PCollection<Integer> amounts = customerOrders.apply(Values.create());
    
        PAssert.that(amounts).containsInAnyOrder(100, 108, 120, 209, 210, 200, 450);
        pipeline.run().waitUntilFinish();
      }
      

This post is the continuity of previous articles about Apache Beam. This time it presents what we can do with our distributed data. The first section presented the API of data processing. We could learn that Apache Beam defines universal manner of processing methods by exposing a single one method called apply. The second part shown however that the framework still has some built-in common transformations, as counting, min/max, mapping, filtering or grouping.

Read also about Data transformations in Apache Beam here: Where's my PCollection.map()? .

Share, like or comment this post on Twitter: