TDSQL: Tencent Distributed Database System
How Tencent built a MySQL-compatible distributed database serving 30,000 enterprises with physical replication, Raft-based consistency, lock-free internals, and horizontal scaling explained.
Tencent Distributed SQL (TDSQL) is a database system developed by Tencent Cloud, designed to meet the performance requirements of large-scale applications. TDSQL is built upon TXSQL, an open-source MySQL branch maintained by Tencent, which is fully compatible with MySQL’s syntax and APIs.
TDSQL is officially available on Tencent Cloud. As of 2023, when the paper was written, it had been adopted by 30,000 enterprises across various industries, including 7 of the top 10 banks in China. In the team's TPC-C benchmark, TDSQL processed more than 10 PB of data while continuing to scale on a single cluster with over 100,000 physical cores.
In this blog, we’ll explore the design decisions behind TDSQL and what the team did to optimize it.
Here’s the overview of the system:
- Storage module: Consists of multiple Sets of machines. Each Set contains several machines (physical or virtual) that serve as Data nodes, with Agents running alongside them. Normally, a Set has one primary Data node and two secondary Data nodes. The Agent’s job is to monitor the health of the Data nodes and report heartbeats to ZooKeeper in the Management module.
- Management module: Includes:
  - the Scheduler cluster, which executes planned cluster-management tasks (such as adding storage nodes or switching traffic to another replica)
  - ZooKeeper, which coordinates the status and data of scheduler tasks and receives the reports from the Agents in the Storage module
- Compute module: The SQL-facing layer of TDSQL. It receives client connections, checks permissions, parses SQL, decides which shard should handle the request, and coordinates distributed execution when multiple shards are involved.
High availability means a system can continue serving requests even when some machines or components fail. In a database system, this usually means the database should remain available when a primary node crashes, a disk fails, or a network issue happens.
A common way to achieve high availability is replication. Instead of storing data on only one node, the system keeps multiple copies of the data.
Logical Replication vs Physical Replication:
- Logical Replication records high-level change events, indicating what change was made. The replica reads the change and applies the same change locally. The main point is that the physical location of data on the replica does not need to match the primary. They only need to agree on the logical change.
- Physical replication records storage-level changes rather than SQL-level changes. A physical log record usually points to the storage unit being modified, such as a tablespace and page number, and describes the redo operation that should be applied to that page. Therefore, the replica does not need to re-execute SQL or locate the row by itself. It simply replays the same physical log stream from the primary. In this sense, physical replication is closer to crash recovery than normal SQL replication: given the same base data files and the same ordered log records, the replica should reach the same physical state as the primary.
MySQL implements logical replication with the binlog file. Because the binlog is maintained per storage node, this introduces the following problems:
- Two `fsync` calls: In MySQL’s binlog-based replication, each storage node maintains two durability paths: the InnoDB redo log for crash recovery and the MySQL binlog for replication. These two logs must be coordinated carefully during commit; otherwise, the primary and replicas may observe different transaction histories after a crash. As a result, a transaction may require two durable flushes: one for the redo log and one for the binlog. Since each flush usually involves an expensive `fsync`, this adds latency to the commit path, especially in write-heavy workloads. `fsync` is expensive because it forces the database to wait until the log is physically persisted to storage. A normal `write()` may only place data in the OS page cache, but `fsync()` creates a durability barrier and waits for the SSD/disk to confirm persistence. Therefore, if a distributed transaction requires two durable state transitions, prepare and commit, it may need two `fsync` calls, which adds significant latency to the commit path.
- binlog contention: Each data node only has one binlog file, so there’s likely contention when multiple large transactions want to write to that file at the same time.
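The difference between a buffered `write()` and a durable `fsync()` can be sketched in Python. This is a simplified model for illustration only, not MySQL's or TDSQL's actual commit protocol: the function names, the record format, and the idea of counting flushes per commit are all assumptions made for this example.

```python
import os
import tempfile

fsync_count = 0  # counts durability barriers issued by this sketch


def durable_append(fd: int, record: bytes) -> None:
    """Append a record, then block until the OS reports it persisted."""
    global fsync_count
    os.write(fd, record)  # may only reach the OS page cache
    os.fsync(fd)          # durability barrier: wait for stable storage
    fsync_count += 1


def commit_binlog_style(redo_fd: int, binlog_fd: int, change: bytes) -> None:
    """Simplified model: the redo log and the binlog are each flushed once."""
    durable_append(redo_fd, b"redo " + change + b"\n")
    durable_append(binlog_fd, b"binlog " + change + b"\n")


def commit_physical_style(redo_fd: int, change: bytes) -> None:
    """Simplified model: the redo log is the only log that must be flushed."""
    durable_append(redo_fd, b"redo " + change + b"\n")


def fsyncs_per_commit(commit, n_logs: int) -> int:
    """Run one commit against temporary log files and count the fsyncs."""
    global fsync_count
    fsync_count = 0
    with tempfile.TemporaryDirectory() as tmp:
        fds = [
            os.open(os.path.join(tmp, f"log{i}"),
                    os.O_WRONLY | os.O_CREAT | os.O_APPEND)
            for i in range(n_logs)
        ]
        try:
            commit(*fds, b"update page 7")
        finally:
            for fd in fds:
                os.close(fd)
    return fsync_count
```

Calling `fsyncs_per_commit(commit_binlog_style, 2)` and `fsyncs_per_commit(commit_physical_style, 1)` shows the two-flush versus one-flush commit paths in this toy model.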
TDSQL developed its own physical replication solution, which ships the storage engine’s log records instead of the binlog. This offers some key benefits:
- Transaction commits now require only one commit log persistence (one `fsync` for the redo log).
- Data updates can be synchronized while transaction execution is in progress. Binlog-based logical replication usually ships a transaction after it reaches the commit stage, because the binlog represents the committed transaction history. Physical replication can ship storage-engine log records earlier, while the transaction is still executing. The secondary may start receiving or replaying those logs before the final commit decision, but the transaction only becomes visible after the commit record is replicated. If the transaction rolls back, rollback information is also replayed, so the secondary does not expose the aborted changes.
TDSQL improves high availability by keeping multiple replicas of each data set. There are 3 replica types in TDSQL:
- Primary replica: holds the newest data and serves read / write requests.
- Secondary replica: stores a copy of the data and can become the new primary if the current primary fails.
- Log replica: stores only a copy of the transaction logs; it does not store any data and can’t be promoted to primary.
Let’s take a look at each type:
Primary replica: When processing a transaction, the primary replica handles log generation, local persistence, and replication to secondary/log replicas.
- The user session generates transaction logs and appends them to the Log Buffer, which is an in-memory buffer.
- A Log Writer thread reads logs from the Log Buffer and writes them to the local Log File. At the same time, it copies these log records to the Copy Buffer, an in-memory circular queue used by replication threads.
- A Log Flusher thread periodically calls `fsync` on the Log File to make the written logs durable on disk. This is needed because `write()` may only place data into the OS page cache; the data is not guaranteed to survive a crash until it is flushed to stable storage.
- For each secondary or log replica, the primary creates a dedicated Log Dump thread. This thread reads recent logs from the Copy Buffer and sends them to its associated replica. If the required logs have already been overwritten in the Copy Buffer, the Log Dump thread falls back to reading them from the durable Log File.
- When a transaction reaches commit, the primary does not immediately return `OK` to the client. Instead, the transaction enters the Commit Queue and waits until its commit log has been replicated.
- After a secondary/log replica stores the received logs, it sends an ACK back to the primary. The ACK contains the latest replicated Log Sequence Number (LSN). A Listener thread on the primary receives this ACK, checks the Commit Queue, and releases every transaction whose `commit_lsn <= ack_lsn`. For those transactions, the primary can safely return `OK` to the corresponding client connection.
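The Commit Queue release rule above can be sketched as a small priority queue. This is a minimal illustration, assuming a min-heap ordered by `commit_lsn`; the class and method names are hypothetical, and the paper does not describe the queue's actual data structure.

```python
import heapq
from dataclasses import dataclass, field
from typing import List


@dataclass(order=True)
class PendingCommit:
    commit_lsn: int
    session: str = field(compare=False)  # hypothetical client session label


class CommitQueue:
    """Transactions wait here until their commit log has been acknowledged."""

    def __init__(self) -> None:
        self._heap: List[PendingCommit] = []

    def enqueue(self, session: str, commit_lsn: int) -> None:
        heapq.heappush(self._heap, PendingCommit(commit_lsn, session))

    def on_ack(self, ack_lsn: int) -> List[str]:
        """Release every transaction whose commit_lsn <= ack_lsn."""
        released = []
        while self._heap and self._heap[0].commit_lsn <= ack_lsn:
            released.append(heapq.heappop(self._heap).session)
        return released


queue = CommitQueue()
queue.enqueue("session-1", 100)
queue.enqueue("session-2", 250)
queue.enqueue("session-3", 180)
print(queue.on_ack(200))  # ['session-1', 'session-3']
print(queue.on_ack(300))  # ['session-2']
```

Because the heap keeps the smallest `commit_lsn` at the front, each ACK only inspects the transactions it actually releases.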
Secondary replica:
- The secondary receives logs from the primary’s Log Dump thread. It writes these logs into its local Log File.
- After storing the logs, the secondary sends an ACK back to the primary. The ACK contains the latest stored LSN. This ACK is received by the primary’s Listener thread and used to release transactions from the commit queue.
- The secondary then wakes up its Log Apply pipeline. This pipeline consists of a Log Coordinator and multiple worker threads.
- The Log Coordinator reads logs from the secondary’s local Log File in batches. The paper says the default batch size is 32 MB. It parses the logs and distributes them to worker threads based on the hash of `space_id` and `page_no`. Here, `space_id` identifies the tablespace, and `page_no` identifies the page within that tablespace. This ensures that logs for the same data page are handled by the same worker, preserving page-level order.
- Each worker thread applies its assigned logs to pages in the buffer pool. These modified pages become dirty pages and are eventually flushed to the data files.
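The coordinator's dispatch step can be sketched as follows. This is an illustration under assumptions: `RedoRecord`, `pick_worker`, and the mixing constant are hypothetical, and the paper does not specify the hash function it uses.

```python
from collections import defaultdict
from typing import Dict, List, NamedTuple


class RedoRecord(NamedTuple):
    lsn: int
    space_id: int  # tablespace ID
    page_no: int   # page number inside the tablespace
    body: bytes


def pick_worker(space_id: int, page_no: int, n_workers: int) -> int:
    """Hypothetical hash: any deterministic function of the page ID works."""
    return (space_id * 1_000_003 + page_no) % n_workers


def dispatch(batch: List[RedoRecord], n_workers: int) -> Dict[int, List[RedoRecord]]:
    """Split an LSN-ordered batch into per-worker queues.

    Records for the same page always map to the same worker, and each
    queue keeps the batch's LSN order, so page-level order is preserved.
    """
    queues: Dict[int, List[RedoRecord]] = defaultdict(list)
    for record in batch:
        worker = pick_worker(record.space_id, record.page_no, n_workers)
        queues[worker].append(record)
    return queues
```

Records touching different pages may be applied in parallel and out of global LSN order, which is safe here only because ordering is needed per page.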
A buffer pool is the database’s main in-memory cache for table and index pages. Instead of reading every page from disk, InnoDB keeps frequently accessed pages in memory, which makes reads much faster. When data is updated, the change is usually applied to the page in the buffer pool first; that page becomes a dirty page and is flushed back to the data file later. The redo log protects these dirty pages: if the database crashes before they are written to disk, the redo log can be replayed to recover the committed changes. In TDSQL, both primary and secondary replicas have buffer pools: the primary updates its buffer pool through normal SQL execution, while the secondary updates its buffer pool by replaying replicated physical logs.
Log replica: It performs only the first two steps of the secondary replica (storing the received logs and sending the ACK) and nothing more. The log replica serves primarily for high availability and does not need to apply the logs. As a result, it consumes minimal resources: about 80% of a single CPU core and less than 8 GB of memory.
Here’s the full architecture
During a planned switch, the secondary is about to become the new primary. Although it already has the latest data, its buffer pool may not contain the same frequently accessed pages as the old primary. The new primary would therefore need to load those pages from disk after the traffic moves to it, causing latency spikes and lower QPS.
TDSQL addresses this issue by warming up the buffer pool in advance. The primary replica asynchronously dumps snapshot information about its buffer pool, identifies the hot ranges of B+Tree data, and shares this snapshot information with the secondary replica. The secondary replica can then pre-fetch that data from disk into its buffer pool.
Some applications (e.g. financial scenarios) need zero inconsistency. Many databases prioritize performance over strict correctness; for example, MySQL supports asynchronous replication, which can introduce data consistency issues.
Semi-synchronous replication waits for replicas to acknowledge, but it still falls back to asynchronous replication in some scenarios (e.g. when the replica fails to respond within a specific time), so inconsistency is still possible.
ℹ️ Semi-synchronous replication: The primary usually waits for at least one replica to acknowledge a transaction before reporting the commit to the client.
TDSQL adopts a Raft-inspired strong synchronous replication mechanism, which requires a majority of replicas to acknowledge. This ensures strong consistency and prevents any data incorrectness.
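The majority rule can be made concrete with a small sketch. It assumes, as in standard Raft, that the primary counts as one member of the group and that each member reports the highest LSN it has stored; `majority_acked_lsn` is a hypothetical helper, not a TDSQL function.

```python
from typing import List


def majority_acked_lsn(primary_lsn: int, replica_ack_lsns: List[int]) -> int:
    """Highest LSN stored on a majority of the group (primary included).

    Assumption: the primary counts as one vote, as in standard Raft.
    """
    lsns = sorted([primary_lsn] + replica_ack_lsns, reverse=True)
    majority = len(lsns) // 2 + 1
    # The majority-th largest LSN is stored on at least `majority` members.
    return lsns[majority - 1]


# One primary, one secondary, one log replica:
print(majority_acked_lsn(120, [100, 90]))  # 100
print(majority_acked_lsn(120, [120, 0]))   # 120
```

In the first call, a transaction with `commit_lsn = 110` would keep waiting, because only the primary has stored it. In the second call, it could be released even though one replica is far behind.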
TDSQL adopted 3 major optimizations:
- user ACK thread: Raft-based synchronous replication notably reduces performance compared to the asynchronous mode (often by half), because the user thread (the thread that processes the client’s request) has to wait for the majority ACKs. TDSQL optimizes this by introducing a new worker thread type, the user ACK thread. After the transaction enters the pending-commit stage, the user thread hands the waiting over to the user ACK thread and continues to process other client requests instead of blocking on ACKs from the secondary replicas.
  - User thread: the database worker thread that handles a client session/request. It usually does work like: parse SQL, check permissions, execute query, call storage engine, generate transaction logs, handle commit / rollback, send result back to client.
- group writing: TDSQL batches log replay/write operations on secondary replicas, reducing per-log overhead and allowing ACKs to return faster.
- fail-over: When the primary fails, the secondary replica must, before being promoted, check with the log replica whether it has all the logs; if not, it reads the missing logs and applies them. All unfinished transactions are rolled back, and background threads are spawned to initialize the transaction system. Once the secondary replica has been promoted to primary, the log replica connects to it.
TDSQL needs to scale database instances without interrupting live traffic. This is difficult because scaling a distributed instance usually means moving some shards from old Sets to new Sets.
During migration, the system must handle several risks: new data may still be written to the old Set, the routing table may temporarily point to the wrong place, and old redundant data may still exist after migration. If these steps are not handled carefully, queries may read stale data, writes may be lost, or the system may suffer from latency spikes.
TDSQL uses hash-based sharding. A shard key field is chosen per table, and the target Set for any row is determined by hash(shard_key) % num_shards. The compute module consults a routing table on every query to decide which Set to forward the request to.
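A minimal sketch of this routing step is shown below. The paper does not name the hash function, so `zlib.crc32` is used here purely as a stable stand-in; the routing table contents and Set names are hypothetical.

```python
import zlib
from typing import Dict


def shard_of(shard_key: str, num_shards: int) -> int:
    """hash(shard_key) % num_shards, with CRC32 as an assumed stable hash."""
    return zlib.crc32(shard_key.encode("utf-8")) % num_shards


def route(shard_key: str, routing_table: Dict[int, str]) -> str:
    """Look up the Set that currently owns the shard for this key."""
    return routing_table[shard_of(shard_key, len(routing_table))]


# Hypothetical routing table: four shards spread over two Sets.
routing_table = {0: "set-a", 1: "set-a", 2: "set-b", 3: "set-b"}
target_set = route("user:42", routing_table)
```

Keeping a shard-to-Set table between the hash and the Set means that moving a shard only changes one table entry; the hash of each row stays the same.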
This makes the common case fast: if the WHERE clause includes the shard key, the compute module routes the query directly to one Set with no fan-out. The flip side is that the choice of shard key matters a lot. A poor shard key can cause hot spots where one Set receives disproportionate traffic while others sit idle.
Hash sharding also trades range scan efficiency for even distribution. Range queries that do not include the shard key must be scattered across all Sets and their results merged at the compute layer. This is a known trade-off: TDSQL's target workload is high-throughput transactional reads and writes keyed by a well-known field such as user ID or order ID, not analytical range scans.
TDSQL performs online scaling in four main steps:
- Data synchronization
- Data verification
- Route update
- Redundant data deletion
First, data is copied from the old Set to the new Set. While copying, TDSQL keeps synchronizing new changes and verifies that the new Set is catching up. When the delay is small enough, the system updates the routing table so future requests go to the new Set.
During the route update, TDSQL briefly freezes writes to the affected partition. Any incoming write is retried after the route switch finishes. This short freeze prevents writes from going to the wrong Set during the transition.
TDSQL guards the transition at two layers. At the storage layer, it blocks the migrating partition during route updates, preventing inconsistent writes. At the compute layer, it rewrites SQL based on the latest routing table, so queries only access the correct Set even if old redundant data still exists.
After migration, TDSQL uses delayed deletion instead of deleting old data immediately. This avoids sudden IO spikes and reduces impact on live workloads. Keeping redundant data temporarily also helps with debugging and repair if migration issues are found.
When a query's filter includes the shard key, the compute module routes it to exactly one Set - this is the fast path. But many real queries do not have that luxury: a JOIN between two tables with different shard keys, or an aggregation without the shard key in the filter, must touch multiple Sets.
In those cases the compute module fans the query out to all relevant Sets, collects the partial results, and merges them locally. Distributed aggregations like ORDER BY, GROUP BY, and LIMIT require the compute layer to sort and merge results from multiple Sets before returning a final answer to the client.
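The merge step can be sketched for two common cases. This is a generic scatter-gather illustration, not TDSQL's execution engine: it assumes each Set has already sorted (for ORDER BY) or pre-aggregated (for GROUP BY with COUNT) its own partial result, and the function names are hypothetical.

```python
import heapq
from itertools import islice
from typing import Callable, Dict, Iterable, List, Tuple

Row = Tuple[int, str]


def merge_order_by_limit(
    partials: Iterable[List[Row]], key: Callable[[Row], int], limit: int
) -> List[Row]:
    """Merge per-Set results that are each already sorted by `key`."""
    return list(islice(heapq.merge(*partials, key=key), limit))


def merge_group_by_count(partials: Iterable[Dict[str, int]]) -> Dict[str, int]:
    """Combine per-Set COUNT(*) results by summing counts per group."""
    totals: Dict[str, int] = {}
    for partial in partials:
        for group, count in partial.items():
            totals[group] = totals.get(group, 0) + count
    return totals


set_a = [(1, "alice"), (5, "erin")]
set_b = [(2, "bob"), (3, "carol")]
print(merge_order_by_limit([set_a, set_b], key=lambda row: row[0], limit=3))
# [(1, 'alice'), (2, 'bob'), (3, 'carol')]
```

For `ORDER BY ... LIMIT n`, each Set only needs to return its own first `n` rows, since no row beyond that can appear in the global top `n`.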
The paper does not go into detail on the cross-shard execution engine beyond noting that the compute module handles SQL rewriting and routing. The general principle is that the application sees normal MySQL-compatible SQL semantics, but the performance characteristics shift significantly depending on whether the shard key is present in the query - a single-shard query is near-local speed, while a full fan-out query pays a scatter-gather cost proportional to the number of Sets.
TDSQL has implemented extensive optimizations targeting various levels of locking, including table locks, row locks, transaction IDs, and Purge.
TDSQL removes some InnoDB table-level locks because MySQL already has server-level Metadata Locks that provide similar protection. Keeping both layers would create redundant lock conflicts and reduce concurrency. The exception is AUTO_INC, which is still needed for safe auto-increment value generation.
Metadata Locks (MDL) are server-level locks used to protect table metadata while SQL statements are running. The table metadata includes the table definition, columns, indexes, schema version, and table existence. For example, when a transaction reads from a table, MySQL needs to make sure that another session does not drop or alter that table at the same time.
```sql
-- Session 1
SELECT * FROM orders WHERE id = 1;
-- Session 2
ALTER TABLE orders ADD COLUMN source VARCHAR(32);
```

Without metadata locking, Session 2 might change the table structure while Session 1 is still using the old structure. That could lead to incorrect execution or internal inconsistency.
InnoDB stores row-lock metadata in a lock hash table. The key is based on the physical page that contains the locked record. In InnoDB, a page is identified by (space_id, page_no): space_id is the tablespace ID, and page_no is the page number inside that tablespace. Since many record locks are organized by page, locks for rows on the same page are stored in the same hash bucket.
In the original design, accessing or updating this lock hash table requires a global lock to protect its internal structure. Under high-concurrency DML workloads, many transactions frequently acquire and release row locks, so they contend on the same global lock. TDSQL reduces this contention by splitting the global lock into multiple segment locks. Each segment protects only a range of hash keys, so transactions touching different pages can update lock metadata more independently.
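The segment-lock idea is a form of lock striping, which can be sketched as follows. This is an illustration of the general technique, not InnoDB or TDSQL code: the class, its methods, and the segment count are hypothetical.

```python
import threading
from typing import Dict, List, Set, Tuple

PageId = Tuple[int, int]     # (space_id, page_no)
LockEntry = Tuple[int, int]  # (heap_no, trx_id): record slot and its holder


class SegmentedLockTable:
    """Row-lock metadata protected by per-segment locks, not one global lock."""

    def __init__(self, n_segments: int = 16) -> None:
        self._locks = [threading.Lock() for _ in range(n_segments)]
        self._segments: List[Dict[PageId, Set[LockEntry]]] = [
            {} for _ in range(n_segments)
        ]

    def _segment(self, page: PageId) -> int:
        return hash(page) % len(self._locks)

    def add_row_lock(self, space_id: int, page_no: int, heap_no: int, trx_id: int) -> None:
        page = (space_id, page_no)
        i = self._segment(page)
        with self._locks[i]:  # only this segment is blocked
            self._segments[i].setdefault(page, set()).add((heap_no, trx_id))

    def release(self, space_id: int, page_no: int, trx_id: int) -> None:
        page = (space_id, page_no)
        i = self._segment(page)
        with self._locks[i]:
            kept = {e for e in self._segments[i].get(page, set()) if e[1] != trx_id}
            if kept:
                self._segments[i][page] = kept
            else:
                self._segments[i].pop(page, None)

    def holders(self, space_id: int, page_no: int) -> Set[LockEntry]:
        page = (space_id, page_no)
        i = self._segment(page)
        with self._locks[i]:
            return set(self._segments[i].get(page, set()))
```

Transactions that touch pages in different segments never wait on each other here. Anything that needs a consistent view of the whole table, such as deadlock detection, has to coordinate across several segment locks.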
⚠️ Splitting the global lock has a cost: the deadlock detector becomes much more complex.
InnoDB uses global transaction-tracking structures for MVCC. These structures record which transactions are currently active, so the database can decide which row versions are visible to each query. However, every transaction must be added when it starts and removed when it commits. In workloads with many short transactions, these frequent updates cause contention because the global structure is protected by a global lock.
TDSQL replaces these global arrays and lists with a lock-free hash table. Each active transaction is inserted into the hash table using its transaction ID as the key. When the transaction commits, it is removed from the table. This reduces contention on transaction start and commit paths.
The tradeoff is that some MVCC operations still need to find the minimum active transaction ID. With a hash table, this requires traversing the table, which can consume CPU when there are many active transactions. To reduce this overhead, TDSQL optimizes the hash table layout with techniques such as cache-line alignment and weaker memory barriers where safe.
InnoDB uses a single Purge thread to handle the cleanup of undo logs and discarded index records. This Purge thread acts as a coordinator: it finds the cleanable logs, creates tasks, adds them to a task queue, and wakes up worker threads to run the tasks.
In this process, there are 2 hot contention points:
- Acquiring a global lock when waking up worker threads.
- Acquiring a global lock when worker threads retrieve tasks from the task queue.
Since the number of worker threads is constant, TDSQL does 2 things:
- Replaces the global wake-up lock with smaller locks, each held by an individual worker thread for its own wake-up operation.
- Partitions the task queue and assigns task partitions to workers. Each worker has its own partition, so workers no longer compete for the task queue.
💭 The paper only mentions this briefly, so we don’t really know the details. I think this approach introduces another problem: hot partitions.
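Since the paper gives few details, the following is only a guess at the shape of a partitioned task queue. It assumes one queue per worker and round-robin assignment by the coordinator; both choices, and the function name, are hypothetical.

```python
import queue
import threading
from typing import List


def run_partitioned_purge(tasks: List[int], n_workers: int) -> List[List[int]]:
    """Give each worker a private queue so workers never share a queue lock.

    Tasks are integers standing in for purge work; None is the stop signal.
    """
    queues: List[queue.Queue] = [queue.Queue() for _ in range(n_workers)]
    done: List[List[int]] = [[] for _ in range(n_workers)]

    def worker(index: int) -> None:
        while True:
            task = queues[index].get()  # blocks only on this worker's queue
            if task is None:
                return
            done[index].append(task)    # stand-in for purging undo records

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n_workers)]
    for thread in threads:
        thread.start()
    for n, task in enumerate(tasks):
        queues[n % n_workers].put(task)  # assumed round-robin assignment
    for q in queues:
        q.put(None)
    for thread in threads:
        thread.join()
    return done
```

Round-robin balances the number of tasks per worker but not their cost, so a run of expensive tasks can still leave one partition hot while others are idle.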
Traditional models such as one thread per connection or a thread pool face the following challenges:
- High network latency and CPU strain due to network packet handling.
- Thread-switching overhead.
- Memory overhead: for example, each user connection in the TDSQL kernel consumes ~3 MB of memory.
- Limited horizontal scalability: if every client session needs connections to many Data nodes, the total number of TCP connections grows quickly: connections = client sessions × data nodes.
To address these challenges, TDSQL makes the following changes:
- Changes the transmission protocol by adding a `connid` field to the packet header. This allows multiple logical client sessions to share a single physical TCP connection.

  ```text
  Before:
    Session 1 → TCP connection 1
    Session 2 → TCP connection 2
    Session 3 → TCP connection 3

  After:
    One TCP connection
    ├── connid = 1 → Session 1
    ├── connid = 2 → Session 2
    └── connid = 3 → Session 3
  ```

  But this also introduces new challenges around session-level management, e.g. determining how many sessions can share one connection, terminating a single session without closing the whole TCP connection, and correctly handling interleaved packets from different sessions.
- Refactors the network module from blocking I/O to non-blocking I/O based on `epoll`. In the old model, a thread could block while waiting for data from a socket. In the new model, a small number of network threads can monitor many connections and only process sockets that are ready.
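The `connid` multiplexing can be illustrated with a toy framing scheme. The header layout below (a 4-byte `connid` followed by a 4-byte payload length, both big-endian) is an assumption made for this sketch; the paper does not give TDSQL's actual packet format.

```python
import struct
from collections import defaultdict
from typing import Dict, List

# Hypothetical header: connid (uint32) + payload length (uint32), big-endian.
HEADER = struct.Struct("!II")


def encode_frame(connid: int, payload: bytes) -> bytes:
    """Prefix a payload with the session it belongs to and its length."""
    return HEADER.pack(connid, len(payload)) + payload


def demultiplex(stream: bytes) -> Dict[int, List[bytes]]:
    """Split bytes from one TCP connection back into per-session payloads.

    Assumes `stream` contains only complete frames.
    """
    sessions: Dict[int, List[bytes]] = defaultdict(list)
    offset = 0
    while offset < len(stream):
        connid, length = HEADER.unpack_from(stream, offset)
        offset += HEADER.size
        sessions[connid].append(stream[offset:offset + length])
        offset += length
    return sessions


wire = (
    encode_frame(1, b"SELECT 1")
    + encode_frame(2, b"BEGIN")
    + encode_frame(1, b"COMMIT")
)
print(dict(demultiplex(wire)))
# {1: [b'SELECT 1', b'COMMIT'], 2: [b'BEGIN']}
```

The length field is what makes interleaving safe: the receiver always knows where one session's packet ends and the next begins, whichever session sent it.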
TDSQL reduces memory usage by moving some cached resources from per-session storage to a global shared cache. Previously, each THD object could keep its own copy of resources such as parsed stored procedures. If parsing a stored procedure required 500K of memory and there were 50,000 connections, this would consume ~23GB of memory. TDSQL observed that these resources can be shared globally and that not all 50,000 connections are active at once. So it stores these resources globally in a lock-free hash table and reduces the concurrency (the number of active connections) to 256, resulting in a significant reduction in memory usage.
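The ~23GB figure can be checked with a quick calculation, reading 500K as 500 KiB and GB as GiB (an assumption about the units). The single-copy figure is the idealized case of one globally shared entry, not a number from the paper.

```python
KIB = 1024
GIB = 1024 ** 3


def cache_memory_gib(bytes_per_copy: int, copies: int) -> float:
    """Total memory, in GiB, for `copies` copies of one cached resource."""
    return bytes_per_copy * copies / GIB


per_session = cache_memory_gib(500 * KIB, 50_000)  # one copy per connection
shared = cache_memory_gib(500 * KIB, 1)            # one globally shared copy

print(f"{per_session:.2f} GiB per-session, {shared:.4f} GiB shared")
# 23.84 GiB per-session, 0.0005 GiB shared
```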
In a single-node database, MVCC can provide consistent reads using local transaction state. However, this becomes harder in a distributed database because one query may read from multiple shards, and each shard may observe transactions at slightly different stages. Without a global rule, a query could see only part of a distributed transaction, which is especially dangerous in financial workloads.
TDSQL solves this by introducing a Global Transaction Sequence, or GTS. Each transaction is assigned a globally ordered timestamp, and each query also reads at a specific GTS. A row is visible only if its GTS is less than or equal to the query’s GTS. If the row belongs to a transaction that is still in the prepare phase, the query waits until the transaction’s final status is known before deciding visibility. The exception is read-your-own-write: a transaction can always see its own updates.
A Global Transaction Sequence (GTS) is a globally ordered timestamp assigned to each transaction. It is used to determine the visibility of rows in a distributed transaction.
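The visibility rule can be written down as a small decision function. This is a sketch of the rule as described here, under assumptions: the type names and the three-way result (visible, invisible, wait) are hypothetical, and it leaves out the prepare-stage optimizations.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class TxnState(Enum):
    PREPARED = "prepared"
    COMMITTED = "committed"
    ABORTED = "aborted"


class Visibility(Enum):
    VISIBLE = "visible"
    INVISIBLE = "invisible"
    WAIT = "wait"  # final status unknown: the reader must wait


@dataclass
class RowVersion:
    writer_txn: int            # transaction that wrote this version
    state: TxnState
    commit_gts: Optional[int]  # assigned once the writer commits


def check_visibility(row: RowVersion, reader_txn: int, read_gts: int) -> Visibility:
    if row.writer_txn == reader_txn:
        return Visibility.VISIBLE    # read-your-own-write
    if row.state is TxnState.ABORTED:
        return Visibility.INVISIBLE
    if row.state is TxnState.PREPARED:
        return Visibility.WAIT       # outcome not yet decided
    if row.commit_gts is not None and row.commit_gts <= read_gts:
        return Visibility.VISIBLE
    return Visibility.INVISIBLE
```

Because every shard applies the same rule with the same `read_gts`, a query sees either all of a distributed transaction's rows or none of them.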
Since visibility checks happen very frequently, TDSQL optimizes the implementation by using prepare-stage GTS for earlier decisions, maintaining efficient mappings between transaction IDs and GTS values, cleaning old metadata based on the oldest active transaction, and reducing communication overhead in distributed transaction protocols. The goal is to provide globally consistent reads with minimal performance impact.
TDSQL shows how a MySQL-compatible database can be scaled into a large distributed system while still preserving strong consistency and high availability.
The paper’s main idea is that scalability does not come from one single technique. TDSQL improves the whole stack: physical replication reduces binlog overhead, distributed transactions preserve correctness, lock and memory optimizations remove internal bottlenecks, and network multiplexing helps the system handle many connections efficiently.
https://www.vldb.org/pvldb/vol17/p3869-chen.pdf
https://arpitbhayani.me/blogs/mysql-replication-internals