Introduction To ZooKeeper
A practical tour of ZooKeeper’s data model, common recipes (locks, barriers, group membership), and the internals (ZAB, snapshots) that make it reliable.
Large-scale distributed systems require reliable coordination services. Common examples include:
- Configuration management: maintaining and distributing operational parameters across processes.
- Group membership and leader election: detecting which processes are alive and assigning roles.
- Locks and synchronization: ensuring mutually exclusive access to critical resources.
While specialized systems exist, for example, Amazon SQS for messaging, or Google Chubby for distributed locks and naming, these are tailored to specific use cases. What was missing was a general-purpose coordination service that could flexibly support multiple patterns without forcing developers to depend on separate, narrow-purpose systems.
ZooKeeper was designed to be this coordination kernel: a simple, high-performance service with a minimal set of primitives (znodes, ephemeral and sequential nodes, versioned writes, and watches - don't panic, we'll go into detail right away!). From these, developers can build higher-level abstractions (locks, queues, barriers, leader election) directly in their applications, while ZooKeeper provides the consistency and reliability guarantees underneath.
- NOT strong consistency: reads are not linearizable and can return stale values.
- Guarantees FIFO ordering of each client's operations and linearizable writes → sufficient to implement the coordination primitives clients care about.
- Uses the watch mechanism to invalidate client-side caches.
- Better performance than Chubby.
Chubby recap:
- All read and write requests must route to the leader.
- Provides blocking primitives (via `acquireLock`).
- Even with extensive use of caching, Chubby still guarantees strong consistency: client caches must be invalidated (through notifications) or cleared out (due to lease timeout) before any update completes → updates are blocked until cache invalidation is done → this makes Chubby's write throughput very low, even though that is expected by design.
Comparison:
- ZooKeeper doesn't block updates to invalidate all clients' caches → wait-free → better performance.
- Client: user of ZooKeeper service
- Server: Process providing the ZooKeeper service
- znode: an in-memory data node in the ZooKeeper data tree (we'll go into detail later)
- update/write: any operation that modifies the state of the data tree
- zxid: ZooKeeper's global transaction ID, a unique 64-bit number that totally orders updates across the whole system. The high 32 bits are the epoch (associated with the leader; it changes when a new leader is elected), while the low 32 bits are a per-epoch counter that the leader increments on each write operation. zxids monotonically increase within an epoch.
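The epoch/counter packing can be sketched with a few bit operations (a minimal illustration; helper names like `make_zxid` are made up, not ZooKeeper internals):

```python
# Toy sketch: a 64-bit zxid packs an epoch (high 32 bits) and a
# per-epoch counter (low 32 bits).

def make_zxid(epoch: int, counter: int) -> int:
    return (epoch << 32) | (counter & 0xFFFFFFFF)

def epoch_of(zxid: int) -> int:
    return zxid >> 32

def counter_of(zxid: int) -> int:
    return zxid & 0xFFFFFFFF

z1 = make_zxid(epoch=5, counter=1)
z2 = make_zxid(epoch=5, counter=2)
z3 = make_zxid(epoch=6, counter=1)  # new leader elected -> epoch bumps

assert z1 < z2 < z3                 # zxids are totally ordered
assert epoch_of(z3) == 6 and counter_of(z3) == 1
```

Because the epoch occupies the high bits, any zxid from a newer epoch compares greater than every zxid from an older epoch, which is exactly what gives the total order across leader changes.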
ZooKeeper organizes its data in a hierarchical namespace which is similar to a file system or a tree. The node in this tree is called znode. Each znode is identified by a path, for example a/b/c denotes a path to znode c where b is parent of c, and a is parent of b.
Every znode can store data (small, under 1 MB) and has associated metadata.
There are two types of znodes:
- Regular: created and explicitly deleted by the client. Can have children.
- Ephemeral:
  - Created by a client; deleted either explicitly by the client or automatically by ZooKeeper once the session between the client and ZooKeeper is closed or expires.
  - Can't have children! So, technically, ephemeral znodes are always leaf nodes in the tree.
For example, in the picture above, /app1 is a regular node, while /app2 is an ephemeral node.
A client connects to ZooKeeper and initiates a session. Sessions have an associated timeout: if ZooKeeper doesn't receive anything from a client for longer than the timeout, it terminates the session. The client can also end the session explicitly.
A client can send requests (either normal read/write requests or heartbeat requests) to signal its liveness to the server. The ZooKeeper client sends a heartbeat after the session has been idle for s/3 ms and switches to a new server if it has not heard from a server for 2s/3 ms, where s is the session timeout in milliseconds.
If a client loses its connection to a ZooKeeper server, it can connect to a different server and recover its session, as long as it does so before the session times out.
To allow clients to receive timely notifications about changes, ZooKeeper has the watch mechanism. After a client performs a read operation such as getData or exists (APIs provided by ZooKeeper) with the watch flag set, ZooKeeper pushes a one-time notification back to the client when the znode changes. This notification only indicates that a change happened to the znode; it does not include the change itself.
Note: Pay attention to the one-time keyword. For example, if there's an update to a znode, the client receives a notification; but while that notification is in flight, there may be three or four more updates, and the client will not receive any new notification until it issues a new read request. This is not really a big deal, since the notification is only a signal that the data the client read from the znode is now stale and needs to be re-read.
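A minimal model of this one-shot behavior (a hypothetical `Znode` class, not the real client API) makes the semantics concrete:

```python
# Toy model of ZooKeeper's one-time watch semantics.

class Znode:
    def __init__(self, data):
        self.data = data
        self.watchers = []                 # one-shot callbacks

    def read(self, watch=None):
        if watch:
            self.watchers.append(watch)
        return self.data

    def write(self, data):
        self.data = data
        fired, self.watchers = self.watchers, []   # fire once, then clear
        for cb in fired:
            cb()

events = []
node = Znode("v1")
node.read(watch=lambda: events.append("changed"))

node.write("v2")   # watcher fires
node.write("v3")   # no watcher registered any more -> silent
node.write("v4")   # still silent

assert events == ["changed"]   # one notification for three updates
assert node.read() == "v4"     # client must re-read (and re-watch) for more
```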
- `create(path, data, flags)`: Creates a znode with the specified path name, stores data in it, and returns the name of the new znode. The flags parameter lets a client specify the znode type (regular or ephemeral) and set the sequential flag.
- `delete(path, version)`: Deletes the znode at the specified path if that znode is at the expected version.
- `exists(path, watch)`: Returns true if the znode with the specified path exists, otherwise returns false. The watch flag allows a client to set a watch on the znode.
- `getData(path, watch)`: Returns the data and metadata (such as version information) associated with the znode. The watch flag works similarly to `exists()`, except ZooKeeper won't set the watch if the znode doesn't exist.
- `setData(path, data, version)`: Writes the provided data to the znode at the specified path if the version number matches the znode's current version.
- `getChildren(path, watch)`: Returns the set of names of the children of a znode.
- `sync(path)`: Waits for all updates pending at the start of the operation to propagate to the server that the client is connected to. The path parameter is currently ignored.
Notes:
- Each API call has both a synchronous and an asynchronous version.
- Each update accepts a version number, which is used for conditional updates. If the current version of the znode does not equal the version number in the request, an unexpected-version error is returned. Passing -1 as the version number bypasses this check.
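The conditional-update rule can be sketched with a toy in-memory znode (hypothetical names; the real check happens server-side):

```python
# Toy sketch of the version check behind setData/delete.

class VersionedZnode:
    def __init__(self, data):
        self.data, self.version = data, 0

    def set_data(self, data, version):
        # version == -1 bypasses the check (unconditional update)
        if version != -1 and version != self.version:
            raise ValueError("unexpected version")
        self.data = data
        self.version += 1

z = VersionedZnode("a")
z.set_data("b", version=0)          # matches current version -> succeeds
try:
    z.set_data("c", version=0)      # stale version -> rejected
except ValueError:
    pass
z.set_data("c", version=-1)         # -1 skips the check
assert (z.data, z.version) == ("c", 2)
```

This is the same compare-and-set pattern that makes the lock and configuration recipes later in this article safe against concurrent writers.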
- In-memory state: Each ZooKeeper server keeps the entire znode tree in memory, so reads can be served directly from this cache with no disk or consensus overhead.
- On-disk durability: Each server persists updates in two places:
  - A transaction log: every write operation is appended to this log and must be synced to disk before it is acknowledged. Each transaction carries its own zxid.
  - Snapshots: the in-memory state is dumped to disk periodically to make crash recovery efficient. Each snapshot also records the zxid at the point it was taken.
ZooKeeper doesn't lock the entire system when taking a snapshot. It does a DFS over the tree, reading each node's data and writing it to disk. Because of that, the snapshot captures each znode's state at a different time during the traversal, so it may not correspond to any state that ever existed in memory. Hence, this is called a fuzzy snapshot.
Above is an illustration of the snapshot process. The fuzzy state is /foo=f1 (stale), /goo=g2 (latest), which is not the state of ZK memory at any point in time:
- The state after updating /foo to f2 and /goo to g2 is /foo=f2, /goo=g2
- The ZK memory state after updating /foo to f3 is /foo=f3, /goo=g2
Because the snapshot may contain a mixture of old and new values, the subsequent log replay must bring the system to a coherent state. ZooKeeper achieves this by ensuring that state-changing operations are idempotent and strictly ordered.
The reason ZooKeeper doesn't lock the tree while taking a snapshot is to avoid hurting read and write performance.
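The /foo, /goo scenario can be replayed in a few lines (a toy model of the fuzzy snapshot plus idempotent log replay; all names are illustrative):

```python
# Toy sketch: why a fuzzy snapshot plus idempotent log replay converges.

tree = {"/foo": "f1", "/goo": "g1"}
log = []                                # (zxid, path, value)

def apply_txn(zxid, path, value):
    log.append((zxid, path, value))
    tree[path] = value

# Snapshot starts: the DFS copies /foo first...
snapshot = {"/foo": tree["/foo"]}       # /foo=f1
snapshot_zxid = 0                       # last zxid applied before snapshot began

# ...updates land while the DFS is still running...
apply_txn(1, "/foo", "f2")
apply_txn(2, "/goo", "g2")

# ...then the DFS reaches /goo and copies its NEW value.
snapshot["/goo"] = tree["/goo"]         # fuzzy: /foo=f1 (stale), /goo=g2 (fresh)

apply_txn(3, "/foo", "f3")

# Recovery: load the fuzzy snapshot, replay every logged txn after it.
recovered = dict(snapshot)
for zxid, path, value in log:
    if zxid > snapshot_zxid:
        recovered[path] = value         # setData is idempotent: safe to re-apply

assert recovered == tree == {"/foo": "f3", "/goo": "g2"}
```

Re-applying txn 2 overwrites the fresh /goo=g2 with the same value, which is harmless; that is exactly why idempotence makes the fuzzy snapshot safe.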
Recovery process:
- Server restart: when a server restarts, it reads the newest snapshot file on disk and loads the znode tree into memory.
- Replay transaction logs: the server replays all transactions in the log whose zxids are greater than the snapshot's zxid. This reconstructs the exact state as of the end of the log.
- Joining the ensemble: after rebuilding its state, the server contacts the leader to synchronize any updates it may have missed. If it has fallen too far behind, the leader may drop the incremental approach and require a full synchronization. Once caught up, it participates in the quorum and begins serving read requests from its in-memory state.
Here’s the flow diagram of read/write operation:
We'll go into detail in the following sections, but one note: responses to client requests always include a zxid, which indicates the largest zxid the client has seen so far. If the client connects to a new server, that server must ensure its view of the ZooKeeper data is at least as recent as the client's view, by checking the client's last zxid against its own. If the client has a more recent view than the server, the server does not reestablish the session until it has caught up. The client is guaranteed to be able to find another server with a sufficiently recent view of the system, since the client only ever sees changes that have been replicated to a majority of the ZooKeeper servers.
Local read: read operations can be served by any replica server, since each holds an in-memory copy replicated from the leader's ZooKeeper state. Local reads are really fast: a pure in-memory operation on the local server, with no disk activity and no agreement protocol.
But this has a big drawback: the data we read might not be the latest, even though more recent updates to the same znode have been committed. To address this problem, ZooKeeper also offers a sync operation. When a client calls this API, the follower that receives the request forwards the sync to the leader:
- If there are pending commits (changes that still need to reach followers), the leader places the `sync` after those pending writes. The client that requested the `sync` therefore receives its response only after all the pending writes have committed.
- If nothing is pending, the case is trickier: the server the follower believes to be the leader may not be the leader anymore. ZooKeeper does a cool trick here: it creates a null (no-op) transaction and places the `sync` after it. The null transaction still has to be committed, which requires a quorum of `ACK`s. If the server can commit the null transaction, the other followers still recognize its leadership; if it can't, it is no longer the leader.
Note that even with sync, it's hard to guarantee that the data returned by a read issued after the sync is the absolute latest: new updates can arrive while the sync response is in flight to the follower. sync only guarantees that you'll see all updates up to the point when you issued the sync request.
A write request must be handled by the leader; if a client issues the request to a follower, the request is forwarded to the leader.
For write requests, ZooKeeper uses the ZAB protocol to coordinate replication to followers. Under the hood, it uses a variation of the two-phase commit protocol for replicating transactions to followers.
- Prepare phase:
  - When the leader receives an update from a client, it generates a proposal and assigns a zxid to that proposal. The proposal is sent to all followers.
  - Each follower adds the transaction to its history queue and sends an `ACK` to the leader.
- Commit phase:
  - When the leader receives `ACK`s from a quorum, it sends the quorum a `COMMIT` for that transaction.
  - A follower that receives a `COMMIT` commits the transaction, unless the zxid of the `COMMIT` is higher than some zxid in its history queue; in that case it waits until it has received `COMMIT`s for all its earlier transactions before committing.
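The leader's commit rule can be sketched as simple ACK counting (a hypothetical model; real ZAB also handles ordering, timeouts, and recovery):

```python
# Toy sketch: a proposal commits once a quorum (majority, counting the
# leader itself) has ACKed it.

def quorum(n_servers: int) -> int:
    return n_servers // 2 + 1

class Proposal:
    def __init__(self, zxid, n_servers):
        self.zxid = zxid
        self.acks = {"leader"}          # leader implicitly acks its own proposal
        self.needed = quorum(n_servers)

    def ack(self, follower):
        self.acks.add(follower)
        return self.committed

    @property
    def committed(self):
        return len(self.acks) >= self.needed

p = Proposal(zxid=(5 << 32) | 1, n_servers=5)   # quorum of 5 is 3
assert not p.committed                           # 1 ack (the leader's own)
assert not p.ack("f1")                           # 2 acks: still waiting
assert p.ack("f2")                               # 3 acks -> COMMIT is broadcast
```

Note that in a 5-server ensemble the leader can commit after hearing from just two followers; the remaining followers learn the outcome from the `COMMIT` message.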
Here's an illustration of the successful write flow
And here’s the failed flow
One of the most common uses of ZooKeeper is to store and distribute configuration in distributed systems, such as database connection strings, feature flags, service endpoints, and so on.
For example, we can store the configuration in a znode z_c. When a process starts up, it obtains the full pathname of z_c and reads its data with the watch flag set to true. Whenever the configuration in z_c is created or updated, the client receives a notification and re-reads the znode.
Note that:
- We should store only dynamic configuration here, i.e. things that can change at runtime; static configuration that only changes during deployments is better kept in files or environment variables.
- This pattern can run into the thundering-herd problem: when a hot znode is updated, notifications are pushed to all clients interested in it, and all of those clients then send read requests to ZooKeeper to fetch the latest data. We'll come back to this when we discuss locks.
One of the tricky parts of building distributed systems is that you don’t always know what the final system configuration will look like ahead of time. Imagine a scenario where a client wants to spin up a leader process along with several worker processes. The actual scheduling of those processes is handled by a cluster scheduler, which means the client can’t predict critical details like the IP addresses or ports the leader will use. And without those, how are the workers supposed to connect to the leader?
ZooKeeper solves the problem using what’s called a rendezvous znode. Here’s how it works:
- The client first creates a znode (say /rendezvous/leader) before launching any processes.
- The full path of this znode is passed as a startup parameter to both the leader and the workers.
- When the leader process comes online, it writes its connection information (addresses and ports) into the rendezvous znode.
- The workers, meanwhile, try to read this znode with a watch. If the data isn't there yet, they simply wait. As soon as the leader fills in its details, ZooKeeper notifies the workers, and they can connect immediately.
And there’s a nice cleanup story too: if the rendezvous node is created as an ephemeral znode, then when the client ends, ZooKeeper automatically deletes the node. Both leader and workers see this deletion event and know it’s time to shut down gracefully.
One of ZooKeeper’s most elegant features is the ephemeral znode - a type of node that automatically disappears when the client session that created it ends. This seemingly small capability turns out to be incredibly useful for representing liveness in a distributed system.
Take the example of a recommendation engine that runs as multiple independent instances. Clients need to send requests to any currently alive instance. The challenge is that these instances can come and go at any time - new ones might be added for scaling, and old ones might crash or be shut down. How can clients always discover which instances are available?
With ZooKeeper, the solution is straightforward:
- Create a parent znode, say /recommendation-engine.
- Each time a new engine instance starts, it registers itself by creating an ephemeral child node under this path (e.g., /recommendation-engine/instance-1).
- Clients that want to use the service simply query the children of /recommendation-engine and pick one of them to connect to.
- If an instance crashes or its session with ZooKeeper ends, its ephemeral node is automatically deleted. Clients watching the parent znode are notified and can adjust accordingly.
Unlike Chubby, ZooKeeper is not designed to be a lock service; however, it can be used to implement locks.
The simplest way to implement a lock with ZooKeeper is the "lock file": each lock is represented as an ephemeral znode. When a client wants to acquire the lock, it tries to create that ephemeral znode. If the creation succeeds, the client holds the lock; otherwise, it can read the znode with the watch flag set, so it will be notified when the lock is released (either because the client holding the lock dies or because it releases the lock explicitly). However, this obviously suffers from the herd effect: if many clients are waiting for the lock, a release notification triggers many create requests to the server, while in the end only one client wins and the others must wait for the next round.
To resolve the herd effect, ZooKeeper lines up all clients that want to acquire the lock in request order. It does so by creating ephemeral znodes with the SEQUENTIAL flag. For example, if three clients want to hold the lock, three ephemeral znodes are created, say locks/mylock-1, locks/mylock-2, locks/mylock-3. The trick is:
- If a client's znode name is the smallest under locks, it successfully acquires the lock.
- Otherwise, it watches only its immediate predecessor's znode. So the client that created locks/mylock-3 only cares about changes to locks/mylock-2; the change of interest is the deleted event, which fires when the lock is released!
Here’s the flow:
Here’s an example of how multiple clients try to acquire the same lock
The advantage of this approach is that each znode is watched by exactly one other client, so its removal wakes up only one client → no herd effect.
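The decision each client makes, acquire or watch its predecessor, can be sketched as a pure function over the sequence numbers (illustrative only; a real implementation also re-checks after each watch event):

```python
# Toy sketch of the herd-free lock recipe: each waiter watches only its
# immediate predecessor.

def lock_decision(my_seq, all_seqs):
    """Return ('acquired', None) or ('wait', predecessor_to_watch)."""
    lower = [s for s in all_seqs if s < my_seq]
    if not lower:
        return ("acquired", None)       # smallest sequence holds the lock
    return ("wait", max(lower))         # watch only the next-lowest znode

seqs = [1, 2, 3]                        # locks/mylock-1, -2, -3
assert lock_decision(1, seqs) == ("acquired", None)
assert lock_decision(2, seqs) == ("wait", 1)   # -2 watches only -1
assert lock_decision(3, seqs) == ("wait", 2)   # -3 watches only -2

# When mylock-1 is deleted, exactly one client (mylock-2) wakes up:
seqs.remove(1)
assert lock_decision(2, seqs) == ("acquired", None)
```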
The simple lock and the herd-free lock we've just gone through are both exclusive locks!
A read/write lock requires a slight modification of the herd-free lock, since reads can be shared and are only blocked by write locks!
In distributed computing, a barrier is a way to make sure that:
- No process starts early until everyone is ready
- No process leaves early until everyone has finished.
The “double” part means there are two synchronization points:
- Entry barrier: all processes must wait until a minimum number (the threshold) have joined
- Exit barrier: all processes wait until everyone has finished before leaving.
We can leverage ZooKeeper to achieve this by:
- Creating a parent znode (say /barrier) that represents the barrier.
- Each process enters by creating a child under /barrier (e.g. /barrier/p1).
- When the number of children ≥ the threshold, one process creates a ready flag node. All processes watch for the ready node, so when it appears they know it's safe to begin.
- Processes set watches to observe when other children are removed. Once all children are gone, the exit condition is met, and every process knows it can safely move to the next stage of the workflow.
Here’s the sequence diagram for Entry Barrier
And here’s for the Exit Barrier
ZooKeeper is built to be a coordination kernel for other systems, but every distributed system needs its own internal coordination, and ZooKeeper is no exception.
Underneath, ZooKeeper runs a set of servers that coordinate with each other. ZooKeeper ensures that all servers see the same sequence of updates. To do that, it relies on an atomic broadcast protocol: ZooKeeper Atomic Broadcast (ZAB).
A broadcast algorithm transmits a message from one process (the primary) to all other processes in the network including itself. Atomic broadcast protocols are distributed algorithms guaranteed either to correctly broadcast or to abort without any side effects.
- Replication guarantees:
- Reliable delivery: If one server delivers a message, every server will eventually deliver it
- Total order: If message a is delivered before message b on one server, that ordering holds on every server
  - Causal order: If a server sends a message after delivering another message, those messages are ordered accordingly.
- Transactions are replicated as long as a majority (quorum) of nodes is up.
- When a node fails and later restarts, it must catch up on the transactions that were replicated while it was down.
ZooKeeper’s messaging system operates in two phases:
- Leader activation: when a new leader is elected, it synchronizes with followers by sending them any missing proposals (or a full snapshot) and then proposes a special
NEW_LEADERproposal. Followers acknowledge this only after they have caught up. The leader won't accept new proposals until a quorum has acknowledged this step. - Active messaging: once the leader is active, it processes client requests by proposing them to followers. Proposals are sent in the order requests are received; followers write them to disk and acknowledge in order, and the leader commits a proposal as soon as a quorum has acknowledged it. This resembles a streamlined two-phase commit.
Because only one leader is active at a time and proposals are numbered uniquely, a new leader can always tell which proposals were committed and resume safely.
Every vote in ZooKeeper requires a quorum of servers. By default, ZooKeeper uses a simple majority: more than half of the servers must agree on a proposal or a leader. The key property is that any two quorums must overlap; this ensures that a committed proposal always remains visible to future quorums.
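The overlap property is easy to check exhaustively for a small ensemble (plain Python, just illustrating the math: two sets of size 3 drawn from 5 elements must intersect, since 3 + 3 > 5):

```python
# Any two majority quorums of the same ensemble must share a server.
from itertools import combinations

servers = {"s1", "s2", "s3", "s4", "s5"}
majority = len(servers) // 2 + 1        # 3 of 5

for q1 in combinations(servers, majority):
    for q2 in combinations(servers, majority):
        assert set(q1) & set(q2)        # the overlap is never empty
```

That shared server is what carries knowledge of a committed proposal from one quorum to the next, e.g. across a leader election.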
Even though the landscape has changed significantly since ZooKeeper was introduced in 2008, it still plays a crucial role in many major distributed systems such as HBase, Hadoop, SolrCloud, Storm, and others.
Kafka also used ZooKeeper for coordination tasks, but it has since moved to Kafka Raft Metadata mode (KRaft).
Think of ZooKeeper as a coordination backbone for distributed apps: a file-system-style API on top, with powerful building blocks underneath for config management, service discovery, leader election, and locks. Even with etcd, Consul, and cloud services around, the design patterns ZooKeeper established are still core to modern systems.
https://www.usenix.org/legacy/event/usenix10/tech/full_papers/Hunt.pdf
https://www.youtube.com/watch?v=3S51y5OwbRQ
https://www.hellointerview.com/learn/system-design/deep-dives/zookeeper#read-and-write-operations
http://www.tcs.hut.fi/Studies/T-79.5001/reports/2012-deSouzaMedeiros.pdf