Zkafka by Zillow: Go Library for Simplifying Kafka Consumption

Zkafka is a Go module written by Zillow and used internally by about 100 services. The library aims to be a valuable and straightforward building block for Apache Kafka consumption. It ships with a complete feature set and introduces higher-level primitives on top of bare metal Kafka consumers. Some of the notable features include work-ahead processing, virtual partitions, configurable dead lettering, and composable retry pipelines.
The above list introduces much jargon with little context. However, reviewing the “why” behind this project is helpful before fully detailing the feature set.
In the past five years, Kafka use at Zillow has become more common. The internal Kafka cluster has more than 1,500 topics currently provisioned. As Kafka’s use expanded, teams began to author bespoke consumption solutions. Teams whose language of choice was Java had a rich set of Java frameworks built up in the Kafka ecosystem; a common choice for these teams was Flink, a framework for stateful message processing. Zillow’s codebase is a polyglot web of microservices, and for teams not using Java, the choices were fewer and further between. The most popular languages for application development were Go, Python, and JS. For these tech stacks, bespoke Kafka consumer solutions built directly on high-level consumers became commonplace. This led to a proliferation of solutions, each with its own set of challenges and limitations, detailed below.
Uber documents a similar problem space and its solution in its Kafka Consumer Proxy post.
Consumer Proxy’s solution, as detailed in Uber’s post, primarily focuses on improving performance and simplifying consumption semantics. It achieves this by introducing Consumer Proxy, which liaises between would-be consumers and Kafka. The abstraction layer introduces a seam by which semantic simplifications and performance gains can be attained.
The shortcomings can categorically be summarized as issues with performance and usability. The following sections will detail those issues, some of which were similarly observed and noted in the Uber post.
The performance issues described in this section aren’t limitations of Kafka as a technology; rather, they are limitations incurred by typical consumer-pattern implementations built against Kafka.
A service deployment is made up of fungible replicas. In the Kafka world, each replica becomes a member of the consumer group (a processor). For the sake of this example, say that count is two. Kafka consumer group semantics will split the topic’s partitions roughly evenly amongst the replicas. To continue with the example, assume there are four partitions:
Processor Count = 2
Partition Count = 4
A curious reader can arrange this setup in the Kafka Topic Visualization Tool.
The typical recommendation for increasing throughput is simply to increase the consumer count. However, this strategy has an upper limit, because a single partition can be assigned to at most one consumer group member at a time. Unfortunately, this caps the number of replicas that may do meaningful work.
Additionally, a replica processing a single message at a time doesn’t use the underlying compute resources efficiently. A workload with an IO-bound code path will idly waste its compute allocation with no opportunity to schedule meaningful work. Kubernetes can combat this by allocating fractions of virtual CPUs and small memory requests. However, a Kubernetes replica (in Zillow’s case, an executing OS process) is comparatively heavyweight. Consider the following example.
Assume a replica has a minimum memory footprint of 40 MiB; thus, two replicas would be 80 MiB, and so on. Alternatively, a single goroutine has a minimum size of 2 KB (roughly four orders of magnitude smaller). Figure 1 shows that though both scaling strategies are linear, the constants of replica-only scaling are much higher. Utilizing concurrency within a replica better amortizes the overhead of running a Go process.
Note: The back-of-the-envelope math is liberal in its assumption that each additional goroutine’s size is vanishingly small and non-growing. In reality, its memory footprint would increase. The inference that a goroutine is lightweight compared to a Go process remains valid (see Why is goroutine being called a “lightweight” thread?).
Figure 1: Memory consumption increases in the face of increased concurrency by way of replica and goroutine scaling
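For concreteness, the comparison can be sketched in a few lines of Go. The 40 MiB and 2 KB constants are the same illustrative assumptions used above, not measurements of any particular service.

```go
package main

import "fmt"

func main() {
	const replicaMiB = 40.0         // assumed minimum footprint of one replica (a Go process)
	const goroutineMiB = 2.0 / 1024 // assumed ~2 KB initial goroutine stack

	for _, n := range []int{1, 2, 4, 8, 16} {
		replicaScaling := replicaMiB * float64(n)                // n single-processor replicas
		goroutineScaling := replicaMiB + goroutineMiB*float64(n) // one replica, n processor goroutines
		fmt.Printf("concurrency=%2d  replicas=%6.1f MiB  goroutines=%6.2f MiB\n",
			n, replicaScaling, goroutineScaling)
	}
}
```

Both curves grow linearly with the desired concurrency, but the replica-only curve grows by roughly 40 MiB per step while the goroutine curve grows by a couple of kilobytes.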
Imagine you’re at your local Publix (or any grocery store) and there are two checkout lines. The lines are balanced initially, with four patrons in each. Unfortunately for line 1, the first patron insists on paying by check, and they’re struggling to find their pen. Suffice it to say, line 2 is efficiently processed and is now empty, but line 1 remains stubbornly motionless. This is head-of-line blocking.
The metaphor is somewhat strained, but roughly, this arrangement can be mapped to the following Kafka Topic/Consumer Group arrangement:
A typical, but conservative, pattern for Kafka consumers wary of data loss is to process a single message, commit it, and then proceed to the following message. This pattern is simple and guarantees at-least-once processing. Publix has decided to use this strategy in the contrived example. However, as the example demonstrates, the cashier of line 2 finishes and then becomes ineffectual (compute underutilization), and the patrons behind patron 1 in line 1 are grumpy because of the delay (poor poison pill isolation).
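For reference, a minimal sketch of this one-message-at-a-time pattern, built directly on the confluent-kafka-go client, might look like the following (broker address, topic, and group ID are placeholders, and failure handling is elided):

```go
package main

import (
	"log"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":  "localhost:9092",
		"group.id":           "example-group",
		"enable.auto.commit": false, // commit explicitly, one message at a time
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	if err := consumer.SubscribeTopics([]string{"example-topic"}, nil); err != nil {
		log.Fatal(err)
	}

	for {
		msg, err := consumer.ReadMessage(5 * time.Second)
		if err != nil {
			continue // timeout or transient error: poll again
		}
		// Process, then commit, before touching the next message. This gives
		// at-least-once semantics but serializes the partition behind its
		// slowest message (the patron paying by check).
		if err := process(msg); err != nil {
			continue // failure handling (retry, dead letter) elided
		}
		if _, err := consumer.CommitMessage(msg); err != nil {
			log.Printf("commit failed: %v", err)
		}
	}
}

func process(msg *kafka.Message) error {
	log.Printf("processing key=%s", string(msg.Key))
	return nil
}
```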
Kafka comes with a steep learning curve. The primitives introduced by Kafka include topics, partitions, offsets, consumer groups, brokers, and rebalances.
The list is incomplete, but it demonstrates Kafka’s comparative complexity, especially next to other technologies like AWS’s SQS.
The complexity enables a single piece of infrastructure to serve many use cases. Still, it forces developers to reason about details far from the business rules they implement.
Consider the case where a developer wants to use Kafka for a greedy consumer pattern. The greedy consumer pattern implies queue semantics, a task a Kafka topic is well suited for. However, the developer must now reason about Kafka consumer semantics: How many partitions should the topic have? How many consumers should be deployed? When should offsets be committed? What happens when a rebalance occurs?
Those questions are posed without answers to illustrate some of the considerations and tradeoffs developers must weigh when interacting with Kafka.
Dead lettering is a convenient and powerful technique for asynchronous message processing. A bare Kafka cluster doesn’t support dead lettering as a first-class citizen. Service implementers must provision the topic themselves and adopt the pattern of writing to it when message processing fails. Though conceptually simple, the one-off code for dead letter delivery raises the barrier to entry just enough that, in practice, many systems at Zillow went without.
Retry pipelines extend the concept of dead lettering, and similar to dead lettering, there is no out-of-the-box primitive that enables the idea. The pattern is to introduce a tiered retry pipeline where delays between progressive retries are significant (typically many orders of magnitude larger than the processing duration).
Figure 2: Retry Pipeline demonstrating contingency execution (on delay) in the face of failure
Conceptually, a retry pipeline takes the form shown in Figure 2. A replica processes a message. If processing fails, the message is processed again after some considerable delay. The idea is that some transient errors (like a database failover from primary to a reader) last longer than typical inline retry delays can mask. After N failures (three in the diagram above), the failed message is persisted in a dead letter mechanism (a queue, topic, or database), and an operator can make decisions about recourse.
The strategy can help reduce the number of operator interventions required to evaluate and execute a course of action for a dead-lettered message.
Zkafka was developed to address these issues and provide a unified, high-performance, easy-to-use solution for Kafka consumption with dead simple semantics. Kafka itself is complex, but zkafka aims to absorb that complexity transparently, leaving the code author responsible only for the function executed per read message. All the details of polling for messages, efficiently using the underlying compute, extending Kafka’s ordering semantics, shutting down safely, and handling rebalances caused by failures and consumer member additions/revocations are handled by the library. With zkafka taking on this complexity, the developer can stay principally focused on the code directly relevant to the task.
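To give a flavor of what that looks like, here is a condensed sketch modeled on the patterns in zkafka’s README and runnable examples. Names such as NewClient, NewWorkFactory, and ConsumerTopicConfig follow that documentation, but treat the exact signatures as approximate rather than authoritative.

```go
package main

import (
	"context"

	"github.com/zillow/zkafka"
)

// The processor is the only code the service author writes: business logic
// for a single message.
type processor struct{}

func (p *processor) Process(ctx context.Context, msg *zkafka.Message) error {
	// unmarshal the payload, call downstream services, etc.
	return nil
}

func main() {
	ctx := context.Background()

	client := zkafka.NewClient(zkafka.Config{BootstrapServers: []string{"localhost:9092"}})
	defer client.Close()

	work := zkafka.NewWorkFactory(client).Create(
		zkafka.ConsumerTopicConfig{
			ClientID: "my-service",
			GroupID:  "my-consumer-group",
			Topic:    "example-topic",
		},
		&processor{},
	)

	// Run takes care of polling, concurrency, commits, rebalances, and shutdown.
	work.Run(ctx, nil)
}
```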
Uber’s post details a service implementation that aims to address many of the same issues. Zillow pursued the more straightforward library solution, as opposed to a distributed service, primarily to avoid the operational and persistence complexity that running such a service entails. The library is divorced from many of those complexities: it can address the issues without complex persistence requirements, acting only on local state, which increases simplicity and benefits testability.
Zkafka is a library with many great features, detailed further in the upcoming sections. It aims to use underlying compute resources efficiently by introducing safe concurrency beyond Kafka’s physical partitions (without weakening ordering guarantees). It achieves this by implementing work-ahead processing and virtual partitions. Moreover, it aims to offer a higher-level feature set than bare metal Kafka can, with configurable dead lettering and composable retry pipelines.
A message read from a Kafka topic has an offset and a partition, both integers. A consuming service may commit a message after processing concludes. “Committing a message” means saving its offset with the Kafka cluster. The Kafka cluster tracks committed offsets for every partition per consumer group.
The last commit is the starting point when a consumer is assigned a set of partitions. This assignment happens when a consumer joins the group or a member consumer is evicted. Typical scenarios where this occurs include a deployment rolling out new replicas, the replica count being scaled up or down, and a replica crashing or failing to heartbeat in time.
A simple commit strategy would be to commit after each message is processed. However, the following example will show that this poses a risk of data loss.
Consider the case where a topic has one partition and messages are processed two at a time. Figure 3 shows a timelapse of this strategy progressing from t=0 to t=4.
t=0) Processing of msg0 and msg1 begins
t=1) Processing of msg1 is completed and msg1 is committed**, advancing the offset just beyond msg1.
t=2) Processing of msg2 begins
t=3) Processing of msg0 is completed and is committed**. This commit is ineffectual (lower offset commit).
t=4) Processing of msg2 is completed and msg2 is committed**
The commit at t=1 represents a data loss opportunity. If the replica were torn down after t=1 but before msg0’s processing completed, msg0 would never be reread. When the partition gets reassigned to another replica, that replica will start reading at msg2 (since the commit acts as a high watermark marking where work left off). Starting at msg2 constitutes data loss.
Figure 3: A timelapse of concurrent processing of a single partition. Initially, the commit is just before msg0
Zkafka handles this scenario by keeping track of inflight work and marking messages as completed when processing concludes. Zkafka executes commits only when all previous messages have been marked as completed. Figure 4 shows this concept.
t=0) Processing of msg0 and msg1 begins
t=1) Processing of msg1 is finished and msg1 is marked as completed
t=2) Processing of msg2 begins
t=3) Processing of msg0 is finished and msg0 is marked as completed. A commit** is executed, advancing the offset to just beyond msg1
t=4) Processing of msg2 is finished and msg2 is marked as completed, and a commit** is executed.
Figure 4: A timelapse of processing a single partition. Initially, the commit is just before msg0
**A commit, as described here, is more accurately a logical commit. It doesn’t incur a network call. Instead, a local store is updated, and a background process communicates with the broker at set intervals and on rebalance events. The strategy is detailed in zkafka’s readme.
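The bookkeeping can be illustrated with a small, self-contained sketch. This is a simplification for exposition rather than zkafka’s actual implementation: completions may arrive out of order, but the commit point only advances through a contiguous prefix of completed offsets.

```go
package main

import (
	"fmt"
	"sync"
)

// offsetTracker tracks in-flight work for one partition. markComplete records
// that an offset finished processing and returns the committable offset: the
// first offset whose predecessors have not all completed.
type offsetTracker struct {
	mu        sync.Mutex
	committed int64          // next offset to commit (all prior offsets completed)
	done      map[int64]bool // completed offsets ahead of the commit point
}

func newOffsetTracker(start int64) *offsetTracker {
	return &offsetTracker{committed: start, done: map[int64]bool{}}
}

func (t *offsetTracker) markComplete(offset int64) int64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.done[offset] = true
	// Advance through the contiguous run of completed offsets.
	for t.done[t.committed] {
		delete(t.done, t.committed)
		t.committed++
	}
	return t.committed
}

func main() {
	t := newOffsetTracker(0)
	fmt.Println(t.markComplete(1)) // 0: msg0 still in flight, the commit can't move
	fmt.Println(t.markComplete(0)) // 2: msg0 and msg1 done, commit advances past msg1
	fmt.Println(t.markComplete(2)) // 3: msg2 done, commit advances again
}
```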
Virtual partitions are a mechanism that allows for concurrent processing without dismantling the ordering guarantees of a Kafka partition. Virtual partitions are most easily explained by walking through a natural progression for introducing concurrency.
Kafka has the concept of consumer groups. It’s the basic primitive for horizontally scaling processing. Topics are defined a priori, with N partitions. At Zillow, we deploy our workloads to a Kubernetes cluster and leverage Kubernetes Deployments to control the number of replicas via configuration.
The example below shows a replica processing messages from a single topic with two partitions. Kafka can maintain its ordering guarantees (per partition ordering) at the sacrifice of throughput.
Figure 5: A single replica with a single processor processing messages from a topic with two partitions. A video rendering of the same process is available on YouTube (single message processor).
The easiest way to scale and increase throughput is to use Kafka consumer group semantics in conjunction with Kubernetes deployment replica sets. This example increases the replica count to 2.
Figure 6: Two replicas with a single processor processing messages from a topic with two partitions. A video rendering of this diagram is available on YouTube (multiple replicas, single message).
Throughput has increased. However, each replica could make better use of its underlying compute resources. In IO-bound workloads, the CPU sits idle for large spans with no opportunity to schedule other messages concurrently. Would it be possible to introduce concurrency within a replica?
In Go, concurrency can come from a goroutine pool.
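A minimal sketch of that naive approach follows (the Kafka polling is elided): whichever worker is free takes the next message, regardless of which partition or key it came from.

```go
package main

import (
	"fmt"
	"sync"
)

type message struct {
	partition int
	key       string
	value     string
}

func main() {
	const numProcessors = 2
	msgs := make(chan message)
	var wg sync.WaitGroup

	// Uncoordinated pool: idle goroutines race to receive the next message,
	// so two messages from the same partition can be processed out of order.
	for i := 0; i < numProcessors; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for m := range msgs {
				fmt.Printf("processor %d handling partition %d key %s\n", id, m.partition, m.key)
			}
		}(i)
	}

	// In a real worker, messages would be polled from Kafka here.
	for _, m := range []message{{0, "a", "v1"}, {0, "a", "v2"}, {1, "b", "v3"}} {
		msgs <- m
	}
	close(msgs)
	wg.Wait()
}
```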
Figure 7: one replica with two processors processing messages from a topic with two partitions. Processor selection is based on availability only. A video rendering of this diagram is available on YouTube (single replica multi-processor round robin).
Figure 7 shows a replica processing a topic as a time series: messages from both partitions are dispatched to whichever processor is free, so two messages from the same partition may be processed concurrently or out of order.
This uncoordinated concurrency gives up on Kafka’s per-partition ordering, as there’s no guarantee that P1-2 will be processed after P1-1. That would be unfortunate, because Kafka’s ordering semantics offered a valuable seam for workload isolation in the previous iterations. This raises the question: is there a way to have our cake and eat it too?
What zkafka does is introduce the goroutine pool with a goroutine in the middle acting as a router. The router selects a goroutine from the processor pool using a hash mod N strategy. The selection process is similar to that of the Kafka producer’s partition selector. The algorithm has the following signature (pseudocode), where the response is an integer index of an assignable processor goroutine.
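The sketch below is illustrative; zkafka’s actual hash function and implementation details may differ.

```go
package main

import "hash/fnv"

// selectProcessor maps a message key to a virtual partition: an index into
// the pool of processor goroutines. Messages sharing a key always land on
// the same goroutine, so their relative order is preserved.
func selectProcessor(key []byte, numProcessors int) int {
	h := fnv.New32a()
	h.Write(key)
	return int(h.Sum32() % uint32(numProcessors))
}
```

Each index corresponds to a dedicated processor goroutine (a virtual partition), so messages sharing a key are processed in order while messages with different keys proceed concurrently.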
A description of the timelapse is shown in Figure 8 below.
Figure 8: one replica with two processors processing messages from a topic with two partitions. Processor selection is based on the message key. A video rendering of this diagram is available on YouTube (single replica, multi-processor with virtual partitions)
Processing a message can fail for several reasons: a downstream dependency may be transiently unavailable, the message itself may be malformed (a poison pill), the processing code may have a bug, or some other unforeseen error may occur.
What should be done when any of these occurs? At Zillow, many services are built with built-in idempotency (see AWS’s safe retries with idempotent APIs for a detailed discussion), which allows for safe retries. That strategy addresses transient failures, though it doesn’t eliminate them, and it does nothing for the other failure categories. So, ultimately, failures will occur that automated retries can’t paper away. A typical strategy in async message processing is for some mechanism to dead letter messages that failed to process successfully. Subsequently, a human assesses and determines remediation steps.
Dead lettering is baked-in functionality for other queuing technologies, namely SQS. Though conceptually simple, it adds a fair amount of boilerplate within the Kafka world. One of zkafka’s primary motivations was simplifying consumption semantics for developers, and that bias toward simplicity extends to failure scenarios. Zkafka introduces a configuration-based solution for dead letters. In this way, adding dead lettering to a worker is as simple as providing the dead letter topic to target when a message fails.
The following code snippet shows the additional lines required by such a configuration.
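The snippet below is a sketch based on zkafka’s README, extending the earlier worker example; field names such as DeadLetterTopicConfig are illustrative and may not match the current API exactly.

```go
topicConfig := zkafka.ConsumerTopicConfig{
	ClientID: "my-service",
	GroupID:  "my-consumer-group",
	Topic:    "example-topic",
	// The only addition: when processing fails (or times out), the message is
	// forwarded to this topic by a built-in onDone callback.
	DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{
		Topic: "example-topic-deadletter",
	},
}
```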
Under the hood, zkafka exposes rich lifecycle/callback functionality through its APIs. The dead letter configuration is syntactic sugar for registering a special onDone callback, which is guaranteed to run after message processing concludes (whether with success, failure, or timeout) and is codified to write to the configured dead letter topic in case of failure or timeout.
A typical pattern is configuring a service to retry processing a message after an extended delay. SQS can achieve inter-process retry using the concept of Visibility Timeout. In practice, the extended delay can be orders of magnitude larger than the typical processing duration of a message. Large retry delays are a valuable property for addressing extended transient errors.
A developer could achieve a processing delay by adding a sleep within the processor callback (the code they principally authored). However, an implementation like this presents a couple of disadvantages: the processor goroutine sits blocked for the full delay, wasting the compute it holds, and long in-flight sleeps complicate safe shutdown and rebalances.
To address both of these issues, zkafka has a concept of processor delay. Processor delay is a configurable option on a Kafka worker that enables a stage in a retry pipeline to execute on a delay.
Figure 9: Retry Pipeline shows multiple topics daisy chained together by logically identical workers
Tactically, a system designer achieves the design objectives of Figure 9 by deploying N workers with the same processor callback (differing only in their source topic, dead letter topic, and processing delay).
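As an illustration, the stage configurations might be laid out as in the sketch below, reusing the work factory and processor from the earlier example (in practice, each stage could just as well be its own deployment). ProcessDelayMillis is shown as the assumed name of zkafka’s delay option; like the other field names, it may differ from the current API.

```go
// Each stage reads from the previous stage's dead letter target; stages after
// the first wait before processing (field names are illustrative).
delayMs := int((10 * time.Minute).Milliseconds())

stages := []zkafka.ConsumerTopicConfig{
	{
		GroupID: "orders-worker", Topic: "orders",
		DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{Topic: "orders-retry-1"},
	},
	{
		GroupID: "orders-worker-retry-1", Topic: "orders-retry-1", ProcessDelayMillis: &delayMs,
		DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{Topic: "orders-retry-2"},
	},
	{
		GroupID: "orders-worker-retry-2", Topic: "orders-retry-2", ProcessDelayMillis: &delayMs,
		DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{Topic: "orders-deadletter"},
	},
}

for _, cfg := range stages {
	work := workFactory.Create(cfg, &processor{}) // same Process callback at every stage
	go work.Run(ctx, nil)
}
```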
Zkafka is open-source and available on GitHub under an Apache 2.0 license. The package is well documented and has plenty of runnable examples, and the zkafka readme gives details about running an example (including the simple setup phase).
For questions about the library or working at Zillow, contact stewartb@zillowgroup.com. I look forward to hearing from you!