PCollection - data representation in Apache Beam

Versions: Apache Beam 2.2.0 https://github.com/bartosz25/beam-learning

One of the problems with data processing frameworks released in the past few years was the use of different abstractions for batch and streaming tasks. Apache Beam is an exception to this rule because it proposes a uniform data representation called PCollection.

The first section covers the theory behind PCollection. The second one shows how to create and manipulate Beam's data abstraction in 2 contexts: batch and streaming.

PCollection explained

PCollection<T> is the data abstraction used in Apache Beam. Its type parameter corresponds to the type of the values stored inside. The whole class can be described in the following points:

The points above are sufficient for now. Further posts will focus on other important topics, such as side inputs and pipeline execution.
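Immutability is probably the easiest property to picture before reading the tests: applying a transformation never changes the input collection, it produces a new one, so the same input can feed several transformations. The snippet below is only a plain-Java analogy of that idea, not Beam code. The class `ImmutableInputSketch` and its `filter` helper are hypothetical names, and `java.util.stream` stands in for Beam's `Filter` transform.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class ImmutableInputSketch {

    // Hypothetical helper: builds a new read-only list and leaves the input untouched,
    // the way a Beam transform returns a new PCollection instead of mutating its input.
    static List<String> filter(List<String> input, Predicate<String> predicate) {
        return Collections.unmodifiableList(
            input.stream().filter(predicate).collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<String> letters = Collections.unmodifiableList(Arrays.asList("a", "b", "c", "d"));

        // The same input feeds two independent "transformations".
        List<String> aLetters = filter(letters, letter -> letter.equals("a"));
        // Natural String ordering, comparable to what Filter.greaterThan("a") relies on.
        List<String> notALetters = filter(letters, letter -> letter.compareTo("a") > 0);

        System.out.println("input:       " + letters);
        System.out.println("equal to a:  " + aLetters);
        System.out.println("greater a:   " + notALetters);
    }
}
```

The analogy stops at immutability: a real PCollection is also distributed and windowed, which a local list cannot show.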

PCollection examples

Some of the points listed above can be proven through the learning tests:

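// Imports needed by the tests below. BeamFunctions is a helper class from the
// repository linked above; assertThat is assumed to come from AssertJ.
import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.commons.io.FileUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

private static final String FILE_1 = "/tmp/beam/1";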
private static final String FILE_2 = "/tmp/beam/2";

@BeforeClass
public static void writeFiles() throws IOException {
  FileUtils.writeStringToFile(new File(FILE_1), "1\n2\n3\n4", "UTF-8");
  FileUtils.writeStringToFile(new File(FILE_2), "5\n6\n7\n8", "UTF-8");
}

@AfterClass
public static void deleteFiles() {
  FileUtils.deleteQuietly(new File(FILE_1));
  FileUtils.deleteQuietly(new File(FILE_2));
}

@Test
public void should_construct_pcollection_from_memory_objects() {
  List<String> letters = Arrays.asList("a", "b", "c", "d");
  Pipeline pipeline = BeamFunctions.createPipeline("Creating PCollection from memory");

  PCollection<String> lettersCollection = pipeline.apply(Create.of(letters));

  PAssert.that(lettersCollection).containsInAnyOrder("a", "b", "c", "d");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_construct_pcollection_without_applying_transformation_on_it() {
  Pipeline pipeline = BeamFunctions.createPipeline("Creating PCollection from file");

  TextIO.Read reader = TextIO.read().from("/tmp/beam/*");
  PCollection<String> readNumbers = pipeline.apply(reader);

  PAssert.that(readNumbers).containsInAnyOrder("1", "2", "3", "4", "5", "6", "7", "8");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_not_modify_input_pcollection_after_applying_the_transformation() {
  List<String> letters = Arrays.asList("a", "b", "c", "d");
  Pipeline pipeline = BeamFunctions.createPipeline("Transforming a PCollection");

  PCollection<String> lettersCollection = pipeline.apply(Create.of(letters));
  PCollection<String> aLetters = lettersCollection.apply(Filter.equal("a"));

  PAssert.that(lettersCollection).containsInAnyOrder("a", "b", "c", "d");
  PAssert.that(aLetters).containsInAnyOrder("a");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_use_one_pcollection_as_input_for_different_transformations() {
  List<String> letters = Arrays.asList("a", "b", "c", "d");
  Pipeline pipeline = BeamFunctions.createPipeline("Using one PCollection as input for different transformations");

  PCollection<String> lettersCollection = pipeline.apply(Create.of(letters));
  PCollection<String> aLetters = lettersCollection.apply(Filter.equal("a"));
  PCollection<String> notALetter = lettersCollection.apply(Filter.greaterThan("a"));

  PAssert.that(lettersCollection).containsInAnyOrder("a", "b", "c", "d");
  PAssert.that(aLetters).containsInAnyOrder("a");
  PAssert.that(notALetter).containsInAnyOrder("b", "c", "d");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_create_explicitly_timestamped_batch_pcollection_with_custom_window() {
  Pipeline pipeline = BeamFunctions.createPipeline("Creating timestamped collection");
  PCollection<Integer> timestampedNumbers = pipeline.apply(Create.timestamped(
    TimestampedValue.of(1, new Instant(1)),
    TimestampedValue.of(2, new Instant(2)),
    TimestampedValue.of(3, new Instant(3)),
    TimestampedValue.of(4, new Instant(4))
  ));

  PCollection<String> mappedResult = timestampedNumbers.apply(
    Window.into(FixedWindows.of(new Duration(1)))
  ).apply(ParDo.of(new DoFn<Integer, String>() {
    @ProcessElement
    public void processElement(ProcessContext processingContext, BoundedWindow window) {
      processingContext.output(processingContext.element() + "(" + processingContext.timestamp() + ")" +
        " window="+window
      );
    }
  }));

  PAssert.that(mappedResult).containsInAnyOrder(
    "1(1970-01-01T00:00:00.001Z) window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.002Z)",
    "2(1970-01-01T00:00:00.002Z) window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.003Z)",
    "3(1970-01-01T00:00:00.003Z) window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.004Z)",
    "4(1970-01-01T00:00:00.004Z) window=[1970-01-01T00:00:00.004Z..1970-01-01T00:00:00.005Z)");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_get_pcollection_coder() {
  List<String> letters = Arrays.asList("a", "b", "c", "d");
  Pipeline pipeline = BeamFunctions.createPipeline("PCollection coder");

  PCollection<String> lettersCollection = pipeline.apply(Create.of(letters));
  pipeline.run().waitUntilFinish();

  Coder<String> lettersCoder = lettersCollection.getCoder();
  assertThat(lettersCoder.getClass()).isEqualTo(StringUtf8Coder.class);
}

@Test
public void should_get_pcollection_metadata() {
  List<String> letters = Arrays.asList("a", "b", "c", "d");
  Pipeline pipeline = BeamFunctions.createPipeline("PCollection metadata");

  PCollection<String> lettersCollection = pipeline.apply("A-B-C-D letters", Create.of(letters));
  pipeline.run().waitUntilFinish();

  assertThat(lettersCollection.isBounded()).isEqualTo(PCollection.IsBounded.BOUNDED);
  WindowingStrategy<?, ?> windowingStrategy = lettersCollection.getWindowingStrategy();
  assertThat(windowingStrategy.getWindowFn().getClass()).isEqualTo(GlobalWindows.class);
  assertThat(lettersCollection.getName()).isEqualTo("A-B-C-D letters/Read(CreateSource).out");
}

@Test
public void should_create_pcollection_list() {
  Pipeline pipeline = BeamFunctions.createPipeline("PCollection list");
  PCollection<String> letters1 = pipeline.apply(Create.of(Arrays.asList("a", "b", "c")));
  PCollection<String> letters2 = pipeline.apply(Create.of(Arrays.asList("d", "e", "f")));
  PCollection<String> letters3 = pipeline.apply(Create.of(Arrays.asList("g", "h", "i")));

  PCollectionList<String> allLetters = PCollectionList.of(letters1).and(letters2).and(letters3);
  List<PCollection<String>> lettersCollections = allLetters.getAll();

  PAssert.that(lettersCollections.get(0)).containsInAnyOrder("a", "b", "c");
  PAssert.that(lettersCollections.get(1)).containsInAnyOrder("d", "e", "f");
  PAssert.that(lettersCollections.get(2)).containsInAnyOrder("g", "h", "i");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_create_pcollection_tuple() {
  Pipeline pipeline = BeamFunctions.createPipeline("PCollection tuple");
  PCollection<String> letters = pipeline.apply(Create.of(Arrays.asList("a", "b", "c")));
  PCollection<Integer> numbers = pipeline.apply(Create.of(Arrays.asList(1, 2, 3)));
  PCollection<Boolean> flags = pipeline.apply(Create.of(Arrays.asList(true, false, true)));
  TupleTag<String> lettersTag = new TupleTag<>();
  TupleTag<Integer> numbersTag = new TupleTag<>();
  TupleTag<Boolean> flagsTag = new TupleTag<>();

  PCollectionTuple mixedData = PCollectionTuple.of(lettersTag, letters).and(numbersTag, numbers).and(flagsTag, flags);
  Map<TupleTag<?>, PCollection<?>> tupleData = mixedData.getAll();

  PAssert.that((PCollection<String>)tupleData.get(lettersTag)).containsInAnyOrder("a", "b", "c");
  PAssert.that((PCollection<Integer>)tupleData.get(numbersTag)).containsInAnyOrder(1, 2, 3);
  PAssert.that((PCollection<Boolean>)tupleData.get(flagsTag)).containsInAnyOrder(true, false, true);
  pipeline.run().waitUntilFinish();
}
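The expected strings in should_create_explicitly_timestamped_batch_pcollection_with_custom_window follow from how fixed windows are laid out: with a window size of 1 ms, every millisecond timestamp falls into its own window. The sketch below reproduces that arithmetic in plain Java, assuming windows aligned to the epoch (offset 0) and non-negative timestamps. `FixedWindowSketch` and its methods are hypothetical names, and `java.time.Instant` replaces Joda-Time here, so the formatting may differ from Beam's output for timestamps without a millisecond part.

```java
import java.time.Instant;

final class FixedWindowSketch {

    // Inclusive start of the fixed window containing the timestamp,
    // assuming windows are aligned to the epoch (offset 0).
    static long windowStartMillis(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Formats an element the same way as the DoFn in the test:
    // value(timestamp) window=[start..end)
    static String describe(int value, long timestampMillis, long sizeMillis) {
        long start = windowStartMillis(timestampMillis, sizeMillis);
        long end = start + sizeMillis; // the end bound is exclusive
        return value + "(" + Instant.ofEpochMilli(timestampMillis) + ")"
            + " window=[" + Instant.ofEpochMilli(start) + ".." + Instant.ofEpochMilli(end) + ")";
    }

    public static void main(String[] args) {
        // Same data as the test: value n carries the timestamp n ms after the epoch.
        for (int value = 1; value <= 4; value++) {
            System.out.println(describe(value, value, 1L));
        }
    }
}
```

With a 2 ms window instead, the timestamps 2 and 3 would share the window starting at 2 ms, which is why the 1 ms size in the test isolates every element.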

This post introduced the first element of the Apache Beam framework: the data representation called PCollection. The first section gave an insight into the most important properties of this object: immutability, distributed character and windowing. The second part showed these properties in code.
