Bulk queries in Elasticsearch

Elasticsearch is designed to store large amounts of data, and some operations on that data, such as indexing, can be costly. This is one of the reasons why Elasticsearch adopted a feature found in most major RDBMSs: batch operations, known in Elasticsearch as bulk operations.

We'll start this article by describing the bulk operations available in Elasticsearch through the RESTful API. In the second part we'll show how to implement them through the Java API.

Bulk operations in Elasticsearch RESTful API

When working with large amounts of data, making hundreds or thousands of separate calls can hurt performance. Elasticsearch's answer to this problem is bulk operations. They cover 4 types of actions: index, create, delete and update, all of which can be mixed in a single call. However, they aren't all defined in the same way:
- index and create: are defined on two lines separated by a newline character. The first line defines the index concerned by the operation and the second line holds the data to create or to index. The difference between them lies in how they handle duplicates: create fails if the document already exists, whereas index adds the document or replaces the existing one.
- update: expects the definition of the update on a new line, just after the line identifying the document to update (much like SQL's 'WHERE id = 3').
- delete: consists of a single line identifying the document to delete.
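To make these line rules concrete, here is a minimal sketch that assembles the text of a bulk body mixing the four actions. BulkBodySketch and its methods are hypothetical helpers written for this illustration, not part of any Elasticsearch library. The document ids and values are made up, no JSON escaping is done, and the _type field is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: it only builds the newline-delimited text of a bulk body.
// Index names and ids are assumed to need no JSON escaping.
final class BulkBodySketch {
    private final List<String> lines = new ArrayList<>();

    // index: action line followed by the document source (2 lines).
    BulkBodySketch index(String index, String id, String sourceJson) {
        lines.add(actionLine("index", index, id));
        lines.add(sourceJson);
        return this;
    }

    // create: same shape as index, only the action name differs (2 lines).
    BulkBodySketch create(String index, String id, String sourceJson) {
        lines.add(actionLine("create", index, id));
        lines.add(sourceJson);
        return this;
    }

    // update: action line followed by the partial document wrapped in "doc" (2 lines).
    BulkBodySketch update(String index, String id, String partialDocJson) {
        lines.add(actionLine("update", index, id));
        lines.add("{\"doc\":" + partialDocJson + "}");
        return this;
    }

    // delete: the action line alone (1 line).
    BulkBodySketch delete(String index, String id) {
        lines.add(actionLine("delete", index, id));
        return this;
    }

    // Every line, including the last one, is terminated by a newline.
    String build() {
        StringBuilder body = new StringBuilder();
        for (String line : lines) {
            body.append(line).append('\n');
        }
        return body.toString();
    }

    private static String actionLine(String action, String index, String id) {
        return "{\"" + action + "\":{\"_index\":\"" + index + "\",\"_id\":\"" + id + "\"}}";
    }

    public static void main(String[] args) {
        String body = new BulkBodySketch()
            .index("waitingforcode", "1", "{\"name\":\"RC Paris\"}")
            .create("waitingforcode", "2", "{\"name\":\"Roubaix\"}")
            .update("waitingforcode", "1", "{\"name\":\"Racing Club Paris\"}")
            .delete("waitingforcode", "2")
            .build();
        System.out.print(body);
    }
}
```

The four actions above produce seven lines: two for each of index, create and update, and one for delete.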

This slightly strange-looking format is well explained in the Elasticsearch bulk format documentation. As that documentation explains, it was designed to reduce the memory consumption of bulk requests. We also learn that a bulk request applies to documents which can be located in different primary shards, possibly placed on different nodes of the cluster, so each bulk action must be forwarded to the correct shard and node. Thanks to the newline-separated format, Elasticsearch can forward each part of the bulk to the appropriate place in the cluster. These simple requests are then processed on the primary shards and replicated if replica shards are defined.
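The forwarding step can be pictured as grouping the bulk items by target shard. The sketch below is a simplification under stated assumptions: Elasticsearch picks the shard by hashing the routing value (the document id by default) modulo the number of primary shards, but here String.hashCode stands in for the real hash function, and ShardGroupingSketch is a hypothetical class, not Elasticsearch code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of splitting one bulk request into per-shard groups.
final class ShardGroupingSketch {

    // Stand-in for the routing formula: hash(routing value) modulo primary shards.
    // String.hashCode replaces the hash Elasticsearch really uses.
    static int shardFor(String documentId, int primaryShards) {
        return Math.floorMod(documentId.hashCode(), primaryShards);
    }

    // Groups document ids by the shard that would have to process them.
    static Map<Integer, List<String>> groupByShard(List<String> documentIds, int primaryShards) {
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (String id : documentIds) {
            groups.computeIfAbsent(shardFor(id, primaryShards), shard -> new ArrayList<>()).add(id);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> ids = List.of("1", "2", "3", "4", "5", "6");
        // Each entry is the part of the bulk that one primary shard would receive.
        groupByShard(ids, 3).forEach((shard, group) ->
            System.out.println("shard " + shard + " <- " + group));
    }
}
```

Because each action sits on its own lines, such a split only needs to read the action line of each item, not to parse the request as a whole.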

The result of a bulk request is a single JSON document summarizing which operations were executed correctly and which were not. Note that each operation inside a bulk has its own scope: the failure of one or several of them doesn't impact the rest.

The bulk RESTful API is exposed on URLs whose path ends with _bulk. An example of a bulk indexing query looks like:

{"index": {"_index": "waitingforcode", "_type": "teams"}}
{"name": "RC Paris"}
{"index": {"_index": "waitingforcode", "_type": "teams"}}
{"name": "Roubaix"}
{"index": {"_index": "waitingforcode", "_type": "teams"}}
{"name": "Nimes"}

For the URL: http://localhost:9200/waitingforcode/_bulk
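As a rough sketch, the request above could be sent from plain Java with the JDK's java.net.http client (Java 11 or later). BulkHttpSketch and bulkRequest are hypothetical names chosen for this example. The application/x-ndjson content type is what recent Elasticsearch versions expect for bulk bodies, and main assumes a node listening on localhost:9200.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper that posts a newline-delimited body to the _bulk endpoint.
final class BulkHttpSketch {

    // Builds the POST request; nothing is sent until the client executes it.
    static HttpRequest bulkRequest(String baseUrl, String index, String ndjsonBody) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/" + index + "/_bulk"))
            .header("Content-Type", "application/x-ndjson")
            .POST(HttpRequest.BodyPublishers.ofString(ndjsonBody))
            .build();
    }

    public static void main(String[] args) throws Exception {
        // Same action and source lines as the sample; the body ends with a newline.
        String action = "{\"index\": {\"_index\": \"waitingforcode\", \"_type\": \"teams\"}}\n";
        String body = action + "{\"name\": \"RC Paris\"}\n"
            + action + "{\"name\": \"Roubaix\"}\n"
            + action + "{\"name\": \"Nimes\"}\n";

        HttpRequest request = bulkRequest("http://localhost:9200", "waitingforcode", body);
        HttpResponse<String> response =
            HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());

        // The response body is the single JSON document with one result per action.
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}
```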

Bulk operations in Elasticsearch Java API

On the Java API side, bulk operations can be written very easily with the fluent API. Like other operations, bulk has a dedicated object, org.elasticsearch.action.bulk.BulkRequestBuilder, which is used to construct bulk request objects. Under the hood, it's the bulk method of org.elasticsearch.client.support.AbstractClient which is invoked.

A sample bulk indexing request in the Java API can look like this:

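// Imports assume the Elasticsearch 1.x test framework, Guava and AssertJ on the classpath.
import java.util.List;

import com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Before;
import org.junit.Test;

public class BulkOperationsTest extends ElasticsearchIntegrationTest {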

  @Before
  public void setupCluster() {
    createIndex("teams");
    ensureGreen("teams");
  }

  @Test
  public void test_bulk_index() {
    List<String> teams = 
      Lists.newArrayList("Team_1", "Team_2", "Team_3", "Team_4", "Team_5");
    BulkRequestBuilder bulkRequest = client().prepareBulk();
    for (String team : teams) {
      bulkRequest.add(client().prepareIndex("teams", "team")
        .setSource("name", team));
    }

    BulkResponse bulkResponse = bulkRequest.execute().actionGet();

    Assertions.assertThat(bulkResponse.hasFailures()).isFalse();
    bulkResponse
      .forEach(bulkItemResponse -> Assertions.assertThat(bulkItemResponse.getFailure()).isNull());
    Assertions.assertThat(bulkResponse.getItems()).hasSize(teams.size());
  }
}

Exactly as with the RESTful API, bulk operations are quite simple to write thanks to the possibility of defining each operation separately. The secret of this simplicity is hidden in the org.elasticsearch.action.bulk.BulkRequest class: delete, index and update requests are stored in its package-private final List field called requests.

Now you may wonder where create requests are stored. In exactly the same place, because under the hood they are objects of the same class as indexing requests, i.e. IndexRequest. The property used to distinguish an index query from a create query is a private field called opType, defined like this: private OpType opType = OpType.INDEX;.
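The idea of one request class serving both actions can be shown with a simplified model. The classes below are not the real Elasticsearch ones: OpTypeSketch, its nested IndexRequest and the in-memory map standing in for an index are all assumptions made for the illustration. Only the default opType value mirrors the field described in the text.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model, not Elasticsearch code: one request class covers index and create.
final class OpTypeSketch {
    enum OpType { INDEX, CREATE }

    static final class IndexRequest {
        final String id;
        final String source;
        // Same default as the field described in the text.
        private OpType opType = OpType.INDEX;

        IndexRequest(String id, String source) {
            this.id = id;
            this.source = source;
        }

        IndexRequest opType(OpType opType) {
            this.opType = opType;
            return this;
        }
    }

    // Applies a request to an in-memory "index"; returns false when the action fails.
    static boolean apply(Map<String, String> store, IndexRequest request) {
        if (request.opType == OpType.CREATE && store.containsKey(request.id)) {
            return false; // create refuses to overwrite an existing document
        }
        store.put(request.id, request.source); // index adds or replaces
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        System.out.println(apply(store, new IndexRequest("1", "RC Paris").opType(OpType.CREATE)));
        System.out.println(apply(store, new IndexRequest("1", "Roubaix").opType(OpType.CREATE)));
        System.out.println(apply(store, new IndexRequest("1", "Nimes")));
        System.out.println(store.get("1"));
    }
}
```

A single flag is enough to separate the two behaviours, which is why both kinds of request can live in the same list.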

Another way of creating bulk requests from the Java API consists of using BulkProcessor. It provides some additional control, such as deciding when the data is flushed (depending on size, number of actions or time) or how many bulk requests can be executed in parallel.
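The flush-by-number-of-actions behaviour can be sketched without any Elasticsearch dependency. BatchingSketch below is a hypothetical class, not the real BulkProcessor: it buffers actions and hands them to a callback once a threshold is reached. Size-based and time-based flushing, as well as concurrent requests, are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the batching idea: flush once maxActions items are buffered.
final class BatchingSketch<T> implements AutoCloseable {
    private final int maxActions;
    private final Consumer<List<T>> flushHandler;
    private List<T> buffer = new ArrayList<>();

    BatchingSketch(int maxActions, Consumer<List<T>> flushHandler) {
        if (maxActions < 1) {
            throw new IllegalArgumentException("maxActions must be at least 1");
        }
        this.maxActions = maxActions;
        this.flushHandler = flushHandler;
    }

    // Buffers one action and flushes as soon as the threshold is reached.
    void add(T action) {
        buffer.add(action);
        if (buffer.size() >= maxActions) {
            flush();
        }
    }

    // Hands the buffered actions to the handler as one batch, if there are any.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        flushHandler.accept(batch);
    }

    // Closing sends whatever is still buffered.
    @Override
    public void close() {
        flush();
    }

    public static void main(String[] args) {
        try (BatchingSketch<String> processor =
                 new BatchingSketch<>(2, batch -> System.out.println("bulk of " + batch))) {
            for (int i = 1; i <= 5; i++) {
                processor.add("Team_" + i);
            }
        }
    }
}
```

In a real setup the handler would be the place where the bulk request is executed; here it only prints the batch.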

This article introduced the concept of bulk operations in Elasticsearch. Thanks to them, big operations, such as indexing thousands of documents or applying a mix of updates after one day of activity in a big database, can be executed more quickly than with individual calls. We also explained a little of what happens in the Java API layer responsible for bulk queries.
