Recently we discovered some theoretical concepts about Apache Kafka. So it's a good moment to discover its Java API.
This article covers the main objects used in Kafka development with its Java API. The first part focuses on the actors used to produce messages. The second part describes message consumers. The last part describes the objects common to both producer and consumer.
Kafka producers
Message producing features are defined in the org.apache.kafka.clients.producer package. Let's begin with the most interesting object for producing messages - KafkaProducer. It's very important to underline that instances of this class are thread-safe, so they can be shared between threads. Sharing a single instance should make producing faster than using one instance per thread.
Messages are sent through the send(ProducerRecord, Callback) method, which can be either blocking or non-blocking. In the non-blocking case, it returns a java.util.concurrent.Future typed to org.apache.kafka.clients.producer.RecordMetadata. The returned object represents a message sent to and acknowledged by the broker. It contains, among others, the information about the offset, topic and partition.
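As a minimal sketch, a non-blocking send can be turned into a blocking one by waiting on the returned Future (the broker address and topic name are illustrative assumptions):

```java
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerExample {
  public static void main(String[] args) throws Exception {
    Properties config = new Properties();
    config.put("bootstrap.servers", "localhost:9092"); // assumed local broker
    config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
      // Non-blocking call: returns immediately with a Future
      Future<RecordMetadata> sent =
          producer.send(new ProducerRecord<>("sample_topic", "key_1", "value_1"));
      // Waiting on the Future makes the send blocking; the metadata
      // describes the record acknowledged by the broker
      RecordMetadata metadata = sent.get();
      System.out.println("topic=" + metadata.topic()
          + ", partition=" + metadata.partition()
          + ", offset=" + metadata.offset());
    }
  }
}
```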
KafkaProducer doesn't necessarily send messages immediately to the server. Instead, it appends them to an instance of org.apache.kafka.clients.producer.internals.RecordAccumulator - a kind of queue accumulating records before sending them in batches.
Another way to push messages to the cluster is the flush() method. However, it's a blocking operation because it waits until the requests related to previously sent records complete.
What happens if the delivery of one message fails? The producer can retry or abort the sending. The behavior depends on the retries configuration entry, which defines the number of new attempts after a sending failure. The configuration contains some other interesting entries, combined in the sketch after this list, such as:
- batch.size - defines the size of the buffer containing unsent records, kept for each partition.
- buffer.memory - defines the total memory available for buffering. If it's exhausted, adding new records becomes a blocking operation. But if block.on.buffer.full is set to false, this situation leads to exceptions being thrown instead.
- acks - determines when the producer considers a message delivered. The strongest guarantee is all: the producer considers a message delivered once the leader server and its followers acknowledge the record's writing. A weaker value is 1, where the leader only writes the record to its own local log, without waiting for the followers' acknowledgement. If the value is 0, the producer doesn't wait for any acknowledgement.
- bootstrap.servers - list of host:port pairs used to establish the initial connection to the cluster. This list concerns only the initial hosts; the client can then connect to other nodes of the cluster, not defined here. In consequence, it's enough to define only one server per cluster here, the remaining ones being discovered automatically.
- linger.ms - specifies, in milliseconds, the delay applied before sending a request. For example, if this value is set to 5, the producer waits 5 ms before really sending the request with new messages. If the batch is not already full, messages arriving during this delay are added to the same request. It helps to reduce the number of sent requests.
- key.serializer and value.serializer - classes implementing the org.apache.kafka.common.serialization.Serializer interface, used to convert the key and the value into bytes transmitted to the server.
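These entries could be combined in a configuration similar to the following sketch (all values are illustrative samples, not tuning recommendations):

```java
import java.util.Properties;

// Sample producer configuration illustrating the entries described above
Properties producerConfig = new Properties();
producerConfig.put("bootstrap.servers", "broker1:9092,broker2:9092");
producerConfig.put("acks", "all");       // wait for leader and followers
producerConfig.put("retries", 3);        // number of new attempts after a failure
producerConfig.put("batch.size", 16384); // per-partition buffer size, in bytes
producerConfig.put("buffer.memory", 33554432); // total buffering memory
producerConfig.put("linger.ms", 5);      // wait up to 5 ms to batch messages
producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
```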
Kafka consumers
In consuming we retrieve concepts similar to producing, located in the org.apache.kafka.clients.consumer package. Its main component is KafkaConsumer. It contains different methods to read data, either by seeking a particular position (seek* methods) or by polling through the poll(long) method. Polling consists of getting all new messages since the last retrieval. These messages are recognized thanks to the last committed offset for a given list of partitions.
But a consumer can't receive any message if it doesn't subscribe to topics. This action is made through the subscribe(List<String>) method. Subscription is not incremental: each time this method is invoked, previously defined subscriptions are overridden. A consumer can also subscribe to particular topic partitions, represented by org.apache.kafka.common.TopicPartition objects. Unsubscribing is made by a call to the unsubscribe() method. Because it doesn't accept any parameter, the consumer unsubscribes from all previously subscribed topics.
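A subscribing and polling consumer could look like this sketch (the broker address, group and topic names are assumed for the example):

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerExample {
  public static void main(String[] args) {
    Properties config = new Properties();
    config.put("bootstrap.servers", "localhost:9092"); // assumed local broker
    config.put("group.id", "sample_group");            // assumed consumer group
    config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
      // Not incremental: this call replaces any previous subscription
      consumer.subscribe(Collections.singletonList("sample_topic"));
      while (true) {
        // Gets all new messages appended since the last committed offsets
        ConsumerRecords<String, String> records = consumer.poll(100L);
        for (ConsumerRecord<String, String> record : records) {
          System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
              record.partition(), record.offset(), record.key(), record.value());
        }
      }
    }
  }
}
```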
Records retrieved by consumers are represented by instances of the org.apache.kafka.clients.consumer.ConsumerRecord class, typed to the message key and value.
As you can deduce, the offset is an important part of message reading. It can be handled automatically or manually. If we choose to manage it manually, we can use either the synchronous (commitSync()) or the asynchronous (commitAsync()) method. The first operation blocks until the commit is done. The commits concern the whole subscribed list of topics and partitions.
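Building on the consumer sketch above, and assuming enable.auto.commit is set to false and process is a hypothetical handling method, manual commits could be placed after a processed batch:

```java
// Inside the polling loop of the previous sketch
ConsumerRecords<String, String> records = consumer.poll(100L);
for (ConsumerRecord<String, String> record : records) {
  process(record); // hypothetical business processing
}
// Synchronous variant: blocks until the offsets are committed
consumer.commitSync();
// Or the asynchronous variant: returns immediately and notifies a callback
consumer.commitAsync((offsets, exception) -> {
  if (exception != null) {
    System.err.println("Commit failed for " + offsets + ": " + exception);
  }
});
```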
If you remember from the first article, consumers can be grouped. At least for Java's KafkaConsumer, it's done at the configuration step by specifying the group.id parameter. In the list of configuration entries we retrieve concepts similar to the producer's: bootstrap.servers or key/value deserializers (not serializers). Besides that, other entries, specific to consuming and combined in the sketch after this list, exist:
- enable.auto.commit - enables automatic commit of offsets. The frequency of commits is controlled by the value of the auto.commit.interval.ms property.
- session.timeout.ms - if a given consumer doesn't give the broker any sign of life within the delay specified in this entry, it's considered dead. In this case, the partitions handled by the dead consumer are reassigned to another alive process.
- max.partition.fetch.bytes - the maximum amount of data returned by the server. This value applies to each partition. It should be at least as big as the maximum message size allowed by the server.
- fetch.min.bytes - determines the minimum number of bytes that the server should return on each fetch. If the server doesn't have enough data, it accumulates more before returning it to the consumer. If set to 1, fetch requests are answered as soon as 1 byte of message is available. A bigger value can improve server throughput, but at the cost of some additional latency.
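Put together, these consuming entries could give a configuration like the following sketch (sample values only):

```java
import java.util.Properties;

// Sample consumer configuration illustrating the entries described above
Properties consumerConfig = new Properties();
consumerConfig.put("bootstrap.servers", "broker1:9092");
consumerConfig.put("group.id", "sample_group");
consumerConfig.put("enable.auto.commit", "true");
consumerConfig.put("auto.commit.interval.ms", "1000");      // commit offsets every second
consumerConfig.put("session.timeout.ms", "30000");          // considered dead after 30 s
consumerConfig.put("max.partition.fetch.bytes", "1048576"); // 1 MB per partition
consumerConfig.put("fetch.min.bytes", "1");                 // reply as soon as 1 byte is ready
consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
```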
Kafka common objects
The org.apache.kafka.common package contains objects which can be used by both producer and consumer. Mostly, we can find there objects representing cluster participants, starting with the Cluster class, representing a subset of the nodes, topics, and partitions in the Kafka cluster.
Going deeper into the architecture, we can get some information about cluster nodes, represented by the Node class. Further, we can retrieve indicators about a given topic and its partitions, expressed by instances of the already discovered TopicPartition class. If we want more details, we can use PartitionInfo. It describes, mostly, the topic's organization in the cluster: the leader node, the followers (both in-sync and not), the topic name and the partition id.
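Both KafkaProducer and KafkaConsumer expose this metadata through their partitionsFor(String) method. A small sketch, reusing the consumer created previously and an assumed sample_topic:

```java
import java.util.List;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

// 'consumer' is the KafkaConsumer from the previous sketches;
// "sample_topic" is an assumed existing topic
List<PartitionInfo> partitions = consumer.partitionsFor("sample_topic");
for (PartitionInfo partition : partitions) {
  Node leader = partition.leader();
  System.out.printf("topic=%s, partition=%d, leader=%s, replicas=%d, in-sync=%d%n",
      partition.topic(), partition.partition(),
      leader == null ? "none" : leader.host() + ":" + leader.port(),
      partition.replicas().length, partition.inSyncReplicas().length);
}
```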
Kafka's Java API is quite small and contains only the most evident objects representing messaging concepts: producers, consumers, cluster components and serializers. But the real discovery of them will be the subject of the next articles which, as usual until now, will mix theoretical and practical approaches.