Requests in Apache Kafka

Kafka clients communicate with brokers through dedicated TCP connections, over which they send many different kinds of requests, from metadata refreshes to group rebalancing.


In this article we discover the main aspects of this communication. The first part describes the binary protocol used for the connection between client and broker. The second part focuses on the different kinds of requests that can be sent.

Kafka protocol

The communication between client and broker goes through a binary protocol over TCP. In the code, we can find the traces of this communication in the org.apache.kafka.clients.NetworkClient class and its send(ClientRequest, long) method. The connection to the broker is initiated inside the poll() method, which is responsible for all writes and reads. Generally, the connection is opened only once, when the first request is sent. The connection state for each broker (one per broker) is held inside ClusterConnectionStates. The initialization is made by the initiateConnect(Node, long) method.
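To make the binary protocol more concrete, the sketch below frames a request header the way the protocol lays it out on the wire: a 4-byte size prefix followed by the API key, API version, correlation id and a length-prefixed client id. This is a hypothetical illustration, not Kafka's real serialization code; the class and method names are invented for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch (not Kafka's real serializer) of how a request
// header is framed on the wire: a 4-byte size prefix followed by
// api_key, api_version, correlation_id and a length-prefixed client id.
public class RequestFrameSketch {

    public static ByteBuffer frame(short apiKey, short apiVersion,
                                   int correlationId, String clientId) {
        byte[] id = clientId.getBytes(StandardCharsets.UTF_8);
        int payloadSize = 2 + 2 + 4 + 2 + id.length; // header fields only
        ByteBuffer buffer = ByteBuffer.allocate(4 + payloadSize);
        buffer.putInt(payloadSize);        // size prefix
        buffer.putShort(apiKey);           // e.g. 3 = metadata
        buffer.putShort(apiVersion);
        buffer.putInt(correlationId);      // echoed back in the response
        buffer.putShort((short) id.length);
        buffer.put(id);
        buffer.flip();
        return buffer;
    }

    public static void main(String[] args) {
        ByteBuffer frame = frame((short) 3, (short) 0, 42, "my-client");
        System.out.println("frame size = " + frame.remaining());
    }
}
```

The correlation id is the field that later lets the client match a response to the request it answers, which is exactly what the correlate() call in the response handling code checks.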

After opening the connection, the request is added to a set of in-flight requests. This set holds two types of messages: the ones already sent but for which the client hasn't yet received a response, and the ones not yet sent. Internally, each set of messages is stored in a queue dedicated to a given broker, which guarantees that they're processed in sending order. The processing is still done in the previously quoted poll() method. The handling consists of reading the already treated messages, i.e. the messages for which a response was sent by the broker to the client. Below you can find the code doing that in Kafka 0.9.1:
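The per-broker queue described above can be modeled with a few lines of plain Java. This is a simplified sketch with invented names, not the real InFlightRequests class, but it captures the behavior: one FIFO queue per broker, with responses completing the oldest pending request first.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified model (not the real Kafka classes) of in-flight request
// tracking: one FIFO queue per broker, requests completed in the order
// they were sent.
public class InFlightRequestsSketch {
    private final Map<String, Deque<String>> requestsByBroker = new HashMap<>();

    // Called when a request is written to the broker's connection.
    public void add(String brokerId, String request) {
        requestsByBroker.computeIfAbsent(brokerId, b -> new ArrayDeque<>())
                        .addLast(request);
    }

    // Called when a response arrives: the oldest request completes first.
    public String completeNext(String brokerId) {
        return requestsByBroker.get(brokerId).pollFirst();
    }

    public int inFlightCount(String brokerId) {
        Deque<String> queue = requestsByBroker.get(brokerId);
        return queue == null ? 0 : queue.size();
    }
}
```

Keeping a separate queue per broker is what makes the ordering guarantee cheap: the client never has to sort responses, it simply assumes the broker answers requests in the order it received them.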

private void handleCompletedReceives(List<ClientResponse> responses, long now) {
  for (NetworkReceive receive : this.selector.completedReceives()) {
    String source = receive.source();
    ClientRequest req = inFlightRequests.completeNext(source);
    ResponseHeader header = ResponseHeader.parse(receive.payload());
    // Always expect the response version id to be the same as the request version id
    short apiKey = req.request().header().apiKey();
    short apiVer = req.request().header().apiVersion();
    Struct body = (Struct) ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
    correlate(req.request().header(), header);
    if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
        responses.add(new ClientResponse(req, now, false, body));
  }
}

Kafka requests

Even though Kafka has many different request types, it privileges the metadata request over all others. In fact, inside the method checking whether a request can be sent, one check is done on the object responsible for handling metadata state. If the metadata needs to be updated, requests other than the metadata one are put in a pending state until the metadata refresh is done. So if the metadata request is sent too often, it risks adding extra latency to the calls. You can notice that in the following code:

public boolean isReady(Node node, long now) {
  // if we need to update our metadata now declare all requests unready to make metadata requests first
  // priority
  return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());
}
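The gating logic in isReady() can be isolated in a tiny self-contained example. The names below are hypothetical, but the boolean structure is the same: while a metadata update is due, every other request is reported as not ready, so the metadata request always goes out first.

```java
// Minimal illustration (hypothetical names, not the real NetworkClient)
// of metadata-first gating: a due metadata update makes all other
// requests "unready", regardless of connection state.
public class MetadataPrioritySketch {

    public static boolean isReady(boolean metadataUpdateDue, boolean connectionSendable) {
        // A pending metadata refresh blocks every other request.
        return !metadataUpdateDue && connectionSendable;
    }

    public static void main(String[] args) {
        System.out.println(isReady(true, true));   // metadata refresh due: other requests blocked
        System.out.println(isReady(false, true));  // normal case: request can be sent
    }
}
```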

Metadata requests are really important. They guarantee that the client has correct information about the cluster and, in consequence, knows to which broker a given message has to be sent. But besides that, other types of requests exist. There are two ways to discover them. The first one is the list of API keys on Kafka's protocol documentation page. The second one consists of looking them up in the org.apache.kafka.common.requests package. Below you can find which parts are concerned by requests:

  1. metadata - as already mentioned, it concerns cluster metadata and is sent when that information must be updated. A metadata request can be global, but it can also concern a specific group of consumers; in this last case, we talk about a group coordinator request.
  2. offset - it can either commit the current offset for a given partition or get the current offset for a given partition from the broker.
  3. heartbeat - informs the broker that a given client is still alive.
  4. produce - used to send new data to the broker.
  5. group management - requests sent when a consumer leaves or joins a group. The same occurs when there is a need to synchronize the group.
  6. synchronization between replicas - concerns leader/ISR elections or replica removal.
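Each of the request families above maps to one or more numeric API keys in the protocol. The sketch below lists a few of them; the values come from Kafka's ApiKeys enum around the 0.9 era (e.g. GroupCoordinator was later renamed FindCoordinator), so verify them against the protocol documentation for your broker version.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// A few request types and their protocol API keys, matching the request
// families listed above (values from Kafka's ApiKeys enum circa 0.9;
// check the protocol documentation for your version).
public class ApiKeysSketch {

    public static Map<Integer, String> sampleApiKeys() {
        Map<Integer, String> keys = new LinkedHashMap<>();
        keys.put(0, "Produce");           // send new data to the broker
        keys.put(3, "Metadata");          // cluster metadata refresh
        keys.put(4, "LeaderAndIsr");      // leader/ISR synchronization
        keys.put(8, "OffsetCommit");      // commit the current offset
        keys.put(9, "OffsetFetch");       // read a committed offset
        keys.put(10, "GroupCoordinator"); // locate the group's coordinator
        keys.put(11, "JoinGroup");        // consumer joins a group
        keys.put(12, "Heartbeat");        // client liveness signal
        keys.put(13, "LeaveGroup");       // consumer leaves a group
        keys.put(14, "SyncGroup");        // distribute partition assignments
        return keys;
    }
}
```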

This article dove into Kafka's Java API code to investigate requests, the single communication channel between client and broker. The first part described the logic executed when a client needs to connect to a broker for the first time. We saw that only one connection to a given broker is opened and that all requests transmitted to the broker wait for their responses in the in-flight requests queue. The second part showed the hierarchy of requests. We can easily note that the metadata request is the most important one. After it, we found the other requests, concerning: offsets, heartbeats, producing, replica/leader synchronization and group management.
