Wormhole: Reliable Pub-Sub to support Geo-replicated Internet Services
Explore how Facebook’s Wormhole powers real-time data propagation at massive scale. This post breaks down its core design - flows, caravans, data-markers, and multi-datacenter failover, and explains how it achieves trillions of reliable, in-order updates daily with millisecond latency.
Facebook is a social networking service that connects people all over the world. They have plenty of services which are interested many other portion of data written by others. For example, when a user posts content to Facebook, it’s written in database. Then NewFeed service needs to learn that change and update the news feed of related users, or the Notification service needs to know about that change to notify users follows that content / post owner,… Other internal services such as cache invalidation pipeline, index server pipeline,.. are also interested in the write.
A naive approach is that those services periodically poll the change from database to learn for new updates. But it leads to the dilemma about the interval time. Long polling interval can lead to longer stale data, while shorter one will increase the workload of the storage system.
An efficient way to resolve this is to use publish - subscribe systems which identify the changes and transmit the updates to interested application.
Most existing pub-sub systems require a custom data store which interferes on writes to generate the notification for interested applications. Given the fact that at Facebook, they have different storage systems (MySQL, HDFS, RocksDB), interposing on write to these storages requires modification across the the software stack. Moreover, writing the updates to a custom data store also raises another challenge of maintaining that store.
So meta engineers decided to create a pub-sub system called WormHole.
In simple terms, Wormhole takes updates written to a source data store and publishes them as streams of messages that other systems can subscribe to.
Some keys:
- Reads transaction logs of storage systems, encodes updates in form of key-value pairs and wrap them up in
Wormhole update→ interested applications don’t need to know how data is stored in underlying storage system. - Read the transaction logs from nearest replica of subscribers → reduce the latency
- Resilience:
- Publisher failure: Subscriber can choose a set of publisher to receive the update (one primary - multiple secondary). if primary fails → receive the update from secondary.
- Subscriber failure: Publishers periodically store the transaction log position received and ack by the subscriber (the data-marker), when the subscriber is up, can use that position to send the updates.
- Delivers updates reliably - at least one, in-order
- Datastore (MySQL, RocksDB, HDFS,..): Stores collection of shards, each maintaining its own transaction log.
- Shards: each shard contains partitioned dataset
- Dataset: Collection of related data
- Shards: each shard contains partitioned dataset
- Producers (Publishers): Colocated with the datastore**,** continuously tails the logs and emits ordered Wormhole update flow.
- Note: At the time the paper was written, publisher is colocated with database, they are not allowed to use many cores, which limits the publisher’s parallelism.
- Wormhole update: changes represented in key-value format, and a special
.#Sto indicate the shard - Flow: Sequence of updates. Note that, updates are totally ordered within a shard, but not across different shards.
- Subscribers:
- Subscriber = Interested application = A running instance of a service that needs the data.
- Receive update via flow
- Data-marker: a pointer (offset) in the datastore’s transaction log indicating the last update that was delivered and acknowledged for a flow, stored in ZooKeeper
- One shard is charged by exactly one publisher
- Each pair has a separate flow:
- Which means, if 2 different subscribers both want updates from Shard X, the publishers maintain 2 flows (one per subscriber for Shard X).
- Publisher:
- Can publish updates for multiple shards.
- Tracks data-marker per flow after the subscribers acknowledged they processed the update by periodically asks the subscribers.
- Find subscribers via ZooKeeper-based configuration system.
- Subscriber
- Can subscribe to updates from multiple shards.
- Can’t choose the shard it subscribes to, it’s decided by publishers.
- When a subscriber recovers after its failure, the publisher uses the data-marker to restart sending updates.
- Wormhole allows the application to inform publishers of what filter it needs, the publisher only delivers updates that pass the supplied filters.
How Wormhole balance flows among the subscribers
- Weighted random selection: lightly loaded subscribers get more flows.
- Subscribers use ZooKeeper to balance load themselves: If some subscribers get heavily loaded, the redirect some flows to lightly loaded subscribers.
Updates are sent over the TPC connection, publishers multiplex TCP connections of all flows associated with the same subscriber.
The transaction logs are stored in files, each read to transaction log involves I/O operation. Hence, these following cases can result in significant I/O-load:
- In steady state, all flows get updates from the same position (the same transaction log).
- When many subscribers recover simultaneously, the need to re-read the logs starting from their data-markers.
In order to reduce the I/O load, Meta engineers introduced Caravan - instead of multiple flows read the transaction log separately, we have a single reader which reads the transaction log in-order and returns to all interested flow (aka a cluster of flow).
Each caravan has two conceptual positions:
- Read pointer (let’s call it ): How far the caravan has actually read from the database transaction.
- Cursor (commit pointer - let’s call it ): The smallest acknowledged offset across all flows in the caravan - this is the “safe” position that the caravan could restart from after a crash.
That comes to:
- Caravan must hold all the logs between the and in a buffer so it can serve to all flows (from the slowest → fastest).
- .
- How far can be ahead of highly depends on the buffer limit.
But this introduces a problem, as some subscribers might become slow in processing the update, that causes the Caravan holds too much data in buffer / reaches the buffer limit.
Therefore, Wormhole creates additional caravan to follow the lead caravan, these caravans send past updates to slow subscribers.
Caravans are periodically split and merged based on the data markers of the flows they’re in charge of:
- Split: If a caravan has member flows that actually form two distinct groups (e.g. some are only slightly behind the lead, and others are far behind), Wormhole will split it into two caravans so each can read at an optimal point . This prevents a moderately-lagging flow from being forced to wait for a very slow flow; each new caravan handles a tighter range of positions.
- Merge: Conversely, if two caravans are reading near the same log position (i.e. their flows’ data-markers are “close” to each other), they can be merged back into one . Merging reduces duplicate reading when it’s no longer necessary to have separate streams.
The balance between I/O-load and latency might be tricky:
- If we allow a large number of caravan → more reads → more I/O-load, but we can have a better clusters of flows, as flows with nearby data-markers on the same caravan.
- If we allow very few caravans → flows with very different data-markers get assigned to the same caravan, making flows which are farther ahead wait for flow that are very far behind.
- The wait here refers to the fact that, as the can’t be ahead too far from , so if the distance is too far:
- Buffer start filling with unacknowledged data (since the slow subscribers didn’t acknowledge → can’t advance.
- The Caravan slows it reads to avoid overflowing buffers or exceeding configured read rate caps.
- The wait here refers to the fact that, as the can’t be ahead too far from , so if the distance is too far:
Flows are also dynamically assigned to a varying number of caravan to optimize for latency and I/O-load when:
- It’s not able to keep up with the speed of the caravan.
- Its data-marker is far ahead compared to others on the same caravan.
- ⚠️ Note that when the flow is assigned to another caravan, Wormhole must ensure that the Caravan’s cursor ≤ the flow’s data-marker.
Bring them on together, we can have this diagram that shows the architecture
Wormhole supports 2 types of deliveries:
- single-copy reliable delivery (SCRD)
- multiple-copy reliable delivery (MCRD)
Subscribers subscribe to one dataset (one shard), they receive at least one all updates of that shard.
Periodically, publisher sends the data-marker (current position in the transaction log). the subscriber acknowledges a data-marker once it has processed all updates prior to the data-marker, then the data-marker is stored on the publisher side in persistent storage
This data-marker is used when a subscriber recovers after failure, publisher will restart sending updates from the data-marker. This might cause the updates to be sent twice or more (in case the subscriber already consumed the update but did not received the data-marker ack request from publisher). it’s the application’s job to remove the duplication, or maybe they don’t need at all, if the updates are things like cache-invalidation.
In multiple-copy reliable delivery, Wormhole allows the subscriber to receive the updates from the multiple copies of the same dataset, which is the case of replication.
It’s useful when the publisher doing the single-copy reliable delivery fails permanently, e.g because of hardware failure, we would like a publisher running on replicated datastore to take over and do the single-copy reliable delivery from the point the failed datastore stopped sending updates.
In other word, the MCRD is SCRD with failover.
⚠️ Note that, even though subscribers subscribe to multiple copies of the same dataset (the shard replicas), at a time, only one publisher actively publishes the updates!
But there are several challenges:
- The data marker is stored in persistent storage by the publisher, which is typically co-located with the database host, so when the host fails, we lose the data marker as well.
- The data-marker for a flow is a pointer to the logs of the datastore. It’s usually a filename of the log and byte offset within that file. Unfortunately, data-markers represented this way are specific to the particular replica and not straightforward to find the corresponding position in a different replica. For example, in MySQL, binary log names and offsets are completely different for different replicas.
- Publishers typically don’t communicate with each others, so it’s hard to know wether one fails so one of others must take the lead.
Logical position
To overcome (1) and (2), Wormhole introduces logical positions. Same update in different replicas have the same logical position. A logical position is identified combination of:
- Sequence number: monotonically non-decreasing, it is typically associated with the storage. For example:
- In MySQL, it’s the Global Transaction ID (GTID)
- In RocksDB or TAO’s, it’s a monotonically increasing sequence number assigned when a write is committed.
- For other systems, such as HDFS, they use the timestamp as the sequence number.
- Index:
As single transaction may produce multiple key updates within the same commit: Such as:
UPDATE posts SET like_count = like_count + 1 WHERE author_id = 123;UPDATE posts SET like_count = like_count + 1 WHERE author_id = 123;May produce
(post:101, like_count=7)
(post:102, like_count=13)
(post:103, like_count=99)(post:101, like_count=7)
(post:102, like_count=13)
(post:103, like_count=99)The datastore assigns a single sequence number of this transaction, e.g
If Wormhole only uses 123 as the logical position for all these updates, subscribers can’t tell the precise order within that transaction. So we have the index which is a sub-counter identifying each individual update within that sequence number.
So in conclusion, the logical position is
(logical_position) ⇄ (datastore log file name, byte offset)(logical_position) ⇄ (datastore log file name, byte offset)This local position can be built directly from the transaction log.
We also have a data structure called logical positions mapper to map logical positions from data-markers to datastore-specific positions. This mapper is stored directly in the publisher.
Publisher coordination
To resolve (3), Wormhole leverages ZooKeeper ephemeral node. Each publisher has its own ephemeral node, and when a node fails, its ephemeral node is deleted, hence, others are watching the ephemeral nodes get notified, and one of them take the lead. Meta did not reveal how do they choose the replacement. But I think that it’s just a leader election process done by ZooKeeper.
In production, Wormhole delivers trillions of updates per day across Facebook’s infrastructure, powering real-time features like News Feed and Notifications.
It guarantees at-least-once, in-order delivery per shard while maintaining sub-100 ms end-to-end latency within a datacenter.
Caravans drastically reduce datastore read amplification, on average only 1 – 2 caravans per publisher are active, balancing I/O efficiency and latency.
Under failure, multi-copy reliable delivery (MCRD) enables seamless publisher failover using ZooKeeper and logical positions, ensuring uninterrupted streaming even during datacenter outages.
Compared with earlier polling-based systems, Wormhole achieved roughly 40 % lower CPU load, 60 % lower I/O, and improved cross-region propagation from minutes to milliseconds.
https://www.usenix.org/system/files/conference/nsdi15/nsdi15-paper-sharma.pdf