Consumers in Apache Kafka

on waitingforcode.com


When you write about messaging, you certainly face a dilemma: 'which part, consuming or producing, should be described first?'. I decided to start with simple consumers. By 'simple', I mean only 1 consumer per group. Thanks to that, we can avoid the topic of partition sharing.

This article covers some main points about Kafka consumers. The first part presents some of them: pulling, offsets and consuming semantics. The second part describes several configuration entries. The last one, through a dozen or so test cases, shows how to use consumers with Kafka's Java API. Kafka still keeps the concept of old consumers from the 0.8.0 version but in our case, we'll focus on the 'new' ones, introduced in 0.9.0.

Kafka consumer components

A very important point regarding consumers concerns the way they retrieve messages. Kafka consumers pull messages, which means that they explicitly ask the broker for them. Unlike in pushing, in pulling the consumers decide when and which messages will be downloaded. In pushing, on the other hand, it's the server which sends messages. Thanks to this approach, Kafka consumers are quite independent of the rate at which messages are produced. For example, if a lot of messages are produced during one specific day (maybe because of discounts), consumers won't be overloaded because they'll probably keep the same pulling policy. In addition, pulling makes batching easier. The server doesn't need to know which messages must be sent in a single call. Instead, it's the consumer which handles that aspect, in Kafka through partition offsets.

Offsets are another key component of Kafka consumers. As already mentioned, Kafka topics are divided into a set of ordered partitions. When a consumer wants to read messages from a topic, it can explicitly specify from which partition it wants to get the messages. It can also subscribe to the topic as a whole, by specifying only its name. In that case, it's Kafka which decides which partitions the consumer reads. The offset is a simple integer pointing to the next message to consume. It's not a fixed and immutable value because it can be set to any non-negative integer - even to 0 if the consumer needs to consume the messages once again.
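To make pulling and offsets more concrete, here is a minimal in-memory sketch. It does not use the Kafka client at all: PartitionLogSketch and PullingConsumerSketch are hypothetical names invented for this illustration, and the only thing they model is that the consumer, not the server, chooses the starting offset and the batch size.

```java
import java.util.ArrayList;
import java.util.List;

// Toy, in-memory stand-in for a single partition. NOT the Kafka client API.
final class PartitionLogSketch {
    private final List<String> messages = new ArrayList<>();

    // Appends a message and returns the offset it was stored at.
    long append(String message) {
        messages.add(message);
        return messages.size() - 1;
    }

    // Pull model: the caller chooses where to start and how much to take.
    List<String> fetch(long offset, int maxRecords) {
        int from = (int) Math.min(Math.max(offset, 0L), messages.size());
        int to = Math.min(from + maxRecords, messages.size());
        return new ArrayList<>(messages.subList(from, to));
    }
}

// The consumer remembers the offset of the next message to read.
final class PullingConsumerSketch {
    private final PartitionLogSketch log;
    private long position = 0L;

    PullingConsumerSketch(PartitionLogSketch log) {
        this.log = log;
    }

    // Fetches at most maxRecords messages and advances the position.
    List<String> poll(int maxRecords) {
        List<String> batch = log.fetch(position, maxRecords);
        position += batch.size();
        return batch;
    }

    // Moves the position, for instance back to 0 to read everything again.
    void seek(long offset) {
        if (offset < 0) {
            throw new IllegalArgumentException("offset must be >= 0");
        }
        position = offset;
    }

    long position() {
        return position;
    }

    public static void main(String[] args) {
        PartitionLogSketch log = new PartitionLogSketch();
        log.append("a");
        log.append("b");
        log.append("c");
        PullingConsumerSketch consumer = new PullingConsumerSketch(log);
        System.out.println(consumer.poll(2)); // [a, b]
        consumer.seek(0L);                    // rewind to the beginning
        System.out.println(consumer.poll(10)); // [a, b, c]
    }
}
```

The real consumer exposes the same ideas through poll(), seek() and position(), as the test cases at the end of the article show.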

As you can see, the offset is also a kind of acknowledgement known from other messaging systems such as RabbitMQ. With the new consumers, it's stored in an internal Kafka topic called __consumer_offsets (old consumers kept it in ZooKeeper) and can be committed automatically or manually. Automatic committing means that the consumer periodically sends the offset of already polled messages to the broker (by default, every 5 seconds). When manual committing is preferred, the consumer needs to commit the offset itself through the available API methods.
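The difference between the consumer's current position and its committed offset can be modelled with a few lines of plain Java. This is only a toy model, not the Kafka client: AutoCommitSketch is a hypothetical class, time is passed in explicitly, and it relies on the simplifying assumption that the periodic commit is evaluated when poll() is called.

```java
// Toy model of offset bookkeeping. NOT the Kafka client API.
final class AutoCommitSketch {
    private final long autoCommitIntervalMs; // plays the role of auto.commit.interval.ms
    private long position = 0L;              // offset of the next message to read
    private long committed = 0L;             // last offset stored on the broker side
    private long lastCommitAtMs = 0L;

    AutoCommitSketch(long autoCommitIntervalMs) {
        this.autoCommitIntervalMs = autoCommitIntervalMs;
    }

    // Simulates a poll() made at time nowMs that returns `fetched` new records.
    void poll(long nowMs, int fetched) {
        // Simplifying assumption: the periodic commit is checked inside poll()
        // and covers the records returned by the previous polls.
        if (nowMs - lastCommitAtMs >= autoCommitIntervalMs) {
            committed = position;
            lastCommitAtMs = nowMs;
        }
        position += fetched;
    }

    // Manual alternative: the application decides when the offset is stored.
    void commitSync() {
        committed = position;
    }

    long position() {
        return position;
    }

    long committed() {
        return committed;
    }

    public static void main(String[] args) {
        AutoCommitSketch consumer = new AutoCommitSketch(5_000L);
        consumer.poll(1_000L, 3); // interval not elapsed yet, nothing committed
        consumer.poll(6_000L, 2); // interval elapsed, offset 3 gets committed
        System.out.println(consumer.committed()); // 3
        consumer.commitSync();
        System.out.println(consumer.committed()); // 5
    }
}
```

The gap between position() and committed() is exactly the set of messages that would be read again after a restart, which leads to the delivery semantics described next.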

A consumer can work with 1 of 3 message delivery semantics:

  1. At most once - a message can be consumed at most once. That said, some messages may not be consumed at all (no redelivery). This situation can happen when a consumer commits the offset before processing the messages:
    • Messages [1, 2, 3] are correctly retrieved by a pull request
    • Consumer commits its new offset, 4 (the "id" of the next message to process)
    • Unfortunately, the first retrieved message causes a NullPointerException and, because of bad code design, the processing of all 3 messages is aborted
    • Consumer pulls again and gets the next 3 messages: [4, 5, 6] - messages [1, 2, 3] are never processed
  2. At least once - a message can be consumed at least once. That said, some messages can be consumed twice or more in the case of redelivery. This situation can happen when a consumer processes messages correctly but fails to commit the offset:
    • Messages [1, 2, 3] are correctly retrieved by a pull request
    • Consumer correctly processes all 3 messages and produces some persistent output, for example: inserts new rows into a database
    • Too bad, the offset commit fails
    • Consumer pulls again and, according to the last committed offset (1), it retrieves the messages [1, 2, 3] once again - with bad code design, they're processed twice
  3. Exactly once - a message is consumed once and only once. This guarantee can be introduced with the help of external storage systems:
    • Messages [1, 2, 3] are correctly retrieved by a pull request
    • Consumer correctly processes all 3 messages
    • Before committing the offset, the consumer saves it to an external place, for example a database
    • If everything has gone well so far, the consumer commits the offset. Now, even if something goes wrong in this step, it knows, thanks to the external storage, which messages were really processed

    As you can see, this approach is the safest but it also introduces some latency caused by the communication with the external storage.
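The three scenarios above differ only in the order of processing and committing, which the following plain Java simulation tries to show. It is a sketch under assumptions, not Kafka code: DeliverySemanticsSketch, CrashException and the in-memory "external" offset are all hypothetical, offsets are 0-based list indexes, and saving the results together with the external offset is assumed to be atomic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy simulation of commit ordering. NOT the Kafka client API.
final class DeliverySemanticsSketch {
    // Hypothetical failure used to simulate a consumer crash.
    static final class CrashException extends RuntimeException { }

    private final List<Integer> log;
    private long committed = 0L;      // survives a consumer "restart"
    private long externalOffset = 0L; // stands in for an offset saved in a database
    final List<Integer> processed = new ArrayList<>();

    DeliverySemanticsSketch(List<Integer> log) {
        this.log = log;
    }

    private List<Integer> fetchFrom(long offset) {
        return new ArrayList<>(log.subList((int) offset, log.size()));
    }

    // At most once: commit first, process afterwards.
    void commitThenProcess(boolean crashWhileProcessing) {
        List<Integer> batch = fetchFrom(committed);
        committed += batch.size();
        if (crashWhileProcessing) {
            throw new CrashException(); // the batch is lost for good
        }
        processed.addAll(batch);
    }

    // At least once: process first, commit afterwards.
    void processThenCommit(boolean crashBeforeCommit) {
        List<Integer> batch = fetchFrom(committed);
        processed.addAll(batch);
        if (crashBeforeCommit) {
            throw new CrashException(); // the batch will be delivered again
        }
        committed += batch.size();
    }

    // Exactly once flavour: results and offset are saved together (assumed
    // atomic here) and a restarted consumer resumes from the saved offset.
    void processWithExternalOffset(boolean crashBeforeCommit) {
        List<Integer> batch = fetchFrom(externalOffset);
        processed.addAll(batch);
        externalOffset += batch.size();
        if (crashBeforeCommit) {
            throw new CrashException(); // the commit is lost, the saved offset is not
        }
        committed = externalOffset;
    }

    public static void main(String[] args) {
        DeliverySemanticsSketch atLeastOnce = new DeliverySemanticsSketch(Arrays.asList(1, 2, 3));
        try {
            atLeastOnce.processThenCommit(true);
        } catch (CrashException expected) {
            // the "restarted" consumer resumes from the last committed offset
        }
        atLeastOnce.processThenCommit(false);
        System.out.println(atLeastOnce.processed); // [1, 2, 3, 1, 2, 3]
    }
}
```

In a real application the external offset and the processing results would typically be written in the same database transaction, otherwise a crash between the two writes brings back one of the first two semantics.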

Consumer configuration

The goal of this part is not to cover all available configuration options. Instead, it lists only the ones most likely to be used in the first use cases:

  • enable.auto.commit - boolean true/false entry. If true, the consumer commits the offset automatically every X milliseconds. X is the value specified in another configuration entry - auto.commit.interval.ms
  • client.id - the id of the consumer, very useful for debugging.
  • auto.offset.reset - defines consumer behavior when there is no committed offset yet (i.e. the offset is not defined) or when the requested offset is out of range. If we use 'earliest' as the value, the consumer starts from the earliest available messages. With 'latest', on the other hand, it picks only messages produced after it started consuming.
  • max.partition.fetch.bytes - the maximum amount of data per partition that the server returns in a single fetch.
  • fetch.min.bytes - helps to tune consumer fetching. It defines the minimal amount of bytes which the server should return for a fetch request. For example, suppose we have only messages of 5 bytes and our consumer expects to fetch at least 10 bytes. If the server has only 1 message, it won't send it to the consumer. Only when the 2nd message is produced (the server has 10 bytes) will it be able to deliver them to the consumer. This waiting is bounded by the property named fetch.max.wait.ms. Let's go back to our case - the server has only 1 message and the max waiting time is 1 second. If no new messages arrive within 1 second, the server returns this single message to the consumer.
  • key.deserializer and value.deserializer - define the classes used for key and value deserialization.
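The entries above can be grouped in a java.util.Properties object, as sketched below. The values are only illustrative: the broker address, the group name and the client id are assumptions made for this example, and brokerWouldRespond() is a hypothetical helper that merely models the fetch.min.bytes / fetch.max.wait.ms rule described in the list, not the broker's real implementation.

```java
import java.util.Properties;

final class ConsumerConfigSketch {
    // Builds consumer settings from the entries listed above (illustrative values).
    static Properties sampleConsumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.setProperty("group.id", "sample_group");            // hypothetical group name
        props.setProperty("client.id", "sample_consumer_1");      // hypothetical client id
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("max.partition.fetch.bytes", "1048576");
        props.setProperty("fetch.min.bytes", "10");
        props.setProperty("fetch.max.wait.ms", "1000");
        props.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    // Toy model of the rule described above: the server answers a fetch as soon
    // as enough bytes are available or the maximum waiting time has elapsed.
    static boolean brokerWouldRespond(long availableBytes, long fetchMinBytes,
                                      long waitedMs, long fetchMaxWaitMs) {
        return availableBytes >= fetchMinBytes || waitedMs >= fetchMaxWaitMs;
    }

    public static void main(String[] args) {
        Properties props = sampleConsumerProperties();
        System.out.println(props.getProperty("auto.offset.reset")); // earliest
        // One 5-byte message, 10 bytes expected, no time elapsed yet
        System.out.println(brokerWouldRespond(5, 10, 0, 1_000));     // false
        // Still one message, but the maximum waiting time has elapsed
        System.out.println(brokerWouldRespond(5, 10, 1_000, 1_000)); // true
    }
}
```

Such a Properties instance is what the KafkaConsumer constructor receives in the test cases of the next part.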

Single consumer example

This part shows some test cases using the Kafka consumer. Before launching each test, it's mandatory to recreate the topic with the following commands:

bin/kafka-topics.sh --delete --zookeeper localhost:2181  --topic singleconsumertopic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic singleconsumertopic

The test cases illustrate the previously described aspects: partition assignment, offset committing, fetch size and some others:

@Test
public void should_correctly_subscribe_even_if_kafka_broker_is_down() throws IOException, InterruptedException {
  // A consumer can subscribe() to a Kafka topic even if the broker is down
  // For a single consumer, assign() is a more explicit alternative, but it requires
  // exact knowledge of the partitions, while for subscribe() the topic name is enough
  System.out.println("Turn off your Kafka broker now - you've 10 seconds from now to do that");
  Thread.sleep(10000);
  Properties consumerProps = Context.getInstance().getCommonProperties();
  consumerProps.setProperty("client.id", ConsumerHelper.generateName(TOPIC_NAME, "test1_"));
  KafkaConsumer<String, String> localConsumer =
    new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps, false, null));
  localConsumer.subscribe(Collections.singletonList(TOPIC_NAME));

  assertThat(localConsumer.subscription()).hasSize(1);
  assertThat(localConsumer.subscription()).containsOnly(TOPIC_NAME);
}

@Test
public void should_block_consumer_thread_when_poll_is_invoked_and_broker_is_down() throws IOException, InterruptedException {
  System.out.println("Turn off your Kafka broker now - you've 10 seconds from now to do that");
  Thread.sleep(10_000);
  Properties consumerProps = Context.getInstance().getCommonProperties();
  consumerProps.setProperty("client.id", ConsumerHelper.generateName(TOPIC_NAME, "test2_"));
  KafkaConsumer<String, String> localConsumer =
    new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps, false, null));
  localConsumer.subscribe(Collections.singletonList(TOPIC_NAME));

  /**
    * When the broker is down, poll() never times out. To show that,
    * we must run it in a separate thread.
    */
  Collection<Integer> polledMessages = new ArrayList<>();
  new Thread(() -> {
    int i = 0;
    // poll() never returns null
    // If poll() didn't block, it would
    // increment the counter 6 times (3000/500)
    while (localConsumer.poll(500) != null) {
      polledMessages.add(i++);
    }
  }).start();
  Thread.sleep(3_000);

  assertThat(polledMessages).isEmpty();
}

@Test
public void should_correctly_connect_to_partition_even_if_broker_is_down() throws InterruptedException, IOException {
  System.out.println("Turn off your Kafka broker now - you've 10 seconds from now to do that");
  Thread.sleep(10_000);
  Properties consumerProps = Context.getInstance().getCommonProperties();
  consumerProps.setProperty("client.id", ConsumerHelper.generateName(TOPIC_NAME, "test3_"));
  KafkaConsumer<String, String> localConsumer =
          new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps, false, null));
  // Even if the broker is down, the consumer correctly assigns the partition
  // and, unlike with poll(), does not block when it seeks to
  // a particular offset
  TopicPartition partition0 = new TopicPartition(TOPIC_NAME, 0);
  localConsumer.assign(Collections.singletonList(partition0));
  localConsumer.seek(partition0, 0L);

  assertThat(true).isTrue();
}

@Test
public void should_correctly_poll_messages() throws InterruptedException, IOException, KeeperException {
  String testName = "test4_";
  Properties consumerProps = Context.getInstance().getCommonProperties();
  consumerProps.setProperty("client.id", ConsumerHelper.generateName(TOPIC_NAME, testName));
  // earliest - we want the consumer to get all messages of the given topic, from the beginning
  consumerProps.setProperty("auto.offset.reset", "earliest");
  KafkaConsumer<String, String> localConsumer =
    new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps, false, null));
  // It is also possible for the consumer to manually
  // specify the partitions that are assigned to it through assign(List), which disables this dynamic partition assignment.
  localConsumer.subscribe(Collections.singletonList(TOPIC_NAME));
  try {
    KafkaProducer<String, String> sampleProducer = getProducer(testName);
    // Send messages
    sampleProducer.send(new ProducerRecord<>(TOPIC_NAME, "1", "A"));
    sampleProducer.send(new ProducerRecord<>(TOPIC_NAME, "2", "B"));

    // Force the buffered messages to be sent to the broker before polling
    sampleProducer.flush();

    // Get previously flushed messages
    // Be sure that the topic is empty before testing. Since we read from the earliest
    // offset, we would otherwise risk retrieving more than these 2 messages
    ConsumerRecords<String, String> records = localConsumer.poll(5_000);

    assertThat(records.count()).isEqualTo(2);
    assertThat(records.records(TOPIC_NAME)).extracting("key").containsOnly("1", "2");
    assertThat(records.records(TOPIC_NAME)).extracting("value").containsOnly("A", "B");
  } finally {
    localConsumer.close();
  }
}

@Test
public void should_correctly_assign_a_partition_to_consumer() throws IOException, InterruptedException {
  String testName = "test5_";
  Properties consumerProps = Context.getInstance().getCommonProperties();
  consumerProps.setProperty("client.id", ConsumerHelper.generateName(TOPIC_NAME, testName));
  KafkaConsumer<String, String> localConsumer =
    new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps, false, null));

  try {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    PartitionsStoringRebalanceListener rebalanceListener = new PartitionsStoringRebalanceListener(countDownLatch);
    localConsumer.subscribe(Collections.singletonList(TOPIC_NAME), rebalanceListener);
    countDownLatch.await(4, TimeUnit.SECONDS);
    // Partition assignment is lazy: as long as poll() is not invoked, no partition is assigned
    assertThat(rebalanceListener.getAssignedPartitions()).isEmpty();

    // Let's invoke poll() to force assignment
    localConsumer.poll(3_000);

    assertThat(rebalanceListener.getAssignedPartitions()).hasSize(1);
    assertThat(rebalanceListener.getAssignedPartitions()).extracting("partition").containsOnly(0);
    assertThat(rebalanceListener.getAssignedPartitions()).extracting("topic").containsOnly(TOPIC_NAME);
  } finally {
    localConsumer.close();
  }
}

@Test
public void should_read_the_same_records_twice_because_of_not_committed_offset() throws IOException {
  String testName = "test6_";
  Properties consumerProps = Context.getInstance().getCommonProperties();
  consumerProps.setProperty("client.id", ConsumerHelper.generateName(TOPIC_NAME, testName));
  consumerProps.setProperty("auto.offset.reset", "earliest");
  KafkaConsumer<String, String> localConsumer =
    new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps, false, null));

  try {
    localConsumer.subscribe(Collections.singletonList(TOPIC_NAME));
    // Produce some sample messages
    KafkaProducer<String, String> sampleProducer = getProducer(testName);
    String key1 = "1"+ System.currentTimeMillis(), key2 = "2"+System.currentTimeMillis();
    String value1 = "A"+System.currentTimeMillis(), value2 = "B"+ System.currentTimeMillis();
    sampleProducer.send(new ProducerRecord<>(TOPIC_NAME, key1, value1));
    sampleProducer.send(new ProducerRecord<>(TOPIC_NAME, key2, value2));
    sampleProducer.flush();

    // poll for the first time
    ConsumerRecords<String, String> records = localConsumer.poll(3_000);

    assertThat(records.count()).isEqualTo(2);
    assertThat(records).extracting("key").containsOnly(key1, key2);
    assertThat(records).extracting("value").containsOnly(value1, value2);

    // No offset was committed, poll() once again without producing new messages
    // We expect to get the same messages
    // To illustrate this desynchronization, we reinitialize our consumer
    localConsumer.close();
    localConsumer =
            new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps, false, null));
    localConsumer.subscribe(Collections.singletonList(TOPIC_NAME));
    // Check if polled() records are the same
    records = localConsumer.poll(3_000);
    assertThat(records.count()).isEqualTo(2);
    assertThat(records).extracting("key").containsOnly(key1, key2);
    assertThat(records).extracting("value").containsOnly(value1, value2);
  } finally {
    localConsumer.close();
  }
}

@Test
public void should_read_records_only_once_since_offset_is_committed_manually_before_consumer_reinitialization() throws IOException {
  String testName = "test7_";
  Properties consumerProps = Context.getInstance().getCommonProperties();
  consumerProps.setProperty("client.id", ConsumerHelper.generateName(TOPIC_NAME, testName));
  consumerProps.setProperty("auto.offset.reset", "earliest");
  KafkaConsumer<String, String> localConsumer =
    new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps, false, null));

  try {
    localConsumer.subscribe(Collections.singletonList(TOPIC_NAME));
    // Produce some sample messages
    KafkaProducer<String, String> sampleProducer = getProducer(testName);
    String key1 = "1"+ System.currentTimeMillis(), key2 = "2"+System.currentTimeMillis();
    String value1 = "A"+System.currentTimeMillis(), value2 = "B"+ System.currentTimeMillis();
    sampleProducer.send(new ProducerRecord<>(TOPIC_NAME, key1, value1));
    sampleProducer.send(new ProducerRecord<>(TOPIC_NAME, key2, value2));
    sampleProducer.flush();

    // poll for the first time
    ConsumerRecords<String, String> records = localConsumer.poll(3_000);

    assertThat(records.count()).isEqualTo(2);
    assertThat(records).extracting("key").containsOnly(key1, key2);
    assertThat(records).extracting("value").containsOnly(value1, value2);

    // This time we commit the offset before closing the previous consumer. It's expected
    // that the new one starts to consume only new messages and, because there are
    // no new ones, it should consume nothing
    localConsumer.commitSync();
    localConsumer.close();
    localConsumer =
            new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps, false, null));
    localConsumer.subscribe(Collections.singletonList(TOPIC_NAME));
    records = localConsumer.poll(3_000);
    assertThat(records.count()).isEqualTo(0);
  } finally {
    localConsumer.close();
  }
}

@Test
public void should_correctly_read_one_record_once_and_at_most_once() throws IOException, InterruptedException {
  String testName = "test8_";
  Properties consumerProps = Context.getInstance().getCommonProperties();
  consumerProps.setProperty("client.id", ConsumerHelper.generateName(TOPIC_NAME, testName));
  consumerProps.setProperty("auto.offset.reset", "earliest");
  KafkaConsumer<String, String> localConsumer =
    new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps, false, null));

  long alreadyConsumedOffset = 0L;
  try {
    localConsumer.subscribe(Collections.singletonList(TOPIC_NAME));
    // Produce some sample messages
    KafkaProducer<String, String> sampleProducer = getProducer(testName);
    String key1 = "1"+ System.currentTimeMillis(), key2 = "2"+System.currentTimeMillis();
    String value1 = "A"+System.currentTimeMillis(), value2 = "B"+ System.currentTimeMillis();
    sampleProducer.send(new ProducerRecord<>(TOPIC_NAME, key1, value1));
    sampleProducer.send(new ProducerRecord<>(TOPIC_NAME, key2, value2));
    sampleProducer.flush();

    // poll for the first time
    ConsumerRecords<String, String> records = localConsumer.poll(3_000);

    assertThat(records.count()).isEqualTo(2);
    assertThat(records).extracting("key").containsOnly(key1, key2);
    assertThat(records).extracting("value").containsOnly(value1, value2);
    alreadyConsumedOffset = 2;

    // We deliberately do not commit the offset. Even so, we won't consume
    // messages twice thanks to the external synchronization "mechanism"
    // We expect only 1 partition to be assigned
    // A better example with ConsumerRebalanceListener can be found in the
    // early release of "Kafka - The Definitive Guide", look for "SaveOffsetsOnRebalance"
    TopicPartition partition0 = new TopicPartition(TOPIC_NAME, 0);
    localConsumer.close();
    localConsumer =
            new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps, false, null));
    localConsumer.assign(Collections.singletonList(partition0));
    localConsumer.seek(partition0, alreadyConsumedOffset);
    records = localConsumer.poll(3_000);
    assertThat(records.count()).isEqualTo(0);
  } finally {
    localConsumer.close();
  }
}

@Test
public void should_fail_on_seeking_because_subscription_is_lazy() throws IOException {
  // It's almost the same test as the previous one. The difference is that it doesn't use
  // assign() but subscribe() in the reinitialized consumer. By seeking() to particular offset
  // just after, the test is expected to fail because partition is assigned only on poll()
  // for the case of subscribe()
  String testName = "test9_";
  Properties consumerProps = Context.getInstance().getCommonProperties();
  consumerProps.setProperty("client.id", ConsumerHelper.generateName(TOPIC_NAME, testName));
  consumerProps.setProperty("auto.offset.reset", "earliest");
  KafkaConsumer<String, String> localConsumer =
    new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps, false, null));

  long alreadyConsumedOffset = 0L;
  try {
    localConsumer.subscribe(Collections.singletonList(TOPIC_NAME));
    // Produce some sample messages
    KafkaProducer<String, String> sampleProducer = getProducer(testName);
    String key1 = "1" + System.currentTimeMillis(), key2 = "2" + System.currentTimeMillis();
    String value1 = "A" + System.currentTimeMillis(), value2 = "B" + System.currentTimeMillis();
    sampleProducer.send(new ProducerRecord<>(TOPIC_NAME, key1, value1));
    sampleProducer.send(new ProducerRecord<>(TOPIC_NAME, key2, value2));
    sampleProducer.flush();

    // poll for the first time
    ConsumerRecords<String, String> records = localConsumer.poll(5_000);

    assertThat(records.count()).isEqualTo(2);
    assertThat(records).extracting("key").containsOnly(key1, key2);
    assertThat(records).extracting("value").containsOnly(value1, value2);
    alreadyConsumedOffset = 2;

    // It should fail right here because subscribe() is lazy evaluated only
    // on poll() invocation. Here, poll() is made after seek() because
    // we want to control offset by ourselves (not a good idea, just for
    // illustration purpose)
    TopicPartition partition0 = new TopicPartition(TOPIC_NAME, 0);
    localConsumer.close();
    localConsumer =
            new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps, false, null));
    localConsumer.subscribe(Collections.singletonList(TOPIC_NAME));
    localConsumer.seek(partition0, alreadyConsumedOffset);
    localConsumer.poll(3_000);
    fail("Should fail on seeking to partition when this one is not assigned");
  } catch (IllegalStateException ise) {
    assertThat(ise.getMessage()).isEqualTo("No current assignment for partition singleconsumertopic-0");
  } finally {
    localConsumer.close();
  }
}

@Test
public void should_wake_up_consumer_polling_too_long_without_getting_messages() throws IOException {
  String testName = "test10_";
  Properties consumerProps = Context.getInstance().getCommonProperties();
  consumerProps.setProperty("client.id", ConsumerHelper.generateName(TOPIC_NAME, testName));
  KafkaConsumer<String, String> localConsumer =
    new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps, true, null));

  Collection<Integer> synchronizer = new ArrayList<>();
  new Thread(() -> {
    System.out.println("Waiting 5 seconds before stopping consumer and quit");
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    synchronizer.add(1);
    localConsumer.wakeup();
  }).start();
  localConsumer.subscribe(Collections.singletonList(TOPIC_NAME));

  boolean awaken = false;
  System.out.println("before poll");
  try {
    while (synchronizer.isEmpty()) {
      localConsumer.poll(250);
    }
    System.out.println("released");
  } catch (WakeupException we) {
    // WakeupException is expected behaviour when working with wakeup() method
    // So we can do nothing
    awaken = true;
  }

  assertThat(awaken).isTrue();
}

@Test
public void should_fail_on_fetching_too_big_message() throws IOException {
  String testName = "test11_";
  Properties consumerProps = Context.getInstance().getCommonProperties();
  consumerProps.setProperty("client.id", ConsumerHelper.generateName(TOPIC_NAME, testName));
  // only 1-byte message can be fetched
  consumerProps.setProperty("max.partition.fetch.bytes", "1");
  // use earliest to be sure to consume message sent by producer
  consumerProps.setProperty("auto.offset.reset", "earliest");
  KafkaConsumer<String, String> localConsumer =
          new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps, false, null));

  KafkaProducer<String, String> sampleProducer = getProducer(testName);
  sampleProducer.send(new ProducerRecord<>(TOPIC_NAME, "12", "34"));
  sampleProducer.flush();

  // Since we set max fetch size to 1 byte, getting previously flushed message
  // should fail
  localConsumer.subscribe(Collections.singletonList(TOPIC_NAME));
  try {
      localConsumer.poll(15_000);
      fail("Should fail on getting too big message");
  } catch (RecordTooLargeException rtle) {
      // The exception message explains the problem well. It should look like:
      // RecordTooLargeException: There are some messages at [Partition=Offset]:
      // {singleconsumertopic-0=0} whose size is larger than the fetch size 1 and hence cannot be ever returned.
      // Increase the fetch size, or decrease the maximum message size the broker will allow.
  }
} 

@Test
public void should_reset_to_original_offset_when_demanded_offset_is_out_of_scope() throws IOException, InterruptedException, ExecutionException {
  String testName = "test13_";
  Properties consumerProps = Context.getInstance().getCommonProperties();
  consumerProps.setProperty("client.id", ConsumerHelper.generateName(TOPIC_NAME, testName));
  KafkaConsumer<String, String> localConsumer =
      new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps, true, null));
  try {
    localConsumer.subscribe(Collections.singletonList(TOPIC_NAME));
    localConsumer.poll(2_500);
    // First, we check which offset was defined before bad seek
    long initialOffset = localConsumer.position(new TopicPartition(TOPIC_NAME, 0));

    // Now, we try to seek to an out of range offset
    // It shouldn't be taken into account
    // According to the logs, when the expected offset doesn't match the real situation,
    // it's automatically reset:
    // <code>
    // Added fetch request for partition singleconsumertopic-0 at offset 0 (org.apache.kafka.clients.consumer.internals.Fetcher:519)
    // Seeking to offset 10000 for partition singleconsumertopic-0 (org.apache.kafka.clients.consumer.KafkaConsumer:1043)
    // Discarding fetch response for partition singleconsumertopic-0 since its offset 0 does not match the expected offset 10000 (org.apache.kafka.clients.consumer.internals.Fetcher:554)
    // Added fetch request for partition singleconsumertopic-0 at offset 10000 (org.apache.kafka.clients.consumer.internals.Fetcher:519)
    // Fetch offset 10000 is out of range, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:595)
    // Resetting offset for partition singleconsumertopic-0 to latest offset. (org.apache.kafka.clients.consumer.internals.Fetcher:290)
    // Fetched offset 0 for partition singleconsumertopic-0 (org.apache.kafka.clients.consumer.internals.Fetcher:483)
    // Added fetch request for partition singleconsumertopic-0 at offset 0 (org.apache.kafka.clients.consumer.internals.Fetcher:519)
    // </code>
    localConsumer.seek(new TopicPartition(TOPIC_NAME, 0), 10000);
    localConsumer.poll(2_500);
    long outOfRangeOffset = localConsumer.position(new TopicPartition(TOPIC_NAME, 0));

    assertThat(outOfRangeOffset).isEqualTo(initialOffset);
  } finally {
    localConsumer.close();
  }
}

@Test
public void should_pause_and_resume_polling() throws IOException {
  String testName = "test14_";
  Properties consumerProps = Context.getInstance().getCommonProperties();
  consumerProps.setProperty("client.id", ConsumerHelper.generateName(TOPIC_NAME, testName));
  KafkaConsumer<String, String> localConsumer =
    new KafkaConsumer<>(ConsumerHelper.decoratePropertiesWithDefaults(consumerProps, true, null));
  try {
    localConsumer.subscribe(Collections.singletonList(TOPIC_NAME));
    localConsumer.poll(2_500);

    KafkaProducer<String, String> sampleProducer = getProducer(testName);
    sampleProducer.send(new ProducerRecord<>(TOPIC_NAME, "12", "34"));
    sampleProducer.flush();

    TopicPartition partition = new TopicPartition(TOPIC_NAME, 0);
    localConsumer.pause(partition);
    // This poll() should return nothing
    ConsumerRecords<String, String> records = localConsumer.poll(2_500);
    assertThat(records.count()).isEqualTo(0);

    // We activate polling
    localConsumer.resume(partition);
    records = localConsumer.poll(2_500);
    assertThat(records.count()).isEqualTo(1);
  } finally {
    localConsumer.close();
  }
}

This article presents Kafka's single consumer (1 consumer per group). The first part describes key points which should help to understand how consumers work. We find there, among others, the message retrieval approach (pulling), the definition of offsets and the message delivery semantics. The second part lists configuration entries which are likely to be changed during the first tests with Kafka. The last part shows how to implement consumers with Kafka's Java API.
