Stateful transformations in Spark Streaming

Versions: Spark 2.0.0

Spark Streaming is able to handle state-based operations, i.e. operations whose state can be modified by subsequent batches of data.

This post is dedicated to the state operations implemented in Spark Streaming. The first part describes the general idea of stateful updates. The second part talks about the objects related to state-based transformations in the Spark Streaming API. The last part shows some test cases using state-based transformations to track user visits on a website.

Introduction to state-based transformations

Stateful operations can be very useful to track changes over time, for example user visits. To activate them, the streaming context must have a checkpoint directory defined. Tracking the state of data requires storing a lot of metadata over time; thanks to periodic checkpointing, this metadata overhead is reduced. As you could learn in the article about checkpoints and Write Ahead Logs in Spark, a checkpointed RDD is a flattened version of an RDD, without parent dependencies. However, the requirement of checkpoint activation may influence processing time because of saving data to HDFS or another reliable storage.
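
A minimal sketch of the activation (the application name, batch interval and checkpoint path are illustrative assumptions) boils down to a single call on the streaming context, made before the context is started:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf().setAppName("stateful-demo").setMaster("local[2]");
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(1));
// Mandatory for stateful transformations; without it start() fails with
// "requirement failed: The checkpoint directory has not been set"
streamingContext.checkpoint("/tmp/spark/checkpoints");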

State transformations are applied on every new batch and only on pair DStreams. In the context of state-based transformations, the values of these pairs are considered as states. If we take the example of user visits described in the previous paragraph, we could consider that the key of the stored structure is the session id and the value (state) is the list of visited pages.

Now, every time a given key is met by the state update function, we can mutate its state. If a given key receives no new values, we can suppose that the state is lost (for example the user closed his navigation session) and stop keeping it.
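
A minimal sketch of such an update function for the visits example (the names are illustrative; the complete test code comes later in the post) could look as follows. Returning an empty Optional removes the key and its state:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;

// For a (session id, visited page) pair DStream, the state is the list of
// pages seen so far in the session. Returning Optional.empty() drops the key,
// so its state is not kept anymore.
Function2<List<String>, Optional<List<String>>, Optional<List<String>>> updateVisits =
  (newPages, state) -> {
    if (newPages.isEmpty()) {
      // No new activity for this session in the current batch - forget it
      return Optional.empty();
    }
    List<String> allPages = new ArrayList<>(state.orElse(new ArrayList<>()));
    allPages.addAll(newPages);
    return Optional.of(allPages);
  };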

StateDStream

Under the hood, state-based transformations are represented by StateDStream. It overrides 2 behaviors of DStream: the checkpoint and cache strategies. As already mentioned, stateful transformations need checkpointing to be enabled. StateDStream switches the field telling if a given DStream must be checkpointed from false to true. Furthermore, it forces data persistence. Both changes are visible at the beginning of this class:

super.persist(StorageLevel.MEMORY_ONLY_SER)
override val mustCheckpoint = true

The function responsible for creating StateDStream is updateStateByKey(Function2). As its single argument it takes a 2-parameter function used to update the state. The signature of this function defines 2 arguments: a list of values and an optional object. The first parameter represents the new values accumulated during the last batch interval. The second parameter represents the state resulting from the previous computation.
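
To illustrate the signature with something simpler than the visits example, here is a hypothetical running count per key (it assumes that pairs is a JavaPairDStream<String, Integer>):

import java.util.List;
import org.apache.spark.api.java.Optional;
import org.apache.spark.streaming.api.java.JavaPairDStream;

// newValues = the values received for the key in the current batch;
// previousSum = the state computed in previous batches (empty for a new key)
JavaPairDStream<String, Integer> runningSums = pairs.updateStateByKey(
  (List<Integer> newValues, Optional<Integer> previousSum) -> {
    int sum = previousSum.orElse(0);
    for (Integer value : newValues) {
      sum += value;
    }
    return Optional.of(sum);
  });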

State-based operations examples

Below you can find 4 tests showing a sample use of stateful transformations. The analyzed case is a user's visit on a website. We suppose that our system accumulates all visited pages and simply tries to keep track of them:

@Test
public void should_fail_on_applying_update_by_state_without_checkpoint_activated() throws IOException, InterruptedException {
  // Checkpoint directory is mandatory for running stateful transformations
  // like updateStateByKey(). Checkpointing reduces the amount of metadata
  // stored during subsequent updates (= it doesn't keep the trace of parent
  // dependencies).
  triggerDataCreation(0, 2, HOME_PAGE, TEAM_PAGE);

  JavaPairDStream<Integer, Visitor> pairEntriesDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME)
    .transformToPair(rdd -> rdd.mapToPair(visitor -> new Tuple2<>(visitor.id, visitor)));
  JavaPairDStream<Integer, List<String>> pagesVisitedByUserPairDStream =
    pairEntriesDStream.updateStateByKey(new AllVisitedPagesGenerator());
  pagesVisitedByUserPairDStream.foreachRDD(rdd -> System.out.println("Data: " + rdd.collect()));

  try {
    streamingContext.start();
    fail("Contenxt shouldn't be started without checkpoint directory");
  } catch (IllegalArgumentException iae) {
    assertThat(iae.getMessage()).contains("requirement failed: The checkpoint directory has not been set");
    iae.printStackTrace();
  }
}

@Test
public void should_apply_new_state_on_each_batch() throws IOException, InterruptedException {
  streamingContext.checkpoint(CHECKPOINT_DIRECTORY);
  // Imagine 2 users - each of them visits 3 pages. The first user wants to contact the company
  // and the second user wants to work for it.
  triggerDataCreation(0, 2, HOME_PAGE, TEAM_PAGE);
  triggerDataCreation(0, 2, ABOUT_US_PAGE, JOB_OFFERS_PAGE);
  triggerDataCreation(0, 2, CONTACT_PAGE, JOIN_US_PAGE);

  JavaPairDStream<Integer, Visitor> pairEntriesDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME)
    .transformToPair(rdd -> rdd.mapToPair(visitor -> new Tuple2<>(visitor.id, visitor)));

  // state = value
  JavaPairDStream<Integer, List<String>> pagesVisitedByUserPairDStream =
    pairEntriesDStream.updateStateByKey(new AllVisitedPagesGenerator());

  Map<Integer, Set<List<String>>> receivedData = new HashMap<>();
  receivedData.put(0, new HashSet<>());
  receivedData.put(1, new HashSet<>());
  pagesVisitedByUserPairDStream.foreachRDD(rdd -> {
    rdd.collectAsMap().forEach((visitorId, visitedPages) -> {
      receivedData.get(visitorId).add(visitedPages);
    });
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(3_000L);

  // User#1: home.html -> about_us.html -> contact.html
  assertThat(receivedData.get(0)).hasSize(3);
  assertThat(receivedData.get(0)).containsOnly(Arrays.asList(HOME_PAGE), Arrays.asList(HOME_PAGE, ABOUT_US_PAGE),
    Arrays.asList(HOME_PAGE, ABOUT_US_PAGE, CONTACT_PAGE));
  // User#2: team.html -> job_offers.html -> join_us.html
  assertThat(receivedData.get(1)).hasSize(3);
  assertThat(receivedData.get(1)).containsOnly(Arrays.asList(TEAM_PAGE), Arrays.asList(TEAM_PAGE, JOB_OFFERS_PAGE),
    Arrays.asList(TEAM_PAGE, JOB_OFFERS_PAGE, JOIN_US_PAGE));
}

@Test
public void should_eliminate_item_from_rdd_because_of_empty_next_state() throws InterruptedException, IOException {
  streamingContext.checkpoint(CHECKPOINT_DIRECTORY);
  triggerDataCreation(0, 2, HOME_PAGE, TEAM_PAGE);
  triggerDataCreation(0, 2, ABOUT_US_PAGE, LAST_PAGE);
  // User#2 closed his browser and reopened it to start a new navigation
  // while User#1 still continues his browsing
  triggerDataCreation(0, 2, CONTACT_PAGE, HOME_PAGE);

  JavaPairDStream<Integer, Visitor> pairEntriesDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME)
    .transformToPair(rdd -> rdd.mapToPair(visitor -> new Tuple2<>(visitor.id, visitor)));

  // state = value
  JavaPairDStream<Integer, List<String>> pagesVisitedByUserPairDStream =
    pairEntriesDStream.updateStateByKey(new AllVisitedPagesGenerator());

  Map<Integer, Set<List<String>>> receivedData = new HashMap<>();
  receivedData.put(0, new HashSet<>());
  receivedData.put(1, new HashSet<>());
  List<Integer> partitions = new ArrayList<>();
  pagesVisitedByUserPairDStream.foreachRDD(rdd -> {
    partitions.add(rdd.getNumPartitions());
    rdd.collectAsMap().forEach((visitorId, visitedPages) -> receivedData.get(visitorId).add(visitedPages));
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(3_000L);

  // User#1: home.html -> about_us.html -> contact.html
  assertThat(receivedData.get(0)).hasSize(3);
  assertThat(receivedData.get(0)).containsOnly(Arrays.asList(HOME_PAGE), Arrays.asList(HOME_PAGE, ABOUT_US_PAGE),
    Arrays.asList(HOME_PAGE, ABOUT_US_PAGE, CONTACT_PAGE));
  // User#2: team.html -> last_page (state dropped) -> home.html (new browsing)
  assertThat(receivedData.get(1)).hasSize(2);
  assertThat(receivedData.get(1)).containsOnly(Arrays.asList(TEAM_PAGE), Arrays.asList(HOME_PAGE));
  // Check the number of partitions too
  assertThat(partitions).containsOnly(4);
}

@Test
public void should_update_state_and_change_the_number_of_partitions() throws InterruptedException, IOException {
  streamingContext.checkpoint("/home/konieczny/tmp/spark/checkpoints_streaming");
  triggerDataCreation(0, 2, HOME_PAGE, TEAM_PAGE);
  triggerDataCreation(0, 2, ABOUT_US_PAGE, JOB_OFFERS_PAGE);

  JavaPairDStream<Integer, Visitor> pairEntriesDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME)
    .transformToPair(rdd -> rdd.mapToPair(visitor -> new Tuple2<>(visitor.id, visitor)));

  int numPartitions = 2;
  JavaPairDStream<Integer, List<String>> pagesVisitedByUserPairDStream =
    pairEntriesDStream.updateStateByKey(new AllVisitedPagesGenerator(), numPartitions);

  List<Integer> partitions = new ArrayList<>();
  pagesVisitedByUserPairDStream.foreachRDD(rdd -> partitions.add(rdd.getNumPartitions()));

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(3_000L);

  assertThat(partitions).containsOnly(numPartitions);
}

private void triggerDataCreation(int start, int maxRDDs, String... pages) throws IOException {
  for (int i = start; i < maxRDDs; i++) {
    JavaRDD<Visitor> newRDD = batchContext.parallelize(Arrays.asList(new Visitor(i, pages[i])));
    testQueue.add(newRDD);
  }
}

private static class Visitor implements Serializable {
  private int id;
  private String page;

  public Visitor(int id, String page) {
    this.id = id;
    this.page = page;
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this).add("id", id)
      .add("page", page).toString();
  }
}

private static class AllVisitedPagesGenerator implements Serializable, Function2<List<Visitor>, Optional<List<String>>, Optional<List<String>>> {

  @Override
  public Optional<List<String>> call(List<Visitor> values, Optional<List<String>> previousState) throws Exception {
    // values = visitors accumulated during the batch duration;
    // in our case it means a single newly visited page (because the
    // one-at-a-time flag is set to true on the queue DStream)
    List<String> visitedPages = values.stream().map(visitor -> visitor.page).collect(Collectors.toList());
    if (visitedPages.contains(LAST_PAGE)) {
      // Here we consider that the user ended his browsing.
      // It's represented by the visit of LAST_PAGE.
      // We don't want to include this page in the updated state,
      // so we return empty(). By doing that, the user's browsing stops
      // here. If he's seen again later, it will be considered as a new
      // browsing session.
      return Optional.empty();
    }
    // previousState can be empty when the key is updated for the first time
    List<String> alreadyVisitedPages = previousState.orElse(new ArrayList<>());
    List<String> allPages = Lists.newArrayList(alreadyVisitedPages);
    allPages.addAll(visitedPages);
    return Optional.of(allPages);
  }
}

This post presented the stateful transformation in Spark Streaming called updateStateByKey. The first part gave a global image of this kind of transformation. The second part presented the DStream implementation responsible for state-based operations. The last part showed a simple use case of stateful transformations through simulated user navigation.

