PCollection - data representation in Apache Beam

One of the problems with the data processing frameworks released in the past few years was their use of different abstractions for batch and streaming tasks. Apache Beam is an exception to this rule because it proposes a uniform data representation called PCollection.


The first section covers the theoretical points about PCollection. The second one shows how to work with (create and manipulate) Beam's data abstraction in two contexts: batch and streaming.

PCollection explained

PCollection<T> is the data abstraction used in Apache Beam. Its type parameter corresponds to the type of the values stored inside. The class can be described by the following points:

- it's immutable: applying a transformation doesn't modify the input PCollection but produces a new one
- it's distributed: its elements can be spread across the workers of the execution environment, so it can represent arbitrarily large datasets
- it's bounded or unbounded, depending on whether the source is finite (batch) or infinite (streaming)
- every element has an associated timestamp and belongs to one or more windows, which matters mostly for streaming processing
- the stored values are serialized with a Coder<T> that defines how elements are encoded and decoded

The above points are sufficient for now. Further posts will focus on other important aspects, such as side inputs or pipeline execution.

PCollection examples

Some of the points listed above can be proven through the following learning tests:

private static final String FILE_1 = "/tmp/beam/1";
private static final String FILE_2 = "/tmp/beam/2";

@BeforeClass
public static void writeFiles() throws IOException {
  FileUtils.writeStringToFile(new File(FILE_1), "1\n2\n3\n4", "UTF-8");
  FileUtils.writeStringToFile(new File(FILE_2), "5\n6\n7\n8", "UTF-8");
}

@AfterClass
public static void deleteFiles() {
  FileUtils.deleteQuietly(new File(FILE_1));
  FileUtils.deleteQuietly(new File(FILE_2));
}

@Test
public void should_construct_pcollection_from_memory_objects() {
  List<String> letters = Arrays.asList("a", "b", "c", "d");
  Pipeline pipeline = BeamFunctions.createPipeline("Creating PCollection from memory");

  PCollection<String> lettersCollection = pipeline.apply(Create.of(letters));

  PAssert.that(lettersCollection).containsInAnyOrder("a", "b", "c", "d");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_construct_pcollection_without_applying_transformation_on_it() {
  Pipeline pipeline = BeamFunctions.createPipeline("Creating PCollection from file");

  TextIO.Read reader = TextIO.read().from("/tmp/beam/*");
  PCollection<String> readNumbers = pipeline.apply(reader);

  PAssert.that(readNumbers).containsInAnyOrder("1", "2", "3", "4", "5", "6", "7", "8");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_not_modify_input_pcollection_after_applying_the_transformation() {
  List<String> letters = Arrays.asList("a", "b", "c", "d");
  Pipeline pipeline = BeamFunctions.createPipeline("Transforming a PCollection");

  PCollection<String> lettersCollection = pipeline.apply(Create.of(letters));
  PCollection<String> aLetters = lettersCollection.apply(Filter.equal("a"));

  PAssert.that(lettersCollection).containsInAnyOrder("a", "b", "c", "d");
  PAssert.that(aLetters).containsInAnyOrder("a");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_use_one_pcollection_as_input_for_different_transformations() {
  List<String> letters = Arrays.asList("a", "b", "c", "d");
  Pipeline pipeline = BeamFunctions.createPipeline("Using one PCollection as input for different transformations");

  PCollection<String> lettersCollection = pipeline.apply(Create.of(letters));
  PCollection<String> aLetters = lettersCollection.apply(Filter.equal("a"));
  PCollection<String> notALetter = lettersCollection.apply(Filter.greaterThan("a"));

  PAssert.that(lettersCollection).containsInAnyOrder("a", "b", "c", "d");
  PAssert.that(aLetters).containsInAnyOrder("a");
  PAssert.that(notALetter).containsInAnyOrder("b", "c", "d");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_create_explicitly_timestamped_batch_pcollection_with_custom_window() {
  Pipeline pipeline = BeamFunctions.createPipeline("Creating timestamped collection");
  PCollection<Integer> timestampedNumbers = pipeline.apply(Create.timestamped(
    TimestampedValue.of(1, new Instant(1)),
    TimestampedValue.of(2, new Instant(2)),
    TimestampedValue.of(3, new Instant(3)),
    TimestampedValue.of(4, new Instant(4))
  ));

  PCollection<String> mappedResult = timestampedNumbers.apply(
    Window.into(FixedWindows.of(new Duration(1)))
  ).apply(ParDo.of(new DoFn<Integer, String>() {
    @ProcessElement
    public void processElement(ProcessContext processingContext, BoundedWindow window) {
      processingContext.output(processingContext.element() + "(" + processingContext.timestamp() + ")" +
        " window="+window
      );
    }
  }));

  PAssert.that(mappedResult).containsInAnyOrder(
    "1(1970-01-01T00:00:00.001Z) window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.002Z)",
    "2(1970-01-01T00:00:00.002Z) window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.003Z)",
    "3(1970-01-01T00:00:00.003Z) window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.004Z)",
    "4(1970-01-01T00:00:00.004Z) window=[1970-01-01T00:00:00.004Z..1970-01-01T00:00:00.005Z)");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_get_pcollection_coder() {
  List<String> letters = Arrays.asList("a", "b", "c", "d");
  Pipeline pipeline = BeamFunctions.createPipeline("PCollection coder");

  PCollection<String> lettersCollection = pipeline.apply(Create.of(letters));
  pipeline.run().waitUntilFinish();

  Coder<String> lettersCoder = lettersCollection.getCoder();
  assertThat(lettersCoder.getClass()).isEqualTo(StringUtf8Coder.class);
}

@Test
public void should_get_pcollection_metadata() {
  List<String> letters = Arrays.asList("a", "b", "c", "d");
  Pipeline pipeline = BeamFunctions.createPipeline("PCollection metadata");

  PCollection<String> lettersCollection = pipeline.apply("A-B-C-D letters", Create.of(letters));
  pipeline.run().waitUntilFinish();

  assertThat(lettersCollection.isBounded()).isEqualTo(PCollection.IsBounded.BOUNDED);
  WindowingStrategy<?, ?> windowingStrategy = lettersCollection.getWindowingStrategy();
  assertThat(windowingStrategy.getWindowFn().getClass()).isEqualTo(GlobalWindows.class);
  assertThat(lettersCollection.getName()).isEqualTo("A-B-C-D letters/Read(CreateSource).out");
}
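The previous test shows a bounded PCollection. For the streaming case, a source without an end produces an unbounded one. The sketch below (the class name is only illustrative) uses the GenerateSequence source without an upper bound; note that the pipeline is never run, since an unbounded pipeline on the direct runner wouldn't terminate:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class UnboundedPCollectionSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

    // Without .to(...) GenerateSequence emits numbers forever, i.e. behaves
    // as a streaming source
    PCollection<Long> endlessNumbers = pipeline.apply(GenerateSequence.from(0));

    // The metadata alone reveals the unbounded character of the dataset
    System.out.println(endlessNumbers.isBounded()); // prints UNBOUNDED
  }
}
```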


@Test
public void should_create_pcollection_list() {
  Pipeline pipeline = BeamFunctions.createPipeline("PCollection list");
  PCollection<String> letters1 = pipeline.apply(Create.of(Arrays.asList("a", "b", "c")));
  PCollection<String> letters2 = pipeline.apply(Create.of(Arrays.asList("d", "e", "f")));
  PCollection<String> letters3 = pipeline.apply(Create.of(Arrays.asList("g", "h", "i")));


  PCollectionList<String> allLetters = PCollectionList.of(letters1).and(letters2).and(letters3);
  List<PCollection<String>> lettersCollections = allLetters.getAll();

  PAssert.that(lettersCollections.get(0)).containsInAnyOrder("a", "b", "c");
  PAssert.that(lettersCollections.get(1)).containsInAnyOrder("d", "e", "f");
  PAssert.that(lettersCollections.get(2)).containsInAnyOrder("g", "h", "i");
  pipeline.run().waitUntilFinish();
}
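A PCollectionList is mostly a grouping structure. To merge its datasets into a single PCollection, Beam provides the Flatten transform. A minimal sketch (the class name is illustrative and the direct runner is assumed to be on the classpath):

```java
import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class FlattenSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<String> letters1 = pipeline.apply(Create.of(Arrays.asList("a", "b")));
    PCollection<String> letters2 = pipeline.apply(Create.of(Arrays.asList("c", "d")));

    PCollectionList<String> allLetters = PCollectionList.of(letters1).and(letters2);
    System.out.println(allLetters.size()); // prints 2

    // Flatten merges every dataset of the list into a new, single PCollection
    PCollection<String> merged = allLetters.apply(Flatten.pCollections());
    PAssert.that(merged).containsInAnyOrder("a", "b", "c", "d");
    pipeline.run().waitUntilFinish();
  }
}
```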

@Test
public void should_create_pcollection_tuple() {
  Pipeline pipeline = BeamFunctions.createPipeline("PCollection tuple");
  PCollection<String> letters = pipeline.apply(Create.of(Arrays.asList("a", "b", "c")));
  PCollection<Integer> numbers = pipeline.apply(Create.of(Arrays.asList(1, 2, 3)));
  PCollection<Boolean> flags = pipeline.apply(Create.of(Arrays.asList(true, false, true)));
  TupleTag<String> lettersTag = new TupleTag<>();
  TupleTag<Integer> numbersTag = new TupleTag<>();
  TupleTag<Boolean> flagsTag = new TupleTag<>();

  PCollectionTuple mixedData = PCollectionTuple.of(lettersTag, letters).and(numbersTag, numbers).and(flagsTag, flags);
  Map<TupleTag<?>, PCollection<?>> tupleData = mixedData.getAll();

  PAssert.that((PCollection<String>)tupleData.get(lettersTag)).containsInAnyOrder("a", "b", "c");
  PAssert.that((PCollection<Integer>)tupleData.get(numbersTag)).containsInAnyOrder(1, 2, 3);
  PAssert.that((PCollection<Boolean>)tupleData.get(flagsTag)).containsInAnyOrder(true, false, true);
  pipeline.run().waitUntilFinish();
}

This post introduced the first element of the Apache Beam framework - the data representation called PCollection. The first section gave an insight into the most important properties of this object: immutability, distributed character and windowing. The second part showed these points in the code.
