Data partitioning in Apache Beam

on waitingforcode.com

The power of Big Data processing platforms lies mainly in their ability to parallelize processing across different nodes. Each framework has its own unit of parallelism. In Spark it's called a partition. Apache Beam calls it a bundle.

After the first posts about data representation and data manipulation, it's a good moment to discover how Apache Beam handles parallel data processing. The first part explains the concept of bundles. The next 2 parts focus on internal details: the first of them describes data partitioning in file-based sources, while the second does the same for a streaming source, with Apache Kafka taken as an example. The last part shows, through the ParDo transformation, what happens when a bundle is processed.

Bundle definition

As already mentioned, the ability to divide the data and process it in bundles on different workers helps to process large datasets in a reasonable time. In Apache Beam this unit of parallelization is called a bundle, and the way the data is partitioned into bundles is not the same for all data sources. We can very often come across statements about Apache Beam processing data in arbitrary bundles.

Each bundle represents a subset of the input data. A bundle is processed by one worker, or by more than one when speculative execution is used. If one of the bundle's elements makes the processing fail, the whole bundle, and not only the failing element, is retried, possibly on a different worker.
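
The retry behavior can be illustrated with a toy model in plain Java. This is only a sketch and not Beam code: the class BundleRetrySketch, the method processWithRetries and the maxAttempts parameter are hypothetical names introduced for the illustration. It shows that when one element fails, the partial results are dropped and every element of the bundle is processed again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of bundle-level retries (hypothetical helper, not Beam code). */
class BundleRetrySketch {

  /**
   * Processes the bundle element by element. If any element fails, the
   * partial output of the attempt is discarded and the whole bundle is
   * attempted again. Returns the number of attempts that were needed.
   */
  static <T> int processWithRetries(List<T> bundle, Consumer<T> fn,
      List<T> committed, int maxAttempts) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      List<T> pending = new ArrayList<>();
      try {
        for (T element : bundle) {
          fn.accept(element);
          pending.add(element);
        }
        // The output becomes visible only when the whole bundle succeeded
        committed.addAll(pending);
        return attempt;
      } catch (RuntimeException e) {
        // pending is dropped here: nothing from the failed attempt is kept
      }
    }
    throw new IllegalStateException("Bundle failed after " + maxAttempts + " attempts");
  }

  public static void main(String[] args) {
    List<String> bundle = Arrays.asList("a", "b", "c");
    List<String> committed = new ArrayList<>();
    List<String> seen = new ArrayList<>();
    boolean[] failedOnce = {false};
    int attempts = processWithRetries(bundle, element -> {
      seen.add(element);
      if (element.equals("b") && !failedOnce[0]) {
        failedOnce[0] = true;
        throw new RuntimeException("transient failure on " + element);
      }
    }, committed, 3);
    // prints: attempts=2 seen=[a, b, a, b, c] committed=[a, b, c]
    System.out.println("attempts=" + attempts + " seen=" + seen + " committed=" + committed);
  }
}
```

Note that "a" is processed twice even though it never failed, which is why side effects inside a processing function should tolerate being repeated.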

The bundle size varies and depends on the target parallelism. The size is expressed in bytes per bundle and, for bounded sources, its computation is defined in the org.apache.beam.runners.direct.BoundedReadEvaluatorFactory.InputProvider#getInitialInputs(AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform, int targetParallelism) method.
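
As a rough illustration of how a target parallelism can translate into a bundle size, the sketch below divides an estimated source size by the number of desired parallel units. It rests on the assumption that the runner derives the size this way; the class and method names (BundleSizeSketch, bytesPerBundle) are hypothetical and do not come from Beam.

```java
/** Hypothetical helper showing a size-per-bundle computation (assumption, not Beam code). */
class BundleSizeSketch {

  /** Returns the desired number of bytes per bundle for the given parallelism. */
  static long bytesPerBundle(long estimatedSizeBytes, int targetParallelism) {
    if (targetParallelism <= 0) {
      throw new IllegalArgumentException("targetParallelism must be positive");
    }
    // Integer division: the higher the parallelism, the smaller each bundle
    return estimatedSizeBytes / targetParallelism;
  }

  public static void main(String[] args) {
    long estimatedSizeBytes = 1_000_000L;
    // prints: parallelism 2 -> 500000 bytes per bundle
    System.out.println("parallelism 2 -> " + bytesPerBundle(estimatedSizeBytes, 2) + " bytes per bundle");
    // prints: parallelism 20 -> 50000 bytes per bundle
    System.out.println("parallelism 20 -> " + bytesPerBundle(estimatedSizeBytes, 20) + " bytes per bundle");
  }
}
```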

Data partitioning - files

A text file is used here as an example of reading a bounded source. Two reading patterns exist for this source. The first is the reading of a single file. In the second we specify a directory containing the files to read. In both cases the file(s) can be split into different sub-sources. To keep it simple, let's assume that we have a file of 100 lines. Depending on the preferred bundle size, the whole file will be divided into different arbitrary ranges (sub-sources). The arbitrary nature comes from the fact that the bundles are not guaranteed to be the same at every read. For instance, the first execution can create the bundles [0-50], [51], [52-100] while the next one can construct different ones (e.g. [0-40], [41-51], [52-62], [63-100]).

Technically, the class involved in partitioning is an implementation of org.apache.beam.sdk.io.OffsetBasedSource<T>. As its name indicates, it represents a data source that can be read using offsets. A text file is a great example of an offset-based source. Such sources are represented by FileBasedSource implementations: TextSource, AvroSource, or CompressedSource. Our case concerns TextSource, and only it is presented below.

The sub-sources are created in the org.apache.beam.sdk.io.FileBasedSource#split(long desiredBundleSizeBytes, PipelineOptions options) method. By analyzing it we can find what was explained above, as well as a check of the source's splittability. This property is defined in the Metadata object. In our case the files come from LocalFileSystem, which always marks them as splittable (= read-seek efficient in Beam's nomenclature).

The bundles are created in the split method. This method receives the information about the offsets to read in the file. Based on these values and on the property called desiredBundleSizeOffsetUnits, it can create 1 or more bundles for the given range.
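
The idea of cutting an offset range into sub-ranges of a desired size can be sketched in plain Java. This is a simplification under stated assumptions and not the Beam implementation: ranges are half-open, every range except possibly the last has exactly the desired size, and the names OffsetSplitSketch and split are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified offset-range splitting (hypothetical helper, not Beam code). */
class OffsetSplitSketch {

  /**
   * Splits [start, end) into consecutive half-open ranges holding at most
   * desiredSize offsets each. Every range is returned as {from, to}.
   */
  static List<long[]> split(long start, long end, long desiredSize) {
    if (desiredSize <= 0) {
      throw new IllegalArgumentException("desiredSize must be positive");
    }
    List<long[]> ranges = new ArrayList<>();
    for (long from = start; from < end; from += desiredSize) {
      // The last range is shortened so that it never goes past the end offset
      ranges.add(new long[] {from, Math.min(from + desiredSize, end)});
    }
    return ranges;
  }

  public static void main(String[] args) {
    // prints: [0, 30) [30, 60) [60, 90) [90, 100)
    StringBuilder output = new StringBuilder();
    for (long[] range : split(0, 100, 30)) {
      output.append("[").append(range[0]).append(", ").append(range[1]).append(") ");
    }
    System.out.println(output.toString().trim());
  }
}
```

Each sub-range can then be read independently, which is what makes the splitting useful for parallel processing.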

The bundles are represented as implementations of the CommittedBundle interface. For text files the implementation used is AutoValue_ImmutableListBundleFactory_CommittedImmutableListBundle.

Data partitioning - Kafka

In some cases Apache Beam, exactly like Apache Spark, uses the data source's characteristics to determine the partitioning. This is the case for Apache Kafka. This streaming platform stores messages in topics composed of partitions, which are its unit of parallelization.

As for text files, the partitioning for Kafka is done in a split method: org.apache.beam.sdk.io.kafka.KafkaIO.UnboundedKafkaSource#split(int desiredNumSplits, PipelineOptions options). The partition assignment is deterministic. First, the (topic, partition) pairs are sorted. Then they're assigned to the workers in a round-robin manner:

Collections.sort(partitions, new Comparator<TopicPartition>() {
  @Override
  public int compare(TopicPartition tp1, TopicPartition tp2) {
    return ComparisonChain
      .start()
      .compare(tp1.topic(), tp2.topic())
      .compare(tp1.partition(), tp2.partition())
      .result();
  }
});

checkArgument(desiredNumSplits > 0);
checkState(partitions.size() > 0,
  "Could not find any partitions. Please check Kafka configuration and topic names");

int numSplits = Math.min(desiredNumSplits, partitions.size());
List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);

for (int i = 0; i < numSplits; i++) {
  assignments.add(new ArrayList<TopicPartition>());
}
for (int i = 0; i < partitions.size(); i++) {
  assignments.get(i % numSplits).add(partitions.get(i));
}
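
To see the effect of this assignment on concrete data, the same sort-then-round-robin logic can be reproduced without any Kafka dependency. The sketch below is an illustration only: TopicAndPartition is a hypothetical stand-in for Kafka's TopicPartition, and the topic names "a" and "b" are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Stand-alone illustration of the sorted round-robin assignment (not KafkaIO code). */
class RoundRobinAssignmentSketch {

  /** Hypothetical stand-in for Kafka's TopicPartition. */
  static final class TopicAndPartition {
    final String topic;
    final int partition;

    TopicAndPartition(String topic, int partition) {
      this.topic = topic;
      this.partition = partition;
    }

    @Override
    public String toString() {
      return topic + "-" + partition;
    }
  }

  static List<List<TopicAndPartition>> assign(List<TopicAndPartition> partitions,
      int desiredNumSplits) {
    if (desiredNumSplits <= 0 || partitions.isEmpty()) {
      throw new IllegalArgumentException("Need at least 1 split and 1 partition");
    }
    // Sorting first makes the result independent of the input order
    List<TopicAndPartition> sorted = new ArrayList<>(partitions);
    sorted.sort(Comparator.comparing((TopicAndPartition tp) -> tp.topic)
        .thenComparingInt(tp -> tp.partition));

    int numSplits = Math.min(desiredNumSplits, sorted.size());
    List<List<TopicAndPartition>> assignments = new ArrayList<>(numSplits);
    for (int i = 0; i < numSplits; i++) {
      assignments.add(new ArrayList<>());
    }
    for (int i = 0; i < sorted.size(); i++) {
      assignments.get(i % numSplits).add(sorted.get(i));
    }
    return assignments;
  }

  public static void main(String[] args) {
    List<TopicAndPartition> partitions = Arrays.asList(
        new TopicAndPartition("b", 1), new TopicAndPartition("a", 0),
        new TopicAndPartition("b", 0), new TopicAndPartition("a", 1),
        new TopicAndPartition("a", 2));
    // prints: [[a-0, a-2, b-1], [a-1, b-0]]
    System.out.println(assign(partitions, 2));
  }
}
```

With 5 partitions and 2 desired splits, the first split receives 3 pairs and the second one 2. Asking for more splits than partitions simply yields one pair per split.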

After the assignment, each worker consumes the records of its (topic, partition) pairs through an instance of org.apache.beam.sdk.io.kafka.KafkaIO.UnboundedKafkaReader, which is an implementation of the org.apache.beam.sdk.io.UnboundedSource.UnboundedReader abstract class. Inside we can see 2 methods responsible for reading records: start() and advance(). The first one is used when the reader is initialized and needs to move to its reading position. The second one is called to get the next available record from the streaming source.
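
The way a caller typically drives such a reader can be sketched with a reduced, hypothetical interface. SimpleReader and ListReader below are not Beam classes. They only mimic the start()/advance() pair described above, together with a getCurrent() accessor, under the assumption that both methods return true when a record is available.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Reading loop over a simplified reader contract (hypothetical, not Beam code). */
class ReaderLoopSketch {

  /** Reduced stand-in for a reader exposing start(), advance() and getCurrent(). */
  interface SimpleReader<T> {
    boolean start();    // positions the reader, true if a first record is available
    boolean advance();  // moves to the next record, true if one is available
    T getCurrent();     // returns the record the reader is positioned on
  }

  /** In-memory reader used only to exercise the loop. */
  static final class ListReader<T> implements SimpleReader<T> {
    private final Iterator<T> records;
    private T current;
    private boolean hasCurrent;

    ListReader(List<T> records) {
      this.records = records.iterator();
    }

    @Override
    public boolean start() {
      return advance();
    }

    @Override
    public boolean advance() {
      hasCurrent = records.hasNext();
      if (hasCurrent) {
        current = records.next();
      }
      return hasCurrent;
    }

    @Override
    public T getCurrent() {
      if (!hasCurrent) {
        throw new NoSuchElementException("No current record");
      }
      return current;
    }
  }

  /** Reads every record that is available right now. */
  static <T> List<T> readAvailable(SimpleReader<T> reader) {
    List<T> records = new ArrayList<>();
    // For a streaming source, false only means "nothing available at the moment"
    for (boolean available = reader.start(); available; available = reader.advance()) {
      records.add(reader.getCurrent());
    }
    return records;
  }
}
```

A call such as ReaderLoopSketch.readAvailable(new ReaderLoopSketch.ListReader<>(Arrays.asList("r1", "r2", "r3"))) returns the 3 records in order. A real streaming reader would be polled again later instead of being treated as exhausted.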

Bundle example

The following tests use text files to illustrate bundle creation:

private static final String BASE_DIR = "/tmp/beam/bundles/";
private static final String FILE_1 = BASE_DIR+"1";
private static final String FILE_2 = BASE_DIR+"2";

@BeforeClass
public static void writeFiles() throws IOException {
  String linesFile1 = IntStream.rangeClosed(1, 200).boxed().map(number -> number.toString())
    .collect(Collectors.joining("\n"));
  FileUtils.writeStringToFile(new File(FILE_1), linesFile1, "UTF-8");
  String linesFile2 = IntStream.rangeClosed(201, 400).boxed().map(number -> number.toString())
    .collect(Collectors.joining("\n"));
  FileUtils.writeStringToFile(new File(FILE_2), linesFile2, "UTF-8");
}

@AfterClass
public static void deleteFiles() {
  FileUtils.deleteQuietly(new File(FILE_1));
  FileUtils.deleteQuietly(new File(FILE_2));
}

@Test
public void should_show_how_bundle_is_divided_through_pardo() {
  Pipeline pipeline = BeamFunctions.createPipeline("Illustrating bundle division for 1 file");
  PCollection<String> file1Content = pipeline.apply(TextIO.read().from(FILE_1));

  file1Content.apply(ParDo.of(new BundlesDetector(Containers.TEST_1_FILE)));

  pipeline.run().waitUntilFinish();
  Map<String, List<String>> bundlesData = Containers.TEST_1_FILE.getBundlesData();
  assertThat(bundlesData.size()).isGreaterThan(1);
  Set<String> dataInBundles = new HashSet<>();
  bundlesData.forEach((bundleKey, data) -> dataInBundles.addAll(data));
  assertThat(dataInBundles).hasSize(200);
  IntStream.rangeClosed(1, 200).boxed().map(number -> number.toString())
    .forEach(number -> assertThat(dataInBundles).contains(number));
  System.out.println("========== Debug 1 file =========");
  bundlesData.forEach((bundleKey, data) -> printDebugMessage(bundleKey, data));
}

@Test
public void should_define_more_bundles_with_greater_level_of_parallelism() {
  Pipeline pipelineParallelism2 = BeamFunctions.createPipeline(
    "Illustrating bundle division for 1 file for parallelism of 2", 2);
  Pipeline pipelineParallelism20 = BeamFunctions.createPipeline(
    "Illustrating bundle division for 1 file for parallelism of 20", 20);
  PCollection<String> file1ContentPar2 = pipelineParallelism2.apply(TextIO.read().from(FILE_1));
  PCollection<String> file1ContentPar20 = pipelineParallelism20.apply(TextIO.read().from(FILE_1));

  file1ContentPar2.apply(ParDo.of(new BundlesDetector(Containers.TEST_1_FILE_PAR_2)));
  file1ContentPar20.apply(ParDo.of(new BundlesDetector(Containers.TEST_1_FILE_PAR_20)));
  pipelineParallelism2.run().waitUntilFinish();
  pipelineParallelism20.run().waitUntilFinish();

  Map<String, List<String>> bundlesDataPar2 = Containers.TEST_1_FILE_PAR_2.getBundlesData();
  Set<Integer> itemsInBundlesPar2 = new HashSet<>();
  bundlesDataPar2.values().forEach(bundleItems -> itemsInBundlesPar2.add(bundleItems.size()));
  // Many more sizes occur, but they're not always the same. The sizes below appeared in every
  // tested division.
  // As you can see, the division into bundles is much less even than with a parallelism of 20
  assertThat(itemsInBundlesPar2).contains(1, 2, 4, 5, 7, 10, 34, 42);
  System.out.println("========== Debug 1 file - par 2 =========");
  bundlesDataPar2.forEach((bundleKey, data) -> printDebugMessage(bundleKey, data));

  Map<String, List<String>> bundlesDataPar20 = Containers.TEST_1_FILE_PAR_20.getBundlesData();
  Set<Integer> itemsInBundlesPar20 = new HashSet<>();
  bundlesDataPar20.values().forEach(bundleItems -> {
      itemsInBundlesPar20.add(bundleItems.size());
  });
  assertThat(itemsInBundlesPar20).contains(1, 3, 8, 9, 10, 11, 12, 15);
  System.out.println("========== Debug 1 file - par 20 =========");
  bundlesDataPar20.forEach((bundleKey, data) -> printDebugMessage(bundleKey, data));
}

@Test
public void should_process_data_in_bundles_even_for_2_read_files() {
  Pipeline pipeline = BeamFunctions.createPipeline("Illustrating bundle division for 2 files");
  // BASE_DIR already ends with a slash, so only the wildcard is appended
  PCollection<String> filesContent = pipeline.apply(TextIO.read().from(BASE_DIR+"*"));

  filesContent.apply(ParDo.of(new BundlesDetector(Containers.TEST_2_FILES)));

  pipeline.run().waitUntilFinish();
  Map<String, List<String>> bundlesData = Containers.TEST_2_FILES.getBundlesData();
  assertThat(bundlesData.size()).isGreaterThan(1);
  Set<String> dataInBundles = new HashSet<>();
  bundlesData.forEach((bundleKey, data) -> dataInBundles.addAll(data));
  assertThat(dataInBundles).hasSize(400);
  IntStream.rangeClosed(1, 400).boxed().map(number -> number.toString())
    .forEach(number -> assertThat(dataInBundles).contains(number));
  System.out.println("========== Debug 2 files =========");
  bundlesData.forEach((bundleKey, data) -> printDebugMessage(bundleKey, data));
}

private static void printDebugMessage(String bundleKey, Collection<String> bundleData) {
  System.out.println("["+bundleKey+"]");
  System.out.println("=> "+bundleData);
}


class BundlesDetector extends DoFn<String, String> {

  private String bundleName;

  private Containers container;

  public BundlesDetector(Containers container) {
    this.container = container;
  }

  @DoFn.StartBundle
  public void initializeBundle() {
    this.bundleName = UUID.randomUUID().toString();
  }

  @DoFn.ProcessElement
  public void processLine(ProcessContext processContext) {
    container.addItem(bundleName, processContext.element());
  }
}

enum Containers {

  TEST_1_FILE, TEST_1_FILE_PAR_2, TEST_1_FILE_PAR_20, TEST_2_FILES;

  private Map<String, List<String>> bundlesData = new ConcurrentHashMap<>();

  public void addItem(String bundle, String item) {
    // Creates the list on the first item of a bundle, then appends to it
    bundlesData.computeIfAbsent(bundle, key -> new ArrayList<>()).add(item);
  }

  public Map<String, List<String>> getBundlesData() {
    return bundlesData;
  }

}

The unit of parallelization in Apache Beam is called a bundle. It represents a subset of the main dataset, and the bundles are distributed across the available workers. Bundles are also the main unit of failure recovery, i.e. if an error occurs on only 1 element of a bundle, the whole bundle is retried and its already processed elements are reprocessed. The second part showed how Beam splits text files into bundles. The partitioning process is conditioned by the source's ability to execute seeks during reads. The third part explained the partitioning in an unbounded source, Apache Kafka. Unsurprisingly, Beam uses the characteristics of this source and assigns (topic, partition) pairs to each worker. Finally, the last part used DoFn's annotations (@StartBundle and @ProcessElement) to show how the bundles are created for text files.