Shared variables in Spark

on waitingforcode.com

Spark has an interesting concept of variables shared among all distributed computations. The first kind of these objects are broadcast variables. But they're not the only way to share objects in Spark. The second one are accumulators.

After describing how distributed computing works in general, we can stop for a moment and focus on one specific component: shared objects. The first part of this post describes broadcast variables. The second talks about accumulators. The last part shows the use of these 2 methods through learning tests.

Broadcast variables in Spark

First of all, each node executing a task of a given stage works on its own copy of objects. So if we want to filter an RDD on 3 executors, each of these executors receives a copy of the RDD's part to compute. In consequence, if one executor changes some properties of the manipulated objects, these modifications won't be propagated back to the driver program, nor to the other executors.

One of the solutions allowing data sharing are broadcast variables. As we can guess, a single instance of such a variable is shared between all computations. But how can executors access it? First of all, broadcast variables are supposed to be read-only. Before launching the computation, Spark sends the variable to each node concerned by the related tasks. Each node then caches it locally in serialized form. Now, before executing each of its planned tasks, instead of fetching values from the driver, the executor retrieves them locally from its cache. So broadcasting doesn't mean that the object isn't transmitted across the network at all. But unlike "normal" variables, broadcast ones are always read-only and, thanks to that, they need to be sent to each node only once.

Broadcast variables should then be used when different tasks operate on the same data. They're well suited to handle quite big amounts of data, such as a database chunk of several MB. Thus it's also important to use an efficient serialization format: since the amount of transferred data can be big, a performant serialization format helps to improve performance.
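To illustrate this usage pattern, the sketch below broadcasts a small lookup map once and reads it from every task. It assumes Spark's Java API running in local mode; the class and variable names are invented for the example:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastLookupExample {

  // Resolves country codes against a dictionary broadcast once per node
  public static List<String> resolve(List<String> codes) {
    SparkConf conf = new SparkConf().setAppName("broadcast-sketch").setMaster("local[2]");
    try (JavaSparkContext context = new JavaSparkContext(conf)) {
      Map<String, String> countryNames = new HashMap<>();
      countryNames.put("PL", "Poland");
      countryNames.put("FR", "France");
      // sent to each executor node only once and cached there in serialized form
      Broadcast<Map<String, String>> broadcastNames = context.broadcast(countryNames);

      return context.parallelize(codes)
        // every task reads the locally cached copy through value()
        .map(code -> broadcastNames.value().get(code))
        .collect();
    }
  }

  public static void main(String[] args) {
    System.out.println(resolve(Arrays.asList("PL", "FR", "PL")));
  }
}
```

Without the broadcast, the map would be shipped with every task closure instead of once per node.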

Accumulators in Spark

The second method of sharing objects in Spark consists in the use of accumulators. As their name indicates, their main role consists in accumulating values. As a purely theoretical example we can take the case of a timer. Let's imagine that in our processing we need to get some information from an external web service. We suppose it's a potential bottleneck, so we want to measure the connection time. One solution could be an accumulator collecting each connection time in some object. The specific use of that is shown in the next part.

Another use case for accumulators are counters. In this case the accumulator acts as a shared value, incremented by the number of objects treated in each task. This example is also implemented in the next part.

Unlike broadcast variables, accumulators are writable. However, written values can only be read in the driver program. That's why accumulators work pretty well as data aggregators. Without accumulators, aggregating values this way wouldn't be possible because each executor operates on its own copy of the object. Even if that object is a map with a shared key, values put into it by one executor won't be visible to the driver or to the other executors.

Spark 2.0 introduces a new accumulator API under AccumulatorV2. The previous one is still supported but marked as deprecated.
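As a minimal sketch of the new API, a custom accumulator extends AccumulatorV2 and implements isZero, copy, reset, add, merge and value. The MaxAccumulator below is invented for this illustration; it's not part of Spark or of the tests shown later:

```java
import org.apache.spark.util.AccumulatorV2;

// Keeps the maximum of the added values; type parameters are <IN, OUT>
public class MaxAccumulator extends AccumulatorV2<Long, Long> {

  private long max = Long.MIN_VALUE;

  @Override
  public boolean isZero() {
    // true as long as no value was added
    return max == Long.MIN_VALUE;
  }

  @Override
  public AccumulatorV2<Long, Long> copy() {
    MaxAccumulator copy = new MaxAccumulator();
    copy.max = max;
    return copy;
  }

  @Override
  public void reset() {
    max = Long.MIN_VALUE;
  }

  @Override
  public void add(Long value) {
    // called inside tasks, on the executor-side copies
    max = Math.max(max, value);
  }

  @Override
  public void merge(AccumulatorV2<Long, Long> other) {
    // called on the driver to combine the per-task copies
    max = Math.max(max, other.value());
  }

  @Override
  public Long value() {
    return max;
  }
}
```

Such an accumulator must be registered before use, e.g. with sparkContext.register(new MaxAccumulator(), "max accumulator").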

Broadcast variables and accumulators example

Below you can find some examples illustrating the use of broadcast variables and accumulators:

private static final List<Integer> TEST_VALUES = 
  IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());

@Test
public void should_correctly_use_broadcast_variable() {
  CollectionAccumulator<TestClass> broadcastAccumulator =
      CONTEXT.sc().collectionAccumulator("broadcast accumulator");
  Broadcast<TestClass> largeNumber = CONTEXT.broadcast(new TestClass());
  CONTEXT.parallelize(TEST_VALUES)
    .map(number -> {
      broadcastAccumulator.add(largeNumber.value());
      return number;
    })
    .collect();


  List<TestClass> operatedClasses = broadcastAccumulator.value();
  for (int i = 1; i < operatedClasses.size(); i++) {
    assertThat(operatedClasses.get(i-1)).isSameAs(operatedClasses.get(i));
  }
}


@Test
public void should_use_accumulator_to_count_filtered_objects() {
  LongAccumulator longAccumulator = CONTEXT.sc().longAccumulator("Filtered items counter");
  CONTEXT.parallelize(TEST_VALUES)
    .filter(number -> {
      longAccumulator.add(1L);
      return number < 50;
    }).collect();

  assertThat(longAccumulator.value()).isEqualTo(100);
}

@Test
public void should_use_accumulator_with_custom_type() {
  List<String> urls = Arrays.asList("URL#1", "URL#2", "URL#3", "URL#4", "URL#5", "URL#6");
  CollectionAccumulator<MetricBean> metricsAccumulator = 
    CONTEXT.sc().collectionAccumulator("metrics accumulator");

  CONTEXT.parallelize(urls).map(
    url -> {
      // simulate a measured connection time with a random value
      Random randomizer = new Random();
      metricsAccumulator.add(new MetricBean(url, randomizer.nextLong()));
      return url + " was treated";
    }
  ).collect();

  List<MetricBean> metrics = metricsAccumulator.value();
  assertThat(metrics).hasSize(6);
  assertThat(metrics).extracting("url").containsAll(urls);
  metrics.forEach(metricBean -> assertThat(metricBean.connectionTime).isNotNull());
}

@Test
public void should_fail_on_aggregating_values_without_accumulator() {
  // Executors work on local copy of the object and
  // the changes aren't propagated back to the driver
  Map<String, Integer> stats = new ConcurrentHashMap<>();
  stats.put("BIG", 0);
  stats.put("NOT_BIG", 0);

  JavaRDD<Integer> bigNumbersRDD = CONTEXT.parallelize(TEST_VALUES)
    .filter(value -> {
      boolean isBigNumber = value > 9;
      String key = "NOT_BIG";
      if (isBigNumber) {
          key = "BIG";
      }
      stats.replace(key, stats.get(key) + 1);
      return isBigNumber;
    });

  assertThat(bigNumbersRDD.collect()).hasSize(91);
  assertThat(stats).hasSize(2);
  assertThat(stats.get("BIG")).isEqualTo(0);
  assertThat(stats.get("NOT_BIG")).isEqualTo(0);
}

private static class MetricBean implements Serializable {
  private final String url;
  private final Long connectionTime;

  public MetricBean(String url, Long connectionTime) {
    this.url = url;
    this.connectionTime = connectionTime;
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this).add("url", url)
      .add("connectionTime", connectionTime).toString();
  }
}

private static class TestClass implements Serializable {
  private UUID id = UUID.randomUUID();
}

Through this article we discovered 2 methods of object sharing in Spark. The first one concerns read-only data which is copied to each executor node before the first transformation, cached there and used for further computations: broadcast variables. The second type of shared objects are the ones handled by accumulators. They allow the driver program to accumulate objects of an expected type from executors. They can be used, for example, as counters or metrics aggregators. The last part showed how both types can be used through some simple unit tests.
