Percolator: Large-scale Incremental Processing Using Distributed Transactions and Notifications

An engineer’s walkthrough of Google Percolator: from MapReduce to incremental processing, exploring transactions, lazy lock cleanup, observers, and performance optimizations on Bigtable.

Google originally built its web index using MapReduce batch pipelines. Each run crawled the entire web and processed all pages together to enforce invariants such as deduplication and link inversion. For example, when multiple URLs contain the same content, Google selects one canonical URL (usually the one with the highest PageRank). Only this canonical page is treated as the main record in the index, and anchor text from links to any duplicate is internally forwarded and attached to it.

This design creates heavy contention: many crawled pages may map to the same canonical URL, and both the canonical record and related metadata (such as forwarding links) must be updated atomically to preserve consistency.

MapReduce avoids this problem by rebuilding the entire dataset at each stage. Because every step processes all pages together, there is no concurrent contention on shared records.

However, this approach has a major drawback: even a small change, such as a single new or updated page, requires rerunning the whole pipeline. The entire index must be rebuilt, so unchanged pages are reprocessed unnecessarily, leading to high latency and wasted computation.

The indexing system could instead store the data in a DBMS and update individual documents using transactions to maintain invariants. However, existing DBMSs can’t handle the sheer volume of data: Google’s indexing system stores tens of petabytes across thousands of machines. Bigtable can scale to that size, but it doesn’t provide multi-row (distributed) transactions.

With that requirement, Google decided to build Percolator. Percolator isn’t a standalone storage system; it’s implemented as a client library that runs on top of BigTable.

II. Components — figure 1

Bigtable: a store of sparse, distributed, persistent tables, each of which is a sorted key-value map.

Each row has multiple columns grouped into column families. Each (row, column) intersection can hold multiple cells, and each cell is one timestamped version of the data for that row and column. Conceptually, Bigtable is a map: (row, column, timestamp) → value
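
To make the (row, column, timestamp) → value idea concrete, here is a minimal in-memory sketch. The class name `VersionedTable` and its methods are invented for illustration and are not part of any Bigtable client API; the row and column names are made up as well.

```python
class VersionedTable:
    """Toy in-memory stand-in for one Bigtable table (not the real client API)."""

    def __init__(self):
        # (row, column) -> {timestamp: value}
        self._versions = {}

    def put(self, row, column, timestamp, value):
        """Store one cell, i.e. one timestamped version of (row, column)."""
        self._versions.setdefault((row, column), {})[timestamp] = value

    def latest_at_or_before(self, row, column, timestamp):
        """Return (ts, value) for the newest version with ts <= timestamp, else None."""
        versions = self._versions.get((row, column), {})
        visible = [ts for ts in versions if ts <= timestamp]
        if not visible:
            return None
        newest = max(visible)
        return newest, versions[newest]


table = VersionedTable()
table.put("com.example/a", "contents", 5, "v1")
table.put("com.example/a", "contents", 9, "v2")
print(table.latest_at_or_before("com.example/a", "contents", 7))  # (5, 'v1')
```

Reading "as of" a timestamp is the primitive that the snapshot reads described later build on.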

Rows are range-partitioned into tablets, which are sharded and distributed across machines. One key point about BigTable is that it guarantees atomic operations at the single-row level.

Google File System (GFS): GFS manages large immutable data blocks (“chunks”), replicates them across multiple machines, and handles failures. Bigtable writes structured data into these GFS chunks and maintains metadata so tablets can be reassigned or split without moving application logic.

Percolator uses Bigtable to store all indexing data, then coordinates multi-row updates through its own transaction protocol to support distributed transactions and incremental processing.

Chubby is a lock service developed by Google. It is mainly used for primary election, server discovery, and work partitioning. Percolator uses Chubby to check worker liveness (we’ll explain why this is needed in the locking section).

A Percolator worker is a long-running process on each machine. Each worker contains:

  • Logic to scan Bigtable for changes (which is used to trigger the observer)
  • A set of observers (application logic) that execute when relevant data changes. Any writes performed by observers go back to BigTable, potentially creating new notifications that any worker can pick up next.

Here’s a simplified flow:

3. Percolator Worker — figure 1

For each logical column, Percolator adds a few extra Bigtable columns to hold its own metadata. For a logical column c, these are:

| Column | Use |
|---|---|
| c:lock | An uncommitted transaction is writing this cell; contains the location of the primary lock |
| c:write | Committed data present; stores the Bigtable timestamp of the data |
| c:data | Stores the data |
| c:notify | Observers may need to run |
| c:ack_O | Observer O has run; stores the start timestamp of its last successful run |

Here’s a simplified visualization of how Percolator stores data in Bigtable. In this example, c is the link column.

III. Data model — figure 2

From now on, when we write c:lock, we mean the lock column associated with logical column c (and the same applies to c:data, c:write, etc.).

Recall that in Bigtable, a cell is identified by (row, column, timestamp), and each (row, column) pair can have multiple versions over time (aka multiple cells). So when we say a c:lock cell, we’re referring to one specific version (at a particular timestamp), not the entire (row, column) and all its historical versions.

Percolator relies on a centralized timestamp oracle to provide strictly increasing timestamps. Every Percolator worker must contact this service to obtain timestamps when executing transactions (we’ll dive into the protocol details in the next section). Although centralized, the oracle scales well thanks to two key optimizations.

  • First, the oracle allocates timestamps in large ranges and persists only the highest allocated value to stable storage. Once a range is reserved, subsequent requests are served entirely from memory, which is extremely fast. If the oracle crashes, it simply reloads the last persisted value and resumes from a higher number. Timestamps may jump forward, but they never go backward.
  • Second, each Percolator worker batches timestamp requests. Instead of issuing one RPC per transaction, a worker maintains a single pending request to the oracle and shares the returned timestamps across many local transactions. As load increases, batching naturally increases as well, improving throughput at the cost of slightly higher latency.
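
The range-allocation idea can be sketched as follows. This is a toy model, not Google’s implementation: `TimestampOracle`, `persisted_high`, and `range_size` are assumed names, and "stable storage" is simulated by a plain attribute that survives a simulated restart.

```python
import threading


class TimestampOracle:
    """Hands out strictly increasing timestamps, persisting only a high-water mark."""

    def __init__(self, persisted_high=0, range_size=1000):
        self.persisted_high = persisted_high  # stands in for stable storage
        self.range_size = range_size
        # After a (re)start, resume from the persisted high-water mark.
        self._next = persisted_high
        self._lock = threading.Lock()

    def get_timestamps(self, count=1):
        """Serve one batched request for `count` consecutive timestamps."""
        with self._lock:
            first = self._next
            self._next += count
            # Reserve further ranges before handing out anything past the mark.
            while self._next > self.persisted_high:
                self.persisted_high += self.range_size  # the only "disk write"
            return list(range(first, first + count))


oracle = TimestampOracle()
batch = oracle.get_timestamps(3)  # one request shared by three local transactions

# Simulate a crash: a new oracle only knows the persisted high-water mark.
restarted = TimestampOracle(persisted_high=oracle.persisted_high)
after_crash = restarted.get_timestamps(1)[0]
```

Every timestamp handed out is below the persisted mark, so resuming from that mark after a crash skips some values but can never reuse one.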

When building the web index, Google needs to update multiple rows atomically while preserving invariants, which calls for distributed transactions. Because Bigtable only provides single-row atomicity, Percolator adds distributed transactions to ensure multi-row changes either commit together or not at all.

Because Bigtable keeps multiple timestamped versions of each (row, column) value, it is a natural fit for multi-version concurrency control (MVCC). On top of it, Percolator provides cross-row, cross-table transactions with ACID snapshot-isolation semantics.

Snapshot isolation is a transaction isolation level in which each transaction reads from a snapshot of the committed data as of the time the transaction started. It is very similar to Repeatable Read, but the two are not identical: snapshot isolation allows write skew but blocks phantoms, whereas Repeatable Read blocks write skew but allows phantoms.
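
Write skew is easier to see with a toy example. The sketch below does not use Percolator; it simulates two snapshot-isolation transactions by hand, and the on-call scenario and all names are invented for illustration. The intended invariant is "at least one person stays on call".

```python
committed = {"alice_on_call": True, "bob_on_call": True}


def go_off_call(snapshot, me):
    """Return this transaction's write set: leave only if the snapshot shows someone else on call."""
    others_on_call = [k for k, on_call in snapshot.items() if on_call and k != me]
    return {me: False} if others_on_call else {}


# Both transactions start before either commits, so they read the same snapshot.
snapshot_a = dict(committed)
snapshot_b = dict(committed)
writes_a = go_off_call(snapshot_a, "alice_on_call")
writes_b = go_off_call(snapshot_b, "bob_on_call")

# Snapshot isolation only detects write-write conflicts. The write sets are
# disjoint, so both transactions are allowed to commit.
conflict = bool(writes_a.keys() & writes_b.keys())
if not conflict:
    committed.update(writes_a)
    committed.update(writes_b)

print(committed)  # {'alice_on_call': False, 'bob_on_call': False}
```

Each transaction was correct against its own snapshot, yet together they break the invariant. That is the anomaly snapshot isolation permits.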

Percolator is built as a client library accessing BigTable rather than controlling access to storage itself. Any node in Percolator can issue requests to directly modify state in BigTable. Hence, Percolator must handle locking.

The lock service must provide high throughput and low latency, and it must be distributed and fault tolerant. Bigtable satisfies all of these requirements, so Percolator stores its locks in columns of the same Bigtable that stores the data, and reads or modifies the locks in a Bigtable row transaction when accessing data in that row.

Consider a read request that starts at timestamp start_ts (obtained from the timestamp oracle):

  • First, it checks the c:lock cells of every column it wants to read. If any c:lock cell has a timestamp in the range [0, start_ts], the read waits until that lock is released.
  • Once no such lock remains, it reads the latest cell (row, c:write, timestamp) with timestamp ≤ start_ts. The value of that cell is itself a timestamp, call it T_x. It then reads the cell (row, c:data, T_x) and returns its value.
3. Read request — figure 2
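
The two read steps can be sketched over a plain dictionary keyed by (row, column, timestamp). This is an illustrative model rather than the Percolator API: `percolator_get` and `LockedError` are hypothetical names, and where the text says the reader waits, the sketch raises an exception so that the caller can retry or attempt cleanup.

```python
class LockedError(Exception):
    """A pending lock at or below start_ts might hide a commit in our snapshot."""


def percolator_get(cells, row, col, start_ts):
    """Snapshot read of logical column `col`; cells maps (row, column, ts) -> value."""
    # Step 1: look for locks on this cell with a timestamp in [0, start_ts].
    for (r, c, ts) in cells:
        if r == row and c == f"{col}:lock" and ts <= start_ts:
            raise LockedError(f"{row}/{col} is locked at ts={ts}")

    # Step 2: find the newest write record committed at or before start_ts.
    commit_times = [ts for (r, c, ts) in cells
                    if r == row and c == f"{col}:write" and ts <= start_ts]
    if not commit_times:
        return None  # nothing committed yet in this snapshot
    data_ts = cells[(row, f"{col}:write", max(commit_times))]  # this is T_x
    return cells[(row, f"{col}:data", data_ts)]


cells = {
    ("Bob", "bal:data", 5): "$10",
    ("Bob", "bal:write", 6): 5,            # committed at ts 6, data lives at ts 5
    ("Bob", "bal:data", 7): "$3",          # uncommitted write by a later transaction
    ("Bob", "bal:lock", 7): ("Bob", "bal"),
}
print(percolator_get(cells, "Bob", "bal", start_ts=6))  # $10
```

A reader at start_ts=6 ignores the lock at timestamp 7 because it lies outside [0, 6]; a reader at start_ts=8 would hit `LockedError`.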

For write requests, Percolator uses two-phase commit:

  • Prewrite: this phase has a few steps:
    1. Obtain a start timestamp, start_ts, from the timestamp oracle.

    2. Check for the existence of:

      1. Any lock on the cell we want to update, at any timestamp (by checking c:lock cells)
      2. Any committed write with a timestamp greater than the transaction’s start timestamp (by checking c:write cells)

      If either exists, there is a write conflict and the transaction aborts. Note: the other transaction may have already committed without having released its lock yet (for example, because of a slow network or a client paused by GC), but Percolator treats this as a conflict and aborts anyway.

    3. Write the new value to each c:data cell: (row, c:data, start_ts) → new_value. Then write a new c:lock cell: (row, c:lock, start_ts) → (primary.row, primary.col). (The primary is explained below.) Note that although the new data is now stored in c:data, it is not yet visible to other transactions or Get requests, because no c:write cell pointing at start_ts exists yet.

  • Commit: obtain a commit timestamp from the timestamp oracle, then replace every lock with a write record. This starts with the primary cell: its c:lock is erased and a c:write cell is created at the commit timestamp, in a single row transaction. The same is then done for each secondary cell. The transaction is not considered committed until the primary lock has been converted into a c:write cell.

The following simplified sequence diagram visualizes the logic.

4. Write request — figure 3
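
Here is a single-threaded sketch of the two phases over the same kind of (row, column, timestamp) dictionary. It simplifies heavily: real Percolator performs each cell’s check-and-write inside a Bigtable row transaction, whereas this sketch checks everything first and then writes. The function names, the `Conflict` exception, and the "first cell is the primary" rule are assumptions made for illustration.

```python
class Conflict(Exception):
    """Raised when the transaction must abort."""


def prewrite(cells, writes, start_ts):
    """writes maps (row, col) -> new value; the first entry acts as the primary."""
    primary = next(iter(writes))
    for (row, col) in writes:
        for (r, c, ts) in cells:
            if r != row:
                continue
            if c == f"{col}:lock":
                raise Conflict(f"{row}/{col} is locked")
            if c == f"{col}:write" and ts > start_ts:
                raise Conflict(f"{row}/{col} was committed after we started")
    for (row, col), value in writes.items():
        cells[(row, f"{col}:data", start_ts)] = value
        cells[(row, f"{col}:lock", start_ts)] = primary  # pointer to the primary


def commit(cells, writes, start_ts, commit_ts):
    """Turn locks into write records, primary first (dicts keep insertion order)."""
    p_row, p_col = next(iter(writes))
    if (p_row, f"{p_col}:lock", start_ts) not in cells:
        raise Conflict("primary lock was cleaned up by another transaction")
    for (row, col) in writes:
        cells[(row, f"{col}:write", commit_ts)] = start_ts  # makes the data visible
        cells.pop((row, f"{col}:lock", start_ts), None)


cells = {}
transfer = {("Bob", "bal"): "$3", ("Joe", "bal"): "$9"}
prewrite(cells, transfer, start_ts=7)
visible_before_commit = any(c.endswith(":write") for (_, c, _) in cells)
commit(cells, transfer, start_ts=7, commit_ts=8)
```

After `prewrite`, the data cells exist but no write record points at them, so readers cannot see them; `commit` flips visibility one cell at a time, starting with the primary.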

Now comes an important problem: after the prewrite phase succeeds, a transaction still needs to delete multiple locks and add multiple write records during commit. Only the primary cell is committed atomically; all other locks are released one by one. If the client crashes in the middle of this process, some locks may be left behind, blocking future transactions.

To handle this, Percolator uses a lazy cleanup strategy.

Instead of immediately trying to recover failed transactions, Percolator lets other transactions clean up abandoned locks when they encounter them.

Suppose transaction B tries to acquire a lock on cell X but finds an existing lock created by transaction A. How does B know whether A is still running or has already failed?

Percolator solves this by designating one of A’s modified cells as the primary cell. The lock on this cell is called the primary lock. Every lock written by A stores the location of this primary cell inside its c:lock value.

So when B encounters a lock on X, it first looks up A’s primary cell (using the pointer stored in X’s lock). From there, two situations are possible:

  • Primary lock is gone, and a write record exists: This means transaction A already committed. Transaction B then rolls the transaction forward by removing the stale lock on X and writing the missing c:write record, essentially finishing A’s leftover work.

  • Primary lock still exists: In this case, A may still be running or it may have crashed and left the lock behind.

    To distinguish between these, Percolator uses Chubby as a liveness oracle. Active workers register themselves in Chubby, allowing other workers to check whether the original client is still alive.

    However, a worker might be alive but stuck. To handle this, Percolator also stores a wall-time timestamp inside each lock. Workers periodically refresh this timestamp while committing. If the wall time becomes too old, the lock is treated as abandoned and cleaned up even if the worker still appears alive. Once A is judged dead or stuck, B rolls A back by erasing the primary lock, after which A can no longer commit.

    This lazy recovery approach avoids centralized cleanup and allows any transaction to repair failed state on demand, keeping Percolator scalable and fault tolerant.

5. Lazy Lock Cleanup and Primary Locks — figure 4
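
The decision B makes when it runs into A’s lock can be sketched as one function. Everything here is an illustrative assumption: the lock value layout (`primary`, `worker`, `wall_time`), the `is_alive` callback standing in for the Chubby liveness check, and the `max_age` threshold are not taken from the paper.

```python
def resolve_lock(cells, row, col, lock_ts, is_alive, now, max_age=60.0):
    """Handle a lock found at (row, col:lock, lock_ts).

    Returns "wait", "rolled_back", or "rolled_forward".
    """
    lock = cells[(row, f"{col}:lock", lock_ts)]
    p_row, p_col = lock["primary"]
    primary_lock = cells.get((p_row, f"{p_col}:lock", lock_ts))

    if primary_lock is not None:
        fresh = now - primary_lock["wall_time"] <= max_age
        if is_alive(primary_lock["worker"]) and fresh:
            return "wait"  # the owner looks healthy, leave it alone
        # Owner is dead or stuck: erasing the primary lock rolls it back.
        del cells[(p_row, f"{p_col}:lock", lock_ts)]

    # Primary lock is gone. A write record pointing at lock_ts means it committed.
    commit_ts = next((ts for (r, c, ts), v in cells.items()
                      if r == p_row and c == f"{p_col}:write" and v == lock_ts), None)
    cells.pop((row, f"{col}:lock", lock_ts), None)
    if commit_ts is None:
        cells.pop((row, f"{col}:data", lock_ts), None)  # discard uncommitted data
        return "rolled_back"
    cells[(row, f"{col}:write", commit_ts)] = lock_ts  # finish the owner's commit
    return "rolled_forward"


cells = {
    ("Bob", "bal:write", 8): 7,  # the primary committed at ts 8
    ("Joe", "bal:data", 7): "$9",
    ("Joe", "bal:lock", 7): {"primary": ("Bob", "bal"), "worker": "w1", "wall_time": 100.0},
}
outcome = resolve_lock(cells, "Joe", "bal", 7, is_alive=lambda w: False, now=105.0)
print(outcome)  # rolled_forward
```

The primary cell is the single source of truth: whatever state it is in decides the fate of every secondary lock.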

Percolator uses observers to drive incremental computation. An observer is user-defined logic that runs whenever specific Bigtable columns change. Each observer executes in its own transaction, and applications are typically structured as pipelines: one observer processes data, writes results back to Bigtable, and those writes trigger downstream observers.

Although Percolator technically allows multiple observers to watch the same column, Google avoids this in practice and usually assigns one observer per column. This keeps execution deterministic and easier to reason about.

Percolator also supports message collapsing: for a given column change, at most one observer transaction will commit. If many writes occur in quick succession, they may be coalesced into a single observer run, avoiding redundant work.

To support observers, Percolator introduces a few additional columns:

  • c:notify: a lightweight signal indicating that an observed column was updated.
  • c:ack_O: the acknowledgment column for observer O, storing the start timestamp of the observer’s last successful run.

There is one c:ack_O column per observer. In practice, the number of observers is small. For example, Google’s indexing system uses roughly 10 observers.

Conceptually:

  • Each observed column has a corresponding acknowledgment column.
  • Any write to an observed column also sets c:notify.
  • c:ack_O acts as a watermark telling Percolator how far observer O has progressed.

Percolator workers continuously scan the c:notify column (stored in a separate locality group for efficiency). When a worker finds a notify entry, it:

  1. Reads the latest committed write timestamp from c:write.
  2. Reads the observer’s acknowledgment timestamp from c:ack_O.
  3. Compares the two:
    • If c:write > c:ack_O, the observer is invoked.
    • Otherwise, the change has already been processed and is skipped.

Percolator guarantees that at most one observer transaction commits per notification. This is enforced by transactional conflicts on the c:ack_O column: if multiple workers race, only one can successfully update the acknowledgment.
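
The comparison between c:write and c:ack_O can be sketched like this. It is deliberately non-transactional and single-threaded: in Percolator the acknowledgment update happens inside the observer’s own transaction, which is what makes racing workers conflict. The two dictionaries and the function name are assumptions made for the example.

```python
def maybe_run_observer(write_records, acks, row, col, name, observer, start_ts):
    """Run `observer` if (row, col) has a commit newer than its last acknowledged run.

    write_records: (row, col) -> list of commit timestamps (stands in for c:write)
    acks: (row, col, observer name) -> start timestamp of last run (stands in for c:ack_O)
    """
    commits = write_records.get((row, col), [])
    ack_ts = acks.get((row, col, name), -1)
    if not commits or max(commits) <= ack_ts:
        return False  # nothing new since the last successful run
    observer(row, col)
    acks[(row, col, name)] = start_ts  # advance the watermark
    return True


runs = []
write_records = {("com.example/a", "contents"): [8]}
acks = {}


def record(row, col):
    runs.append((row, col))


first = maybe_run_observer(write_records, acks, "com.example/a", "contents", "dedup", record, start_ts=10)
second = maybe_run_observer(write_records, acks, "com.example/a", "contents", "dedup", record, start_ts=11)
write_records[("com.example/a", "contents")].append(12)  # a new commit arrives
third = maybe_run_observer(write_records, acks, "com.example/a", "contents", "dedup", record, start_ts=13)
```

The second call is skipped because the only commit (timestamp 8) is older than the acknowledged run (timestamp 10); the third runs again because a newer commit has arrived.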

Workers also coordinate using a lightweight lock service to avoid duplicate work.

Workers discover work by scanning c:notify. However, this can lead to scanner clumping: multiple workers may converge on the same hot row while earlier rows contain no notifications.

For example, workers 1, 2, and 3 start scanning rows at different points.

4. Avoiding Scanner Clumping — figure 5

But then worker 3 reaches a row that has notify columns while none of the rows before it do, so workers 1 and 2 quickly catch up with worker 3.

4. Avoiding Scanner Clumping — figure 3

This reduces parallelism and overloads tablet servers. To prevent this, Percolator uses a simple strategy: if a worker detects that another worker is already processing the same row, it immediately jumps to a random location in the table and resumes scanning there. In the case above, workers 1 and 2 would jump to random locations and continue scanning from there.
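
A minimal sketch of the jump rule, assuming rows are addressed by integer index and that a scanner can ask which rows other workers currently hold (`busy_rows`). Both are simplifications introduced for the example, and the function name is hypothetical.

```python
import random


def next_scan_position(current_row, num_rows, busy_rows, rng=random):
    """Return the index of the row this scanner should visit next."""
    candidate = (current_row + 1) % num_rows
    if candidate in busy_rows:
        # Another worker is already here: jump to a random point in the table
        # instead of queueing up behind it.
        return rng.randrange(num_rows)
    return candidate


print(next_scan_position(3, 100, busy_rows={10}))  # 4
jumped_to = next_scan_position(9, 100, busy_rows={10}, rng=random.Random(0))
```

Passing a seeded `random.Random` makes the jump reproducible in tests; in normal use the module-level generator is enough.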

Percolator issues many RPCs to Bigtable, most of them from transactions. Writing a lock normally requires two RPCs: one to check for conflicts and another to write the lock. To reduce this, Google extended Bigtable with conditional mutations, allowing both steps to be performed in a single RPC.
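
The effect of a conditional mutation can be sketched with a row object whose mutex stands in for Bigtable’s single-row atomicity. `Row` and `check_and_put` are invented names for this illustration and do not reflect the actual Bigtable interface.

```python
import threading


class Row:
    """One row; the mutex models the atomicity Bigtable gives single-row operations."""

    def __init__(self):
        self.columns = {}
        self._mutex = threading.Lock()

    def check_and_put(self, column, expected, new_value):
        """Set `column` to `new_value` only if it currently equals `expected`.

        The check and the write happen in one call, so no second round trip
        is needed and no other writer can slip in between them.
        """
        with self._mutex:
            if self.columns.get(column) != expected:
                return False
            self.columns[column] = new_value
            return True


row = Row()
first = row.check_and_put("bal:lock", None, ("Bob", "bal"))   # lock is free, acquired
second = row.check_and_put("bal:lock", None, ("Joe", "bal"))  # already locked, rejected
```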

Another optimization is batching.

  • Mutations destined for the same tablet server are grouped into one RPC. To enable this, Percolator delays lock operations for a few seconds to collect them into batches. This increases latency but improves overall throughput.
  • Reads are batched in the same way, but the added delay can greatly increase transaction latency. Percolator offsets this with prefetching, which relies on a Bigtable property: reading multiple columns from the same row costs almost the same as reading one, because Bigtable must fetch and decompress the entire SSTable block anyway. Percolator predicts which columns a transaction is likely to read and fetches them eagerly.
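
Grouping mutations by destination can be sketched with a sorted list of tablet start keys. The tablet boundaries, the mutation tuples, and the function name are made up for the example; a real client would learn tablet locations from Bigtable’s metadata.

```python
import bisect
from collections import defaultdict


def group_by_tablet(mutations, tablet_starts):
    """Group (row_key, column, value) mutations by the tablet that owns the row.

    tablet_starts is the sorted list of each tablet's first row key, so tablet i
    covers keys in [tablet_starts[i], tablet_starts[i + 1]).
    """
    batches = defaultdict(list)
    for mutation in mutations:
        row_key = mutation[0]
        tablet = bisect.bisect_right(tablet_starts, row_key) - 1
        batches[tablet].append(mutation)
    return dict(batches)


tablet_starts = ["", "h", "p"]  # three tablets: [""-"h"), ["h"-"p"), ["p"-...)
pending = [
    ("apple", "c:lock", 1),
    ("kiwi", "c:lock", 2),
    ("banana", "c:lock", 3),
    ("zebra", "c:lock", 4),
]
batches = group_by_tablet(pending, tablet_starts)
print(len(pending), "mutations ->", len(batches), "RPCs")  # 4 mutations -> 3 RPCs
```

The longer the client waits before flushing `pending`, the more mutations share each RPC, which is the latency-for-throughput trade described above.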

Percolator adopts a thread-per-request model: all API calls are blocking, and each machine runs thousands of threads to achieve parallelism. This simplifies application code compared to event-driven models. The downside is an extremely high thread count, which Google addressed with internal kernel improvements.

https://www.usenix.org/legacy/event/osdi10/tech/full_papers/Peng.pdf

https://jaymcor.github.io/notes/isolation_rr_si.html#:~:text=A careful review of the,Write Skew Test Setup

https://jepsen.io/consistency/phenomena
