In Spark, actions produce the final results of operations on RDDs. Without them, transformations are never executed, so applications can't use their results.

Like the article about transformations in Spark, this post is divided into 2 parts. The first one lists some of the available actions and shows their use cases through pseudo-code snippets. The second part shows how to use these actions through the Java API.

Spark actions explained

As already mentioned, Spark actions are the final step of the transformations made on an RDD. They can compute statistics about the transformed data as well as persist this data into files. So, an action's output is sent either to the driver program or to an external file system (such as HDFS).

As you remember, RDDs are manipulated lazily and only actions cause the defined transformations to be really (physically) executed. Concretely, actions translate the plan containing the operations to perform (DAG - Directed Acyclic Graph) into an execution plan. We'll discover the DAG and the execution flow in more detail in one of the next posts. For now, we can simply look at the actions available in Spark.
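The lazy behavior described above can be pictured without Spark. The sketch below is only an analogy and makes no claim about Spark's internals: it uses Java 8 streams, where an intermediate operation plays the role of a transformation and the terminal operation plays the role of an action. The class name LazyEvaluationSketch and the MAPPER_CALLS counter are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class LazyEvaluationSketch {

  // Counts how many times the mapping function was really executed
  static final AtomicInteger MAPPER_CALLS = new AtomicInteger();

  // Only declares the "transformation"; nothing is computed at this point
  static Stream<Integer> defineDoubling(List<Integer> numbers) {
    return numbers.stream().map(number -> {
      MAPPER_CALLS.incrementAndGet();
      return number * 2;
    });
  }

  public static void main(String[] args) {
    Stream<Integer> doubled = defineDoubling(Arrays.asList(1, 2, 3));
    System.out.println("Calls before the terminal operation: " + MAPPER_CALLS.get());

    // The terminal operation (the "action" of this analogy) triggers the computation
    List<Integer> result = doubled.collect(Collectors.toList());
    System.out.println("Result: " + result + ", calls after: " + MAPPER_CALLS.get());
  }
}
```

The mapping function is not called until collect() runs, in the same spirit as transformations that wait for an action.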

Spark Java API actions

Below you can find some test cases illustrating the use of actions through Spark's Java API:

public void should_count_all_filtered_numbers() {
  List<Integer> numbers = Lists.newArrayList(1, 2, 3, 4, 5, 6, 6, 6);

  // count() doesn't deduplicate values
  long allDefinedNotes = CONTEXT.parallelize(numbers).count();

  assertThat(allDefinedNotes).isEqualTo(8);
}

public void should_take_only_first_2_numbers() {
  List<Integer> numbers = Lists.newArrayList(1, 2, 3, 4, 5, 6);

  // Despite the test name, top(n) doesn't return the first n elements: it returns
  // the n biggest ones, sorted in descending order
  List<Integer> top2Numbers = CONTEXT.parallelize(numbers).top(2);

  assertThat(top2Numbers).containsExactly(6, 5);
}

public void should_take_ordered_2_biggest_numbers() {
  List<Integer> numbers = Lists.newArrayList(10, 49, 1, 2, 30, 3, 4, 64, 5, 6);

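  // takeOrdered(n, comparator) returns the first n elements according to the comparator;
  // with the reversed natural order, the biggest values come first
  List<Integer> top2Numbers = CONTEXT.parallelize(numbers)
    .takeOrdered(2, Comparator.<Integer>naturalOrder().reversed());

  assertThat(top2Numbers).containsExactly(64, 49);
}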

public void should_multiply_each_entry_by_2_without_changing_object_out_of_scope() {
  List<Integer> numbers = Lists.newArrayList(3, 6, 12, 24, 48, 96);

  List<Integer> multipliedBy2 = new ArrayList<>();
  JavaRDD<Integer> rddNumbers = CONTEXT.parallelize(numbers);
  rddNumbers.foreach(number -> {
    multipliedBy2.add(number * 2);
    System.out.println("List<Integer> multipliedBy2 after adding: " + multipliedBy2);
  });

  // Even if foreach is called locally, it doesn't work like forEach from Java 8.
  // In fact, Spark submits the executable code as a serialized closure. All variables defined
  // and used inside these closures are copies of the real variables.
  // So in our case each executor works on its own copy of List<Integer> multipliedBy2.
  // We can see that in the standard output which, after the last iteration,
  // prints something like this:
  //   List<Integer> multipliedBy2 after adding: [6, 12, 24, 48, 96, 192]
  // A solution to this problem could be the use of accumulators, as in the test called
  // should_correctly_apply_foreach_for_accumulator. They help to collect manipulated data
  // across different executors.
  assertThat(rddNumbers.collect()).containsExactly(3, 6, 12, 24, 48, 96);
}
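The copy mechanism described in the comments above can be reproduced without Spark. The sketch below is a simplified simulation under the assumption that the closure travels through plain Java serialization; it is not Spark's real task shipping code. The names ClosureCopySketch, DoublingTask and shipToExecutor are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ClosureCopySketch {

  // Stand-in for the closure passed to foreach: it references the target list
  static final class DoublingTask implements Serializable {
    final List<Integer> target;

    DoublingTask(List<Integer> target) {
      this.target = target;
    }

    void apply(int number) {
      target.add(number * 2);
    }
  }

  // Serialization round trip: the returned task holds a copy of the original list
  static DoublingTask shipToExecutor(DoublingTask task) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(task);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (DoublingTask) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Serialization round trip failed", e);
    }
  }

  // Returns the size of the driver-side list once the copied task has processed all numbers
  static int driverListSizeAfterRemoteRun(List<Integer> numbers) {
    List<Integer> multipliedBy2 = new ArrayList<>();
    DoublingTask executorCopy = shipToExecutor(new DoublingTask(multipliedBy2));
    numbers.forEach(executorCopy::apply);
    return multipliedBy2.size();
  }

  public static void main(String[] args) {
    // Prints 0: only the deserialized copy of the list was modified
    System.out.println(driverListSizeAfterRemoteRun(Arrays.asList(3, 6, 12)));
  }
}
```

The driver-side list stays empty because every addition lands in the deserialized copy, which is the effect the test above describes.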

public void should_correctly_apply_foreach_for_accumulator() {
  Accumulator<String> accumulator = new Accumulator<>("", new StringParam());
  List<String> numbersStr = Lists.newArrayList("3", "6", "12");
  CONTEXT.parallelize(numbersStr).foreach((String number) -> accumulator.add(number));
  // The accumulated value can then be read on the driver with accumulator.value()
}

public void should_return_correct_count_of_distinct_values() {
  List<String> letters = Lists.newArrayList("A", "A", "A", "B", "B", "C");

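  // countByValue() returns the number of occurrences of each distinct value
  Map<String, Long> counter = CONTEXT.parallelize(letters).countByValue();

  assertThat(counter.get("A")).isEqualTo(3L);
  assertThat(counter.get("B")).isEqualTo(2L);
  assertThat(counter.get("C")).isEqualTo(1L);
}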

public void should_correctly_count_by_key() {
  // Each tuple represents a different animal owner. The key corresponds to the
  // kind of animal and the value to the number of animals they own, for example:
  // "cat", 2 -> one owner has 2 cats
  // "dog", 7 -> one owner has 7 dogs
  List<Tuple2<String, Integer>> speciesCounterList = Lists.newArrayList(
    new Tuple2<>("cat", 2), new Tuple2<>("dog", 7), new Tuple2<>("cat", 4), new Tuple2<>("duck", 11)
  );

  JavaPairRDD<String, Integer> speciesRDD = CONTEXT.parallelizePairs(speciesCounterList);

  // We want to know how many owners have each kind of animal.
  // As you can see, countByKey() counts the tuples per key; it doesn't sum the values per species.
  Map<String, Object> ownersCounter = speciesRDD.countByKey();

  assertThat(ownersCounter.get("cat")).isEqualTo(2L);
  assertThat(ownersCounter.get("dog")).isEqualTo(1L);
  assertThat(ownersCounter.get("duck")).isEqualTo(1L);
}
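The difference between counting tuples per key and summing values per key can be shown with plain Java collections. This is a hedged illustration that doesn't use the Spark API: the class CountVersusSumSketch and its methods are hypothetical names, and Map.Entry stands in for Tuple2.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class CountVersusSumSketch {

  // Same data as in the test above: (kind of animal, number of animals of one owner)
  static List<Map.Entry<String, Integer>> sampleOwners() {
    List<Map.Entry<String, Integer>> owners = new ArrayList<>();
    owners.add(new SimpleEntry<>("cat", 2));
    owners.add(new SimpleEntry<>("dog", 7));
    owners.add(new SimpleEntry<>("cat", 4));
    owners.add(new SimpleEntry<>("duck", 11));
    return owners;
  }

  // What a count by key computes: the number of pairs per key
  static Map<String, Long> countPerKey(List<Map.Entry<String, Integer>> pairs) {
    return pairs.stream()
      .collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.counting()));
  }

  // What a count by key does NOT compute: the sum of the values per key
  static Map<String, Integer> sumPerKey(List<Map.Entry<String, Integer>> pairs) {
    return pairs.stream()
      .collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.summingInt(Map.Entry::getValue)));
  }

  public static void main(String[] args) {
    System.out.println("Owners per species: " + countPerKey(sampleOwners()));
    System.out.println("Animals per species: " + sumPerKey(sampleOwners()));
  }
}
```

For "cat", the count is 2 (two owners) while the sum is 6 (2 + 4 animals).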


public void should_collect_distinct_letters() {
  List<String> letters = Lists.newArrayList("A", "A", "A", "B", "B", "C");

  List<String> distinctLetters = CONTEXT.parallelize(letters)
    .distinct().collect();

  assertThat(distinctLetters).containsOnly("A", "B", "C");
}

public void should_take_the_first_defined_player() {
  Player player1 = new Player("Player1", "Team1", FR, 10, 0);
  Player player2 = new Player("Player2", "Team1", FR, 5, 3);
  Player player3 = new Player("Player3", "Team2", FR, 7, 10);
  Player player4 = new Player("Player4", "Team1", DE, 17, 10);
  Player player5 = new Player("Player5", "Team2", PL, 3, 1);
  List<Player> players = Lists.newArrayList(player1, player4, player3, player2, player5);

  // Two methods exist to do that - either take(1) or first()
  List<Player> theFirstPlayerMethod1 = CONTEXT.parallelize(players).take(1);
  Player theFirstPlayerMethod2 = CONTEXT.parallelize(players).first();

  assertThat(theFirstPlayerMethod1).extracting("name").containsExactly("Player1");
  assertThat(theFirstPlayerMethod2.getGoals()).isEqualTo(10);
}

public void should_return_3_the_best_scorers() {
  Player player1 = new Player("Player1", "Team1", FR, 10, 0);
  Player player2 = new Player("Player2", "Team1", FR, 5, 3);
  Player player3 = new Player("Player3", "Team2", FR, 7, 10);
  Player player4 = new Player("Player4", "Team1", DE, 17, 10);
  Player player5 = new Player("Player5", "Team2", PL, 3, 1);
  List<Player> players = Lists.newArrayList(player1, player4, player3, player2, player5);

  // top(n, comparator) returns the n biggest elements according to the comparator,
  // here the 3 players with the most goals
  List<Player> theBestScorers = CONTEXT.parallelize(players)
          .top(3, new GoalsComparator());

  assertThat(theBestScorers).extracting("name").containsExactly("Player4", "Player1", "Player3");
  assertThat(theBestScorers).extracting("goals").containsExactly(17, 10, 7);
}

public void should_return_the_best_scorer() {
  Player player1 = new Player("Player1", "Team1", FR, 10, 0);
  Player player2 = new Player("Player2", "Team1", FR, 5, 3);
  Player player3 = new Player("Player3", "Team2", FR, 7, 10);
  Player player4 = new Player("Player4", "Team1", DE, 17, 10);
  Player player5 = new Player("Player5", "Team2", PL, 3, 1);
  List<Player> players = Lists.newArrayList(player1, player4, player3, player2, player5);

  // In exactly the same fashion, with the use of the min() method, we can get the worst scorer
  Player theBestScorer = CONTEXT.parallelize(players)
          .max(new GoalsComparator());

  assertThat(theBestScorer.getGoals()).isEqualTo(17);
}

public void should_sample_RDD() {
  List<String> letters = Lists.newArrayList("A","B", "C", "D");

  JavaRDD<String> allLettersRDD = CONTEXT.parallelize(letters);

  // takeSample(withReplacement, num, seed): the fixed seed makes the sample reproducible
  List<String> sampledLetters = allLettersRDD.takeSample(true, 2, 3L);

  assertThat(sampledLetters).containsExactly("A", "B");
}

public void should_correctly_reduce_results_to_check_if_somebody_has_a_debth() {
  List<Integer> numbers = Lists.newArrayList(-1, -2, 3);

  JavaRDD<Integer> numbersRDD = CONTEXT.parallelize(numbers, 1);

  // Reduce is applied to pairs of elements held by the RDD. It stops applying the reduce function when only
  // one entry is left. The type of the result is the same as the type of the input parameters.
  // -1 + -2 = -3; -3 + 3 = 0
  int result = numbersRDD.reduce((firstValue, secondValue) -> firstValue+secondValue);

  assertThat(result).isEqualTo(0);
}

public void should_correctly_aggregate_results() {
  List<Integer> numbers = Lists.newArrayList(1, 2, 3, 4);

  JavaRDD<Integer> numbersRDD = CONTEXT.parallelize(numbers, 2);

  // Aggregates the values of a given RDD. The first argument is called the 'zero value' and it's used to begin the
  // computation. The second one is the combine function, operating on the entries inside each RDD partition. The last
  // function merges the results computed on different partitions.
  // Unlike fold(), it can create an output of a different type than the input.
  // 1 + 2 + 3 + 4 = 10
  int result = numbersRDD.aggregate(0,
    (first, second) -> first+second,
    (firstPartition, secondPartition) -> firstPartition + secondPartition);

  assertThat(result).isEqualTo(10);
}
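The comment above says that aggregate can produce an output of a different type than its input. A minimal plain-Java sketch of that idea follows. It simulates partitions with nested lists and does not call Spark; AggregateSketch, SumAndCount and the aggregate helper are hypothetical names, and the way the zero value is reused for the final merge is an assumption about how the operation behaves.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

final class AggregateSketch {

  // Immutable output type, different from the Integer input type
  static final class SumAndCount {
    final int sum;
    final int count;

    SumAndCount(int sum, int count) {
      this.sum = sum;
      this.count = count;
    }

    double average() {
      return count == 0 ? 0d : (double) sum / count;
    }
  }

  // seqOp folds the entries of one partition, combOp merges the partial results
  static <T, U> U aggregate(List<List<T>> partitions, U zeroValue,
      BiFunction<U, T, U> seqOp, BinaryOperator<U> combOp) {
    U merged = zeroValue;
    for (List<T> partition : partitions) {
      U partial = zeroValue;
      for (T entry : partition) {
        partial = seqOp.apply(partial, entry);
      }
      merged = combOp.apply(merged, partial);
    }
    return merged;
  }

  static SumAndCount sumAndCount(List<List<Integer>> partitions) {
    return aggregate(partitions, new SumAndCount(0, 0),
      (acc, number) -> new SumAndCount(acc.sum + number, acc.count + 1),
      (left, right) -> new SumAndCount(left.sum + right.sum, left.count + right.count));
  }

  public static void main(String[] args) {
    List<List<Integer>> partitions = Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4));
    // Integers go in, a SumAndCount comes out: sum 10 over 4 entries, average 2.5
    System.out.println(sumAndCount(partitions).average());
  }
}
```

An immutable accumulator is used here so that sharing the same zero value between partitions cannot leak state from one partition to another.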

public void should_correctly_use_fold_for_sum_starting_from_negative_sum_of_numbers_when_slices_are_the_same_as_items_number() {
  List<Integer> numbers = Lists.newArrayList(1, 2, 3, 4);

  JavaRDD<Integer> numbersRDD = CONTEXT.parallelize(numbers, 2);

  // Fold is also an aggregating function. It takes one input type and generates
  // an output of the same type.
  // Spark's fold is suitable for computations made in parallel. If the data is spread
  // over different executors, fold() is first called locally on each partition. The local results
  // are then combined into the final one.
  // However, it can't be used with any kind of function. The function must be commutative and
  // associative, which means that the order and the grouping of its parameters must not change the
  // result. For example addition is commutative (a+b = b+a) but subtraction is not (a-b != b-a).
  int result = numbersRDD.fold(0, (previousValue, currentValue) -> {
    return currentValue+previousValue;
  });

  // 1st executor: 0+1 = 1; 1+2 = 3
  // 2nd executor: 0+3 = 3; 3+4 = 7
  // Results merge: 7+3 = 10
  assertThat(result).isEqualTo(10);
}
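The per-executor trace above can be replayed with a small plain-Java simulation. It is a sketch under assumptions, not Spark code: partitions are modeled as nested lists, the merge happens in list order, and the zero value is assumed to be applied once per partition and once more for the final merge. FoldSketch and its fold method are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

final class FoldSketch {

  // Folds every partition starting from zeroValue, then merges the partial
  // results, again starting from zeroValue
  static int fold(List<List<Integer>> partitions, int zeroValue, BinaryOperator<Integer> function) {
    int merged = zeroValue;
    for (List<Integer> partition : partitions) {
      int partial = zeroValue;
      for (int entry : partition) {
        partial = function.apply(partial, entry);
      }
      merged = function.apply(merged, partial);
    }
    return merged;
  }

  public static void main(String[] args) {
    List<List<Integer>> twoPartitions = Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4));
    List<List<Integer>> fourPartitions = Arrays.asList(
      Arrays.asList(1), Arrays.asList(2), Arrays.asList(3), Arrays.asList(4));

    // Neutral zero value: same trace as in the comments above (3 and 7, merged to 10)
    System.out.println(fold(twoPartitions, 0, Integer::sum));  // 10
    // Non-neutral zero value: the result now depends on the number of partitions
    System.out.println(fold(twoPartitions, 1, Integer::sum));  // 13
    System.out.println(fold(fourPartitions, 1, Integer::sum)); // 15
  }
}
```

This is why the zero value should be neutral for the function used (0 for addition): otherwise it is counted once per partition plus once for the merge.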

public void should_correctly_save_numbers_in_text_file() throws IOException {
  File sparkRDDFile = new File("./test_dir");
  // saveAsTextFile() fails if the output directory already exists, so it's cleaned first
  if (sparkRDDFile.exists()) {
    Arrays.stream(sparkRDDFile.listFiles()).forEach(f -> f.delete());
    sparkRDDFile.delete();
  }
  List<Integer> numbers = Lists.newArrayList(1, 2, 3, 4);

  JavaRDD<Integer> numbersRDD = CONTEXT.parallelize(numbers, 2);

  numbersRDD.saveAsTextFile("./test_dir");

  sparkRDDFile = new File("./test_dir");
  File part1 = new File("./test_dir/part-00000");
  File part2 = new File("./test_dir/part-00001");
  assertThat(Files.readLines(part1, Charset.defaultCharset()).stream().collect(Collectors.joining(", "))).isEqualTo("1, 2");
  assertThat(Files.readLines(part2, Charset.defaultCharset()).stream().collect(Collectors.joining(", "))).isEqualTo("3, 4");
  // Saving the output as a file generates N content files and N checksum files, where
  // N is the number of partitions (2 in our case). Content files hold the data produced by the action. Their names
  // are prefixed with part-. Each of them is accompanied by a Cyclic Redundancy Check file (.crc), used to check
  // that the stored data is correct.
  // There are also _SUCCESS and ._SUCCESS.crc files. They exist simply to tell that the operation of saving
  // data into files terminated correctly.
  // Sample output for our example looks like:
  // > part-00001: [3, 4]
  // > ._SUCCESS.crc: [crc]
  // > .part-00001.crc: [crc followed by binary checksum bytes]
  // > _SUCCESS: []
  // > .part-00000.crc: [crc followed by binary checksum bytes]
  // > part-00000: [1, 2]
}

// The comparator must be serializable too. Otherwise the task fails with an exception
// telling that the Task is not serializable.
// See also the comment on the Player object.
private static class GoalsComparator implements Comparator<Player>, Serializable {

  @Override
  public int compare(Player player1, Player player2) {
    if (player1.getGoals() == player2.getGoals()) {
      return 0;
    }
    return player1.getGoals() > player2.getGoals() ? 1 : -1;
  }
}
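The serialization requirement mentioned for the comparator can be checked with plain Java serialization, assuming that this is the mechanism used to ship the comparator to the executors. The sketch below doesn't touch Spark; SerializableComparatorSketch, LengthComparator and canBeSerialized are hypothetical names. It also shows that a lambda comparator is serializable only when its target type includes Serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Comparator;

final class SerializableComparatorSketch {

  // Same idea as GoalsComparator: a named class implementing both interfaces
  static final class LengthComparator implements Comparator<String>, Serializable {
    @Override
    public int compare(String first, String second) {
      return Integer.compare(first.length(), second.length());
    }
  }

  // A plain lambda: its target type is only Comparator, so it isn't serializable
  static Comparator<String> plainLambda() {
    return (first, second) -> Integer.compare(first.length(), second.length());
  }

  // The intersection cast makes the lambda implement Serializable as well
  static Comparator<String> serializableLambda() {
    return (Comparator<String> & Serializable)
      (first, second) -> Integer.compare(first.length(), second.length());
  }

  // Tries to write the object with Java serialization and reports whether it worked
  static boolean canBeSerialized(Object candidate) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(candidate);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new IllegalStateException("Unexpected I/O error", e);
    }
  }

  public static void main(String[] args) {
    System.out.println("Named class: " + canBeSerialized(new LengthComparator()));
    System.out.println("Plain lambda: " + canBeSerialized(plainLambda()));
    System.out.println("Serializable lambda: " + canBeSerialized(serializableLambda()));
  }
}
```

A named static class such as GoalsComparator remains the most explicit option, since its serializability is visible in the class declaration.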

private static class StringParam implements AccumulatorParam<String>, Serializable {
  @Override
  public String addAccumulator(String t1, String t2) {
    return t1 + "_" + t2;
  }

  @Override
  public String addInPlace(String r1, String r2) {
    return r1+r2;
  }

  @Override
  public String zero(String initialValue) {
    return initialValue;
  }
}

This post presents the actions available in Spark. The first part lists them and shows use-case examples in pseudo-code. The second part presents the same actions through the Java API.



