PCollection - data representation in Apache Beam

One of the problems with data processing frameworks released in the past few years was the use of different abstractions for batch and streaming tasks. Apache Beam is an exception to this rule because it proposes a uniform data representation called PCollection.

The first section covers the theoretical points about PCollection. The second one shows how to work with (create, manipulate) Beam's data abstraction in two contexts: batch and streaming.

PCollection explained

PCollection<T> is the data abstraction used in Apache Beam. Its type parameter corresponds to the type of the values stored inside. The whole class can be described through the following points:

  • immutable - the data stored in one PCollection can't be modified. If some transformation is applied to it, a new PCollection instance is created.
  • distributed - the PCollection represents data stored in a distributed dataset, i.e. the parts of the whole dataset reside on different processing nodes.
  • boundary(less) - PCollection is a data abstraction shared by stream and batch processing, which means that its size can be either bounded or unbounded.
  • windowed - every PCollection, independently of its boundaries (bounded/unbounded), has associated windows. In the case of batch data, the default associated window is called the global window. Besides it, other window types exist and they'll be covered in one of the next posts.
  • universal - one PCollection can be the input for many different transformations.
  • driven by the pipeline - Beam's Pipeline is a DAG composed of all defined transformations. A PCollection is the result of the transformations added to the execution Pipeline.
  • coder - every PCollection is associated with an object called Coder, responsible for encoding and decoding the PCollection's values into a byte stream.
  • timestamped - all elements of a PCollection have an associated timestamp representing the moment they were read. It's used later to determine whether the data arrived late, which matters especially in streaming processing.
  • physical or volatile - a PCollection can be built either on physical data (e.g. stored on disk) or on volatile data (e.g. stored in memory). In the first case we consider that the PCollection is read, while in the second that it's created.
  • value or key-value - a PCollection can represent either plain values or key-value pairs. The second representation enables per-key transformations, e.g. grouping or counting (see the sketch just after this list).
  • views - a PCollection can be transformed into a structure called a view. This structure can later be injected as a side input to processing methods. The notion of side input will be explained in more detail in further posts. For now we can simply consider it as an additional object used alongside the input PCollection in a transformation (similarly to broadcast variables in Spark); a short preview sketch follows below.
  • collections - multiple PCollections can be mixed. For instance, they can compose a structure called PCollectionList, which can be used as the input or output of certain operations, such as partitioning or flattening.
    The other collection-like structure is PCollectionTuple. It differs from PCollectionList in that it can handle heterogeneous types, for instance numbers, strings or collections. It's useful when a transformation takes or produces multiple PCollections of different types.
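
To illustrate the key-value point, here is a minimal sketch. It's not one of the original learning tests; it reuses the BeamFunctions helper from the examples below and Beam's KV values with the Count.perKey() transform:

Pipeline pipeline = BeamFunctions.createPipeline("Key-value PCollection");

// a PCollection of key-value pairs enables per-key transformations, here counting
PCollection<KV<String, Integer>> scores = pipeline.apply(Create.of(
  KV.of("a", 1), KV.of("a", 2), KV.of("b", 3)
));
PCollection<KV<String, Long>> countsPerKey = scores.apply(Count.perKey());

PAssert.that(countsPerKey).containsInAnyOrder(KV.of("a", 2L), KV.of("b", 1L));
pipeline.run().waitUntilFinish();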

The above points are sufficient for now. Further posts will focus on other important topics, such as side inputs or pipeline execution.
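
Still, as a quick preview of the views point, here is a minimal sketch (again not one of the original tests) that turns a PCollection into a view with Beam's View.asList() and reads it as a side input, similarly to a Spark broadcast variable:

Pipeline pipeline = BeamFunctions.createPipeline("PCollection as view");

// the first PCollection is transformed into a view, injectable as side input
PCollectionView<List<String>> allowedLettersView = pipeline
  .apply("Allowed letters", Create.of(Arrays.asList("a", "b")))
  .apply(View.asList());

PCollection<String> filtered = pipeline
  .apply("Letters", Create.of(Arrays.asList("a", "b", "c")))
  .apply(ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext context) {
      // the view is read through the side input registered below
      if (context.sideInput(allowedLettersView).contains(context.element())) {
        context.output(context.element());
      }
    }
  }).withSideInputs(allowedLettersView));

PAssert.that(filtered).containsInAnyOrder("a", "b");
pipeline.run().waitUntilFinish();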

PCollection examples

Some of the points listed above can be illustrated through the following learning tests:
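
For completeness, the tests (and the sketches above) assume more or less the following imports - a sketch, since the exact list depends on the Beam SDK version (here the Java SDK 2.x with JUnit 4, AssertJ, Commons IO and Joda-Time):

import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.commons.io.FileUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;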

private static final String FILE_1 = "/tmp/beam/1";
private static final String FILE_2 = "/tmp/beam/2";

@BeforeClass
public static void writeFiles() throws IOException {
  FileUtils.writeStringToFile(new File(FILE_1), "1\n2\n3\n4", "UTF-8");
  FileUtils.writeStringToFile(new File(FILE_2), "5\n6\n7\n8", "UTF-8");
}

@AfterClass
public static void deleteFiles() {
  FileUtils.deleteQuietly(new File(FILE_1));
  FileUtils.deleteQuietly(new File(FILE_2));
}

@Test
public void should_construct_pcollection_from_memory_objects() {
  List<String> letters = Arrays.asList("a", "b", "c", "d");
  Pipeline pipeline = BeamFunctions.createPipeline("Creating PCollection from memory");

  PCollection<String> lettersCollection = pipeline.apply(Create.of(letters));

  PAssert.that(lettersCollection).containsInAnyOrder("a", "b", "c", "d");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_construct_pcollection_without_applying_transformation_on_it() {
  Pipeline pipeline = BeamFunctions.createPipeline("Creating PCollection from file");

  TextIO.Read reader = TextIO.read().from("/tmp/beam/*");
  PCollection<String> readNumbers = pipeline.apply(reader);

  PAssert.that(readNumbers).containsInAnyOrder("1", "2", "3", "4", "5", "6", "7", "8");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_not_modify_input_pcollection_after_applying_the_transformation() {
  List<String> letters = Arrays.asList("a", "b", "c", "d");
  Pipeline pipeline = BeamFunctions.createPipeline("Transforming a PCollection");

  PCollection<String> lettersCollection = pipeline.apply(Create.of(letters));
  PCollection<String> aLetters = lettersCollection.apply(Filter.equal("a"));

  PAssert.that(lettersCollection).containsInAnyOrder("a", "b", "c", "d");
  PAssert.that(aLetters).containsInAnyOrder("a");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_use_one_pcollection_as_input_for_different_transformations() {
  List<String> letters = Arrays.asList("a", "b", "c", "d");
  Pipeline pipeline = BeamFunctions.createPipeline("Using one PCollection as input for different transformations");

  PCollection<String> lettersCollection = pipeline.apply(Create.of(letters));
  PCollection<String> aLetters = lettersCollection.apply(Filter.equal("a"));
  PCollection<String> notALetter = lettersCollection.apply(Filter.greaterThan("a"));

  PAssert.that(lettersCollection).containsInAnyOrder("a", "b", "c", "d");
  PAssert.that(aLetters).containsInAnyOrder("a");
  PAssert.that(notALetter).containsInAnyOrder("b", "c", "d");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_create_explicitly_timestamped_batch_pcollection_with_custom_window() {
  Pipeline pipeline = BeamFunctions.createPipeline("Creating timestamped collection");
  PCollection<Integer> timestampedNumbers = pipeline.apply(Create.timestamped(
    TimestampedValue.of(1, new Instant(1)),
    TimestampedValue.of(2, new Instant(2)),
    TimestampedValue.of(3, new Instant(3)),
    TimestampedValue.of(4, new Instant(4))
  ));

  PCollection<String> mappedResult = timestampedNumbers.apply(
    Window.into(FixedWindows.of(new Duration(1)))
  ).apply(ParDo.of(new DoFn<Integer, String>() {
    @ProcessElement
    public void processElement(ProcessContext processingContext, BoundedWindow window) {
      processingContext.output(processingContext.element() + "(" + processingContext.timestamp() + ")" +
        " window="+window
      );
    }
  }));

  PAssert.that(mappedResult).containsInAnyOrder(
    "1(1970-01-01T00:00:00.001Z) window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.002Z)",
    "2(1970-01-01T00:00:00.002Z) window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.003Z)",
    "3(1970-01-01T00:00:00.003Z) window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.004Z)",
    "4(1970-01-01T00:00:00.004Z) window=[1970-01-01T00:00:00.004Z..1970-01-01T00:00:00.005Z)");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_get_pcollection_coder() {
  List<String> letters = Arrays.asList("a", "b", "c", "d");
  Pipeline pipeline = BeamFunctions.createPipeline("PCollection coder");

  PCollection<String> lettersCollection = pipeline.apply(Create.of(letters));
  pipeline.run().waitUntilFinish();

  Coder<String> lettersCoder = lettersCollection.getCoder();
  assertThat(lettersCoder.getClass()).isEqualTo(StringUtf8Coder.class);
}

@Test
public void should_get_pcollection_metadata() {
  List<String> letters = Arrays.asList("a", "b", "c", "d");
  Pipeline pipeline = BeamFunctions.createPipeline("PCollection metadata");

  PCollection<String> lettersCollection = pipeline.apply("A-B-C-D letters", Create.of(letters));
  pipeline.run().waitUntilFinish();

  assertThat(lettersCollection.isBounded()).isEqualTo(PCollection.IsBounded.BOUNDED);
  WindowingStrategy<?, ?> windowingStrategy = lettersCollection.getWindowingStrategy();
  assertThat(windowingStrategy.getWindowFn().getClass()).isEqualTo(GlobalWindows.class);
  assertThat(lettersCollection.getName()).isEqualTo("A-B-C-D letters/Read(CreateSource).out");
}

@Test
public void should_create_pcollection_list() {
  Pipeline pipeline = BeamFunctions.createPipeline("PCollection list");
  PCollection<String> letters1 = pipeline.apply(Create.of(Arrays.asList("a", "b", "c")));
  PCollection<String> letters2 = pipeline.apply(Create.of(Arrays.asList("d", "e", "f")));
  PCollection<String> letters3 = pipeline.apply(Create.of(Arrays.asList("g", "h", "i")));

  PCollectionList<String> allLetters = PCollectionList.of(letters1).and(letters2).and(letters3);
  List<PCollection<String>> lettersCollections = allLetters.getAll();

  PAssert.that(lettersCollections.get(0)).containsInAnyOrder("a", "b", "c");
  PAssert.that(lettersCollections.get(1)).containsInAnyOrder("d", "e", "f");
  PAssert.that(lettersCollections.get(2)).containsInAnyOrder("g", "h", "i");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_create_pcollection_tuple() {
  Pipeline pipeline = BeamFunctions.createPipeline("PCollection tuple");
  PCollection<String> letters = pipeline.apply(Create.of(Arrays.asList("a", "b", "c")));
  PCollection<Integer> numbers = pipeline.apply(Create.of(Arrays.asList(1, 2, 3)));
  PCollection<Boolean> flags = pipeline.apply(Create.of(Arrays.asList(true, false, true)));
  TupleTag<String> lettersTag = new TupleTag<>();
  TupleTag<Integer> numbersTag = new TupleTag<>();
  TupleTag<Boolean> flagsTag = new TupleTag<>();

  PCollectionTuple mixedData = PCollectionTuple.of(lettersTag, letters).and(numbersTag, numbers).and(flagsTag, flags);
  Map<TupleTag<?>, PCollection<?>> tupleData = mixedData.getAll();

  PAssert.that((PCollection<String>)tupleData.get(lettersTag)).containsInAnyOrder("a", "b", "c");
  PAssert.that((PCollection<Integer>)tupleData.get(numbersTag)).containsInAnyOrder(1, 2, 3);
  PAssert.that((PCollection<Boolean>)tupleData.get(flagsTag)).containsInAnyOrder(true, false, true);
  pipeline.run().waitUntilFinish();
}

This post introduced the first element of the Apache Beam framework - the data representation called PCollection. The first section gave an insight into the most important characteristics of this object - immutability, distributed character and windowing. The second part showed these points in the code.
