https://github.com/bartosz25/beam-learning
One of the problems with data processing frameworks released in the past few years was the use of different abstractions for batch and streaming tasks. Apache Beam is an exception to this rule because it proposes a uniform data representation called PCollection.
In the first section we'll see the theoretical points about PCollection. The second one will show how to work with (create, manipulate) Beam's data abstraction in two contexts: batch and streaming.
PCollection explained
PCollection<T> is the data abstraction used in Apache Beam. Its type parameter corresponds to the type of the values stored inside. The whole class can be described by the following points:
- immutable - the data stored in one PCollection can't be modified. If some transformation is applied to it, a new PCollection instance is created.
- distributed - a PCollection represents data stored in a distributed dataset, i.e. parts of the whole data reside on different processing nodes.
- boundary(less) - PCollection is a data abstraction shared by stream and batch processing, which means that its size can be either bounded or unbounded.
- windowed - every PCollection, independently of its boundaries (bounded/unbounded), has associated windows. In the case of batch data, the default associated window is called the global window. Besides it, other window types exist and they'll be covered in one of the next posts.
- universal - one PCollection can be an input for a lot of different transformations.
- driven by the pipeline - Beam's Pipeline is a DAG composed of all defined transformations. A PCollection is the result of transformations added to the execution Pipeline.
- coder - every PCollection is associated with an object called Coder, responsible for encoding and decoding its values to and from a byte stream.
- timestamped - all elements of a PCollection have an associated timestamp representing the reading time. It's used later to evaluate whether the data arrives late, especially in streaming processing.
- physical or volatile - a PCollection can be built either on physical data (e.g. stored on disk) or on volatile data (e.g. stored in memory). In the first case we consider that the PCollection is read, while in the second that it's created.
- value or key-value - a PCollection can represent either plain values or key-value pairs. The second representation enables per-key transformations, e.g. grouping or counting.
- views - a PCollection can be transformed into a structure called a view. This structure can later be injected as a side input into processing methods. The notion of side input will be explained in more detail in further posts. For now we can simply consider it as an additional object used alongside the input PCollection in a transformation (similarly to broadcast variables in Spark).
- collections - multiple PCollections can be combined. For instance, they can compose a structure called PCollectionList, which can be used as the input or output of certain operations, such as partitioning or flattening.
The other collection-like structure is PCollectionTuple. It differs from PCollectionList in that it can handle heterogeneous types, for instance numbers, strings or collections. It's useful when a transformation takes or produces multiple PCollections of different types.
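To make the value vs key-value distinction concrete, here is a minimal sketch of a key-value PCollection and a per-key transformation. It assumes the direct runner is on the classpath; the class name and the choice of Count.perKey() (one of several per-key transforms, next to GroupByKey or Combine.perKey()) are illustrative:

```java
import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class KeyValueSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

    // A key-value PCollection built from in-memory pairs
    PCollection<KV<String, Integer>> scores = pipeline.apply(Create.of(Arrays.asList(
        KV.of("a", 1), KV.of("b", 2), KV.of("a", 3))));

    // Count.perKey() is only defined for key-value PCollections:
    // it emits one KV per distinct key with the number of its occurrences
    PCollection<KV<String, Long>> countsPerKey = scores.apply(Count.perKey());

    pipeline.run().waitUntilFinish();
  }
}
```

A plain-value PCollection<Integer> would not accept Count.perKey(); only the KV-typed one exposes the per-key semantics.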
The above points are sufficient for now. Further posts will focus on other important topics, such as side inputs or pipeline execution.
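Since side inputs came up in the views point, here is a minimal, hedged sketch of turning a PCollection into a PCollectionView and injecting it into a ParDo. The filtering logic and all names are illustrative, and the direct runner is assumed:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class SideInputSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

    // The PCollection converted to a view; it's materialized and made
    // available to every worker that reads it as a side input
    PCollection<String> excludedLetters = pipeline.apply(Create.of(Arrays.asList("a", "b")));
    PCollectionView<List<String>> excludedLettersView = excludedLetters.apply(View.asList());

    PCollection<String> letters = pipeline.apply(Create.of(Arrays.asList("a", "b", "c", "d")));
    PCollection<String> keptLetters = letters.apply(
        ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext context) {
            // The side input is read through the processing context
            List<String> excluded = context.sideInput(excludedLettersView);
            if (!excluded.contains(context.element())) {
              context.output(context.element());
            }
          }
        }).withSideInputs(excludedLettersView));

    pipeline.run().waitUntilFinish();
  }
}
```

The view plays the same role as a broadcast variable in Spark: a small, read-only dataset distributed alongside the main input.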
PCollection examples
Some of the points listed above can be proven through the following learning tests:
private static final String FILE_1 = "/tmp/beam/1";
private static final String FILE_2 = "/tmp/beam/2";

@BeforeClass
public static void writeFiles() throws IOException {
  FileUtils.writeStringToFile(new File(FILE_1), "1\n2\n3\n4", "UTF-8");
  FileUtils.writeStringToFile(new File(FILE_2), "5\n6\n7\n8", "UTF-8");
}

@AfterClass
public static void deleteFiles() {
  FileUtils.deleteQuietly(new File(FILE_1));
  FileUtils.deleteQuietly(new File(FILE_2));
}

@Test
public void should_construct_pcollection_from_memory_objects() {
  List<String> letters = Arrays.asList("a", "b", "c", "d");
  Pipeline pipeline = BeamFunctions.createPipeline("Creating PCollection from memory");
  PCollection<String> lettersCollection = pipeline.apply(Create.of(letters));
  PAssert.that(lettersCollection).containsInAnyOrder("a", "b", "c", "d");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_construct_pcollection_without_applying_transformation_on_it() {
  Pipeline pipeline = BeamFunctions.createPipeline("Creating PCollection from file");
  TextIO.Read reader = TextIO.read().from("/tmp/beam/*");
  PCollection<String> readNumbers = pipeline.apply(reader);
  PAssert.that(readNumbers).containsInAnyOrder("1", "2", "3", "4", "5", "6", "7", "8");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_not_modify_input_pcollection_after_applying_the_transformation() {
  List<String> letters = Arrays.asList("a", "b", "c", "d");
  Pipeline pipeline = BeamFunctions.createPipeline("Transforming a PCollection");
  PCollection<String> lettersCollection = pipeline.apply(Create.of(letters));
  PCollection<String> aLetters = lettersCollection.apply(Filter.equal("a"));
  PAssert.that(lettersCollection).containsInAnyOrder("a", "b", "c", "d");
  PAssert.that(aLetters).containsInAnyOrder("a");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_use_one_pcollection_as_input_for_different_transformations() {
  List<String> letters = Arrays.asList("a", "b", "c", "d");
  Pipeline pipeline = BeamFunctions.createPipeline("Using one PCollection as input for different transformations");
  PCollection<String> lettersCollection = pipeline.apply(Create.of(letters));
  PCollection<String> aLetters = lettersCollection.apply(Filter.equal("a"));
  PCollection<String> notALetter = lettersCollection.apply(Filter.greaterThan("a"));
  PAssert.that(lettersCollection).containsInAnyOrder("a", "b", "c", "d");
  PAssert.that(aLetters).containsInAnyOrder("a");
  PAssert.that(notALetter).containsInAnyOrder("b", "c", "d");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_create_explicitly_timestamped_batch_pcollection_with_custom_window() {
  Pipeline pipeline = BeamFunctions.createPipeline("Creating timestamped collection");
  PCollection<Integer> timestampedNumbers = pipeline.apply(Create.timestamped(
      TimestampedValue.of(1, new Instant(1)),
      TimestampedValue.of(2, new Instant(2)),
      TimestampedValue.of(3, new Instant(3)),
      TimestampedValue.of(4, new Instant(4))
  ));
  PCollection<String> mappedResult = timestampedNumbers.apply(
      Window.into(FixedWindows.of(new Duration(1)))
  ).apply(ParDo.of(new DoFn<Integer, String>() {
    @ProcessElement
    public void processElement(ProcessContext processingContext, BoundedWindow window) {
      processingContext.output(processingContext.element() + "(" + processingContext.timestamp() + ")"
          + " window=" + window);
    }
  }));
  PAssert.that(mappedResult).containsInAnyOrder(
      "1(1970-01-01T00:00:00.001Z) window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.002Z)",
      "2(1970-01-01T00:00:00.002Z) window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.003Z)",
      "3(1970-01-01T00:00:00.003Z) window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.004Z)",
      "4(1970-01-01T00:00:00.004Z) window=[1970-01-01T00:00:00.004Z..1970-01-01T00:00:00.005Z)");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_get_pcollection_coder() {
  List<String> letters = Arrays.asList("a", "b", "c", "d");
  Pipeline pipeline = BeamFunctions.createPipeline("PCollection coder");
  PCollection<String> lettersCollection = pipeline.apply(Create.of(letters));
  pipeline.run().waitUntilFinish();
  Coder<String> lettersCoder = lettersCollection.getCoder();
  assertThat(lettersCoder.getClass()).isEqualTo(StringUtf8Coder.class);
}

@Test
public void should_get_pcollection_metadata() {
  List<String> letters = Arrays.asList("a", "b", "c", "d");
  Pipeline pipeline = BeamFunctions.createPipeline("PCollection metadata");
  PCollection<String> lettersCollection = pipeline.apply("A-B-C-D letters", Create.of(letters));
  pipeline.run().waitUntilFinish();
  assertThat(lettersCollection.isBounded()).isEqualTo(PCollection.IsBounded.BOUNDED);
  WindowingStrategy<?, ?> windowingStrategy = lettersCollection.getWindowingStrategy();
  assertThat(windowingStrategy.getWindowFn().getClass()).isEqualTo(GlobalWindows.class);
  assertThat(lettersCollection.getName()).isEqualTo("A-B-C-D letters/Read(CreateSource).out");
}

@Test
public void should_create_pcollection_list() {
  Pipeline pipeline = BeamFunctions.createPipeline("PCollection list");
  PCollection<String> letters1 = pipeline.apply(Create.of(Arrays.asList("a", "b", "c")));
  PCollection<String> letters2 = pipeline.apply(Create.of(Arrays.asList("d", "e", "f")));
  PCollection<String> letters3 = pipeline.apply(Create.of(Arrays.asList("g", "h", "i")));
  PCollectionList<String> allLetters = PCollectionList.of(letters1).and(letters2).and(letters3);
  List<PCollection<String>> lettersCollections = allLetters.getAll();
  PAssert.that(lettersCollections.get(0)).containsInAnyOrder("a", "b", "c");
  PAssert.that(lettersCollections.get(1)).containsInAnyOrder("d", "e", "f");
  PAssert.that(lettersCollections.get(2)).containsInAnyOrder("g", "h", "i");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_create_pcollection_tuple() {
  Pipeline pipeline = BeamFunctions.createPipeline("PCollection tuple");
  PCollection<String> letters = pipeline.apply(Create.of(Arrays.asList("a", "b", "c")));
  PCollection<Integer> numbers = pipeline.apply(Create.of(Arrays.asList(1, 2, 3)));
  PCollection<Boolean> flags = pipeline.apply(Create.of(Arrays.asList(true, false, true)));
  TupleTag<String> lettersTag = new TupleTag<>();
  TupleTag<Integer> numbersTag = new TupleTag<>();
  TupleTag<Boolean> flagsTag = new TupleTag<>();
  PCollectionTuple mixedData = PCollectionTuple.of(lettersTag, letters).and(numbersTag, numbers).and(flagsTag, flags);
  Map<TupleTag<?>, PCollection<?>> tupleData = mixedData.getAll();
  PAssert.that((PCollection<String>) tupleData.get(lettersTag)).containsInAnyOrder("a", "b", "c");
  PAssert.that((PCollection<Integer>) tupleData.get(numbersTag)).containsInAnyOrder(1, 2, 3);
  PAssert.that((PCollection<Boolean>) tupleData.get(flagsTag)).containsInAnyOrder(true, false, true);
  pipeline.run().waitUntilFinish();
}
This post introduced the first element of the Apache Beam framework - the data representation called PCollection. The first section gave an insight into the most important points of this object - immutability, distributed character and windowing. The second part showed these points in the code.