Raft Engine: a Log-Structured Embedded Storage Engine for Multi-Raft Logs in TiKV

Key Takeaways

  • TiDB is an open-source, distributed, NewSQL database that supports Hybrid Transactional and Analytical Processing (HTAP) workloads. TiKV provides a row-oriented, key-value storage engine for TiDB.
  • In earlier versions of TiKV, RocksDB stored the Raft logs. This generated a large amount of disk I/O, which led to high costs and performance fluctuations.
  • To reduce the disk I/O, we designed and implemented Raft Engine, based on BitCask.
  • Raft Engine stores Raft logs sequentially on the disk, and an in-memory index points to the logs. 
  • Unlike BitCask, Raft Engine can compress Raft logs before writing them to disk, and it does not use hint files. These changes reduce the disk I/O further.
  • Our internal benchmark shows that Raft Engine improves throughput by up to 4% and reduces tail latency by 20%; write I/O is reduced by 25% to 40%; CPU usage is reduced by 12% under heavy workloads.
     

TiDB is an open-source, distributed, NewSQL database that supports Hybrid Transactional and Analytical Processing (HTAP) workloads. It is MySQL compatible and features horizontal scalability, strong consistency, and high availability. 

TiKV provides a row-oriented, key-value storage engine for TiDB. Data in TiKV is split into several Regions, the smallest data unit for replication and load balancing. To achieve high availability (HA), each Region is replicated three times and then distributed among different TiKV nodes. The replicas in a single Region form a Raft group. 

Figure 1. The architecture of TiKV

There are two parts of data that need to be stored: the data itself and the Raft logs. In earlier versions, each TiKV node used two RocksDB instances to store them, and the Raft logs were converted into key-value pairs before being written to RocksDB. Using RocksDB to store Raft logs caused write amplification (WA), which created huge amounts of disk I/O. TiDB 5.4 introduced Raft Engine, a log-structured embedded storage engine for multi-Raft logs in TiKV.

In this article, we will dive deep into Raft Engine—why we need it, how we designed and implemented it, and the performance benefits it brings.

Pain points of using RocksDB to store Raft logs

The biggest pain point of using RocksDB to store Raft logs is the large amount of disk I/O that’s generated. In RocksDB, the disk I/O comes from two places.

First, in RocksDB, key-value pairs are inserted into a Write Ahead Log (WAL), and then the data is written to the RocksDB MemTable. Once the data in the MemTable reaches a certain size, RocksDB flushes the content into a Sorted String Table (SST) file on disk. This means that the same information is written to disk twice.

Second, RocksDB has write amplification (WA). WA is the ratio between the amount of data written to storage devices and the amount of data written by users. Log-structured merge tree (LSM-tree) based KV stores have long been criticized for their high WA due to frequent compactions. The growing size of the dataset increases the depth of an LSM-tree as well as the overall WA. The increased WA consumes more storage bandwidth, competes with flush operations, and ultimately slows down application throughput.
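To make the definition of WA concrete, here is a minimal sketch that computes the ratio. The function name and the byte counts are hypothetical and chosen only for illustration; they are not measurements from TiKV or RocksDB.

```rust
/// Write amplification: bytes written to the storage device divided by
/// bytes written by the user. Returns `None` when the user wrote nothing,
/// because the ratio is undefined in that case.
fn write_amplification(device_bytes: u64, user_bytes: u64) -> Option<f64> {
    if user_bytes == 0 {
        None
    } else {
        Some(device_bytes as f64 / user_bytes as f64)
    }
}

fn main() {
    // Hypothetical workload: 1 GiB of user writes that the store writes
    // once to the WAL, once on flush, and rewrites three times during
    // compaction, for five device writes in total.
    let user_bytes = 1u64 << 30;
    let device_bytes = user_bytes * 5;
    println!("WA = {:?}", write_amplification(device_bytes, user_bytes));
}
```

A ratio of 1.0 would mean every user byte reaches the device exactly once; anything above that is extra I/O that the storage engine itself generates.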

The large amount of RocksDB disk I/O becomes an obvious issue when deploying TiDB in a cloud environment or using TiDB Cloud (the fully managed TiDB service), for two reasons. First, cloud vendors may charge for disk I/O separately, so heavy disk I/O typically means a large bill. Second, the amount of RocksDB disk I/O fluctuates a lot, which may impact the quality of service (QoS).

Raft Engine design

Inspired by BitCask, we designed and implemented Raft Engine, a persistent embedded storage engine with a log-structured design, to store multi-Raft logs in TiKV. Figure 2 below shows a simplified Raft Engine architecture.

Figure 2. A simplified Raft Engine architecture

On disk, each write request, including both the key and the actual data, is appended sequentially to the active append-only log file. When the file reaches a configurable size threshold, a new file is created.
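The append-and-rotate behavior can be sketched as follows. This is a simplified illustration that uses in-memory buffers in place of real files; `AppendLog`, `Location`, and `rotate_threshold` are hypothetical names, not Raft Engine's actual types.

```rust
/// Where a record lives: which file, at what offset, and how long it is.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Location {
    file_id: usize,
    offset: usize,
    len: usize,
}

/// An append-only log made of "files" (in-memory buffers in this sketch).
struct AppendLog {
    files: Vec<Vec<u8>>,
    rotate_threshold: usize,
}

impl AppendLog {
    fn new(rotate_threshold: usize) -> Self {
        AppendLog {
            files: vec![Vec::new()],
            rotate_threshold,
        }
    }

    /// Appends a record to the active file. If the active file has already
    /// reached the threshold, a new file is started first.
    fn append(&mut self, record: &[u8]) -> Location {
        if self.files.last().unwrap().len() >= self.rotate_threshold {
            self.files.push(Vec::new());
        }
        let file_id = self.files.len() - 1;
        let active = &mut self.files[file_id];
        let offset = active.len();
        active.extend_from_slice(record);
        Location {
            file_id,
            offset,
            len: record.len(),
        }
    }

    /// Returns the bytes stored at `loc`.
    fn read(&self, loc: Location) -> &[u8] {
        &self.files[loc.file_id][loc.offset..loc.offset + loc.len]
    }
}

fn main() {
    let mut log = AppendLog::new(8);
    let first = log.append(b"hello");
    let second = log.append(b"raft");
    // The active file now holds 9 bytes, so the next append rotates.
    let third = log.append(b"log");
    println!("{:?}\n{:?}\n{:?}", first, second, third);
    println!("{:?}", log.read(second));
}
```

Because records are only ever appended, a write never has to seek within a file, and the returned location is all an index needs to find the record again.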

The MemTable is an in-memory hash table that serves as an index for the data on disk: it maps each log key to the page location (offset) of the corresponding record. The MemTable is split into several parts. When a read or write request comes in, the MemTable Router directs the request to the corresponding MemTable. The following code shows the data structure of the MemTable.

pub struct MemTable {
    /// The ID of the current Raft Group.
    region_id: u64,

    /// Container of entries. Incoming entries are pushed to the back with
    /// ascending log indexes.
    entry_indexes: VecDeque<ThinEntryIndex>,
    /// The log index of the first entry.
    first_index: u64,
    /// The amount of rewritten entries. Rewritten entries are the oldest
    /// entries and stored at the front of the container.
    rewrite_count: usize,

    /// A map of active key value pairs.
    kvs: BTreeMap<Vec<u8>, (Vec<u8>, FileId)>,

    /// Shared statistics.
    global_stats: Arc<GlobalStats>,
}
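The comments on `entry_indexes` and `first_index` suggest that an entry can be located by subtracting the first log index from the requested one. The sketch below illustrates that idea under simplifying assumptions; `EntryIndex` and `EntryLocation` are hypothetical stand-ins, not the real `ThinEntryIndex` or MemTable implementation.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified location of one log entry on disk.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct EntryLocation {
    file_id: u64,
    offset: u64,
}

/// Locations of consecutive log entries, oldest at the front.
struct EntryIndex {
    first_index: u64,
    locations: VecDeque<EntryLocation>,
}

impl EntryIndex {
    fn new() -> Self {
        EntryIndex {
            first_index: 0,
            locations: VecDeque::new(),
        }
    }

    /// Records the location of the entry with log index `index`.
    /// Entries must arrive with consecutive, ascending indexes.
    fn append(&mut self, index: u64, loc: EntryLocation) {
        if self.locations.is_empty() {
            self.first_index = index;
        } else {
            assert_eq!(index, self.first_index + self.locations.len() as u64);
        }
        self.locations.push_back(loc);
    }

    /// Looks up an entry by log index with simple arithmetic, no search.
    fn get(&self, index: u64) -> Option<EntryLocation> {
        let pos = index.checked_sub(self.first_index)?;
        self.locations.get(pos as usize).copied()
    }

    /// Drops every entry whose log index is below `index`.
    fn compact_to(&mut self, index: u64) {
        while self.first_index < index && !self.locations.is_empty() {
            self.locations.pop_front();
            self.first_index += 1;
        }
    }
}

fn main() {
    let mut idx = EntryIndex::new();
    for i in 5..8u64 {
        idx.append(i, EntryLocation { file_id: 1, offset: i * 100 });
    }
    println!("{:?}", idx.get(6));
    idx.compact_to(7);
    println!("{:?} {:?}", idx.get(6), idx.get(7));
}
```

A double-ended queue fits this access pattern well: new entries are pushed to the back, and compaction removes the oldest ones from the front.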

We have now seen the high-level design of Raft Engine. For a storage engine, how it handles read, insert, update, and delete requests is of vital importance. In the following paragraphs, we describe how Raft Engine responds to requests from TiKV to read, insert, update, and delete log files.

Read a log file

When reading a log file, the reader first visits the MemTable Router to locate the corresponding MemTable. The reader then gets the page location of the log from the MemTable based on the log key. Once that information is available, the engine performs one disk read from the file on disk. The following example from engine.rs shows how a message is read.

 

pub fn get_message<S: Message>(&self, region_id: u64, key: &[u8]) -> Result<Option<S>> {
    let _t = StopWatch::new(&ENGINE_READ_MESSAGE_DURATION_HISTOGRAM);
    if let Some(memtable) = self.memtables.get(region_id) {
        if let Some(value) = memtable.read().get(key) {
            return Ok(Some(parse_from_bytes(&value)?));
        }
    }
    Ok(None)
}
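The "one index lookup, then one disk read" pattern described above can be sketched independently of Raft Engine's internals. In this illustration, `read_value` is a hypothetical helper, the index is a plain `HashMap`, and an in-memory `Cursor` stands in for the log file.

```rust
use std::collections::HashMap;
use std::io::{self, Cursor, Read, Seek, SeekFrom};

/// Reads the value for `key` with one index lookup and one positioned read.
/// The index maps a key to the (offset, length) of its value in the file.
fn read_value<R: Read + Seek>(
    index: &HashMap<Vec<u8>, (u64, usize)>,
    file: &mut R,
    key: &[u8],
) -> io::Result<Option<Vec<u8>>> {
    let (offset, len) = match index.get(key) {
        Some(&location) => location,
        None => return Ok(None), // Unknown key: no disk access at all.
    };
    file.seek(SeekFrom::Start(offset))?;
    let mut buf = vec![0u8; len];
    file.read_exact(&mut buf)?;
    Ok(Some(buf))
}

fn main() -> io::Result<()> {
    // Two values stored back to back in one "file".
    let mut file = Cursor::new(b"aaaabbbbbb".to_vec());
    let mut index = HashMap::new();
    index.insert(b"k1".to_vec(), (0u64, 4usize));
    index.insert(b"k2".to_vec(), (4u64, 6usize));

    println!("{:?}", read_value(&index, &mut file, b"k2")?);
    println!("{:?}", read_value(&index, &mut file, b"missing")?);
    Ok(())
}
```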

Insert a log file

To respond to an insert request from TiKV, the engine:

  1. Appends the entry and key value to the active data file (stream)
  2. Creates an entry in the MemTable 

These two operations are performed atomically. Inserting a new record involves only one disk write and no disk seek.

Raft Engine also supports lz4 compression. You can choose to compress the data before it is written to disk, which helps reduce write I/O. The records in the MemTable are not compressed.

The following excerpt from engine.rs shows how a log batch is written.

pub fn write(&self, log_batch: &mut LogBatch, mut sync: bool) -> Result<usize> {
    let start = Instant::now();
    let len = log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?;
    let block_handle = {
        let mut writer = Writer::new(log_batch, sync, start);
        if let Some(mut group) = self.write_barrier.enter(&mut writer) {
            let now = Instant::now();
            let _t = StopWatch::new_with(&ENGINE_WRITE_LEADER_DURATION_HISTOGRAM, now);
            for writer in group.iter_mut() {
                ENGINE_WRITE_PREPROCESS_DURATION_HISTOGRAM.observe(
                    now.saturating_duration_since(writer.start_time)
                        .as_secs_f64(),
                );
                sync |= writer.sync;
                let log_batch = writer.get_payload();
                let res = if !log_batch.is_empty() {
                    self.pipe_log
                        .append(LogQueue::Append, log_batch.encoded_bytes())
                } else {
                    Ok(FileBlockHandle {
                        id: FileId::new(LogQueue::Append, 0),
                        offset: 0,
                        len: 0,
                    })
                };
                writer.set_output(res);
            }
            if let Err(e) = self.pipe_log.maybe_sync(LogQueue::Append, sync) {
                panic!(
                    "Cannot sync {:?} queue due to IO error: {}",
                    LogQueue::Append,
                    e
                );
            }
        }
        writer.finish()?
    };
    // The rest of the function is omitted from this excerpt.
}

Update a log file

Serving a request to update a log file is similar to inserting a new KV pair. The difference is that, instead of creating a new entry in the MemTable, the engine updates the existing record with the new page location. The old value on disk is now dangling and will be removed during garbage collection in the purge phase.
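The following sketch shows why an update leaves a dangling value behind. It is a simplified model, not Raft Engine's code: `Store`, `put`, and `garbage_bytes` are hypothetical names, and a byte vector stands in for the append-only file.

```rust
use std::collections::HashMap;

/// A toy log-structured store: values are appended to `log`, and `index`
/// maps each key to the (offset, length) of its latest value.
#[derive(Default)]
struct Store {
    log: Vec<u8>,
    index: HashMap<Vec<u8>, (usize, usize)>,
    /// Bytes in `log` that no index entry points to any more.
    garbage_bytes: usize,
}

impl Store {
    /// Handles both insert and update: the value is always appended, and
    /// the index entry is created or redirected to the new location.
    fn put(&mut self, key: &[u8], value: &[u8]) {
        let offset = self.log.len();
        self.log.extend_from_slice(value);
        let previous = self.index.insert(key.to_vec(), (offset, value.len()));
        if let Some((_, old_len)) = previous {
            // The old value stays in the log but is now unreachable.
            self.garbage_bytes += old_len;
        }
    }

    fn get(&self, key: &[u8]) -> Option<&[u8]> {
        let &(offset, len) = self.index.get(key)?;
        Some(&self.log[offset..offset + len])
    }
}

fn main() {
    let mut store = Store::default();
    store.put(b"k", b"old");
    store.put(b"k", b"newer");
    println!("value = {:?}", store.get(b"k"));
    println!("log bytes = {}, garbage bytes = {}", store.log.len(), store.garbage_bytes);
}
```

Tracking the amount of unreachable data, as `garbage_bytes` does here, is one simple way a store could decide when a purge is worthwhile.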

Delete a log file

When TiKV wants to delete a log file, the engine performs two operations atomically. It:

  1. Appends a new entry to the active data stream with the value set to a tombstone
  2. Deletes the MemTable entry 

The old record is now dangling and will be removed during garbage collection in the purge phase. 
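The two delete steps can be sketched as follows. This is an illustrative model only: `Record`, `Store`, and the in-memory `Vec` that stands in for the data stream are assumptions, not Raft Engine's on-disk format.

```rust
use std::collections::HashMap;

/// A record in the append-only stream.
#[derive(Debug, PartialEq)]
enum Record {
    Put { key: Vec<u8>, value: Vec<u8> },
    Tombstone { key: Vec<u8> },
}

#[derive(Default)]
struct Store {
    log: Vec<Record>,
    /// Maps each live key to the position of its latest record in `log`.
    index: HashMap<Vec<u8>, usize>,
}

impl Store {
    fn put(&mut self, key: &[u8], value: &[u8]) {
        self.log.push(Record::Put {
            key: key.to_vec(),
            value: value.to_vec(),
        });
        self.index.insert(key.to_vec(), self.log.len() - 1);
    }

    /// Step 1: append a tombstone. Step 2: remove the index entry.
    /// The earlier `Put` record stays in the log until it is purged.
    fn delete(&mut self, key: &[u8]) {
        self.log.push(Record::Tombstone { key: key.to_vec() });
        self.index.remove(key);
    }

    fn get(&self, key: &[u8]) -> Option<&[u8]> {
        match self.log.get(*self.index.get(key)?)? {
            Record::Put { value, .. } => Some(value.as_slice()),
            Record::Tombstone { .. } => None,
        }
    }
}

fn main() {
    let mut store = Store::default();
    store.put(b"k", b"v");
    store.delete(b"k");
    println!("get = {:?}, records in log = {}", store.get(b"k"), store.log.len());
}
```

The tombstone matters for durability: without it, a restart that replays the log would bring the deleted key back.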

Raft Engine recovery

When Raft Engine restarts, it reads the log keys on disk and rebuilds the MemTable from them. This differs from BitCask, which stores hint files on disk that hold the data needed for recovery. We dropped the hint files in Raft Engine because they cause extra write I/O in the background. Raft Engine still recovers quickly: in our internal benchmarks, recovery usually took less than 2 seconds when Raft Engine stored less than 10 GB of data.
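Rebuilding an index by replaying the log, without hint files, can be sketched like this. The `Record` type and `rebuild_index` function are hypothetical and model the log as an in-memory slice; the real engine reads its own file format from disk.

```rust
use std::collections::HashMap;

/// A record as it might appear in an append-only log.
enum Record {
    Put { key: Vec<u8>, value: Vec<u8> },
    Tombstone { key: Vec<u8> },
}

/// Replays the log from oldest to newest and returns an index that maps
/// each live key to the position of its latest `Put` record.
fn rebuild_index(log: &[Record]) -> HashMap<Vec<u8>, usize> {
    let mut index = HashMap::new();
    for (pos, record) in log.iter().enumerate() {
        match record {
            // A later record for the same key overrides the earlier one.
            Record::Put { key, .. } => {
                index.insert(key.clone(), pos);
            }
            // A tombstone removes whatever was indexed for the key.
            Record::Tombstone { key } => {
                index.remove(key);
            }
        }
    }
    index
}

fn main() {
    let log = vec![
        Record::Put { key: b"a".to_vec(), value: b"1".to_vec() },
        Record::Put { key: b"b".to_vec(), value: b"1".to_vec() },
        Record::Put { key: b"a".to_vec(), value: b"2".to_vec() },
        Record::Tombstone { key: b"b".to_vec() },
    ];
    let index = rebuild_index(&log);
    if let Some(&pos) = index.get(b"a".as_slice()) {
        if let Record::Put { value, .. } = &log[pos] {
            println!("a -> record {} with value {:?}", pos, value);
        }
    }
    println!("live keys: {}", index.len());
}
```

The trade-off is the one the text describes: replaying costs some time at startup, but nothing extra has to be written in the background while the engine is running.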

Purge

The purge phase permanently deletes old entries on disk. Purge is similar to compaction in RocksDB. However, the purge in Raft Engine is much more efficient.

During the compaction phase in RocksDB, all data files must be read into memory, combined, and then flushed to the disk. Because Raft Engine has an index for all records in MemTable, it only needs to read the up-to-date data, combine them into a file, and then flush them to the disk. Obsolete pages are then abandoned.
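The idea of rewriting only the records that the index still points to can be sketched as below. This is a simplified model with hypothetical names (`purge`, a log of key-value tuples, an index of positions), not the logic in Raft Engine's purge code.

```rust
use std::collections::HashMap;

type Log = Vec<(Vec<u8>, Vec<u8>)>;
type Index = HashMap<Vec<u8>, usize>;

/// Builds a new log that contains only the records referenced by `index`,
/// plus a new index that points into it. Records that are not referenced
/// are never read.
fn purge(log: &Log, index: &Index) -> (Log, Index) {
    // Visit live records in their original write order.
    let mut live: Vec<usize> = index.values().copied().collect();
    live.sort_unstable();

    let mut new_log = Vec::with_capacity(live.len());
    let mut new_index = HashMap::with_capacity(live.len());
    for pos in live {
        let (key, value) = &log[pos];
        new_index.insert(key.clone(), new_log.len());
        new_log.push((key.clone(), value.clone()));
    }
    (new_log, new_index)
}

fn main() {
    let log: Log = vec![
        (b"a".to_vec(), b"1".to_vec()), // Obsolete: overwritten below.
        (b"b".to_vec(), b"1".to_vec()),
        (b"a".to_vec(), b"2".to_vec()),
    ];
    let mut index = Index::new();
    index.insert(b"a".to_vec(), 2);
    index.insert(b"b".to_vec(), 1);

    let (new_log, new_index) = purge(&log, &index);
    println!("records before: {}, after: {}", log.len(), new_log.len());
    println!("a is now at position {:?}", new_index.get(b"a".as_slice()));
}
```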

The user determines when data is purged. Raft Engine consolidates and removes its log files only when the user calls the `purge_expired_files()` routine. By default, TiKV calls the purge function every 10 seconds.

The Raft Engine also sends useful feedback to the user. Each time the `purge_expired_files()` routine is called, Raft Engine returns a list of Raft Groups that hold particularly old log entries. Those log entries block garbage collection and should be compacted by the user.

Design considerations

Why does Raft Engine meet our needs better than RocksDB?

As discussed earlier, in our scenario RocksDB generates huge amounts of disk I/O. RocksDB keeps its values sorted by key, which gives very good performance for range queries; however, we do not need range queries for Raft logs. Keeping the values sorted by key inevitably leads to huge write amplification.

Raft Engine does not sort its data by key, so it generates less disk I/O.

Why are there multiple MemTables?

When data is read from or written to a MemTable, the table must be locked. Subsequent requests must wait in the queue until the current operation finishes. When multiple MemTables are used, if two requests access different MemTables, they do not block each other.
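The effect of splitting one lock into several can be sketched with the standard library. This is a hypothetical illustration: `ShardedIndex`, the modulo-based routing, and the stored value (the last log index per Region) are assumptions made for the example, not the actual MemTable Router.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

/// One lock per shard instead of one lock for the whole index.
struct ShardedIndex {
    shards: Vec<RwLock<HashMap<u64, u64>>>, // region_id -> last log index
}

impl ShardedIndex {
    fn new(shard_count: usize) -> Self {
        assert!(shard_count > 0);
        ShardedIndex {
            shards: (0..shard_count)
                .map(|_| RwLock::new(HashMap::new()))
                .collect(),
        }
    }

    /// Routes a region to its shard. Requests for regions in different
    /// shards take different locks and do not block each other.
    fn shard(&self, region_id: u64) -> &RwLock<HashMap<u64, u64>> {
        &self.shards[(region_id % self.shards.len() as u64) as usize]
    }

    fn set(&self, region_id: u64, last_index: u64) {
        self.shard(region_id)
            .write()
            .unwrap()
            .insert(region_id, last_index);
    }

    fn get(&self, region_id: u64) -> Option<u64> {
        self.shard(region_id).read().unwrap().get(&region_id).copied()
    }
}

fn main() {
    let index = Arc::new(ShardedIndex::new(4));
    // Eight writer threads, one per region, spread over four shards.
    let handles: Vec<_> = (0..8u64)
        .map(|region| {
            let index = Arc::clone(&index);
            thread::spawn(move || {
                for i in 1..=100 {
                    index.set(region, i);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    println!("region 3 -> {:?}", index.get(3));
}
```

With a single lock, all eight writers would queue behind one another; with four shards, only writers whose regions map to the same shard contend.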

Performance evaluation

In this section, we show our internal benchmark results based on TPC-C. We set up the TiDB cluster as follows: 

  • The TiDB and Placement Driver (PD) servers are on AWS c5.4xlarge instances.
  • The TiKV servers are on r5.4xlarge instances with 1 TB gp3 EBS volumes.
  • For TPC-C benchmarking, we used 5,000 warehouses. 
  • We ran the experiments with 50 and 800 threads. 

Each experiment ran for two hours.

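50 threads

|             | TpmC     | Client latency (ms)                                             | CPU (%) | I/O (MB/s)              | IOPS                   |
|-------------|----------|-----------------------------------------------------------------|---------|-------------------------|------------------------|
| RocksDB     | 28,954.7 | 50th: 44.0, 90th: 54.5, 95th: 60.8, 99th: 79.7, 99.9th: 151.0   | 420     | Write: 98.0, Read: 24.0 | Write: 1850, Read: 400 |
| Raft Engine | 30,146.4 | 50th: 41.9, 90th: 52.4, 95th: 58.7, 99th: 79.7, 99.9th: 142.6   | 430     | Write: 75.0, Read: 25.0 | Write: 1850, Read: 450 |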

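800 threads

|             | TpmC     | Client latency (ms)                                                 | CPU (%) | I/O (MB/s)               | IOPS                    |
|-------------|----------|---------------------------------------------------------------------|---------|--------------------------|-------------------------|
| RocksDB     | 54,846.9 | 50th: 402.7, 90th: 570.4, 95th: 604.0, 99th: 805.3, 99.9th: 1073.7  | 850     | Write: 209.0, Read: 54.0 | Write: 1750, Read: 1200 |
| Raft Engine | 56,020.5 | 50th: 402.7, 90th: 570.4, 95th: 604.0, 99th: 671.1, 99.9th: 838.9   | 750     | Write: 129.0, Read: 49.0 | Write: 1700, Read: 1200 |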

From the tables above, we can conclude that, compared to TiKV with RocksDB as the storage for Raft logs, Raft Engine improves throughput (TpmC) by up to 4% and reduces tail latency by 20%. Write I/O is reduced by 25% to 40%, and CPU usage is reduced by 12% under heavy workloads.
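For readers who want to check the summary against the raw numbers, the relative changes can be recomputed from the benchmark figures reported above. The helper below is a small sketch; `percent_change` is a hypothetical name, and the inputs are copied from the tables.

```rust
/// Relative change from `before` to `after`, in percent.
/// Positive means an increase, negative means a reduction.
fn percent_change(before: f64, after: f64) -> f64 {
    (after - before) / before * 100.0
}

fn main() {
    // (metric, RocksDB value, Raft Engine value), taken from the tables.
    let rows = [
        ("TpmC, 50 threads", 28_954.7, 30_146.4),
        ("TpmC, 800 threads", 54_846.9, 56_020.5),
        ("99.9th latency (ms), 800 threads", 1073.7, 838.9),
        ("Write I/O (MB/s), 50 threads", 98.0, 75.0),
        ("Write I/O (MB/s), 800 threads", 209.0, 129.0),
        ("CPU (%), 800 threads", 850.0, 750.0),
    ];
    for (metric, rocksdb, raft_engine) in rows {
        println!("{:<34} {:+.1}%", metric, percent_change(rocksdb, raft_engine));
    }
}
```

The summary percentages are rounded, so small differences from the recomputed values are expected.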

Running Example

If you would like to try it out, we have included a running example in Raft Engine’s GitHub repository. It mimics the write requests sent out by TiKV. The code first initializes an instance of Raft Engine and then writes some logs into it. The code also compacts the logs and initiates a purge phase.

You can run this example by:

$ RUST_LOG=debug cargo run --release --example append-compact-purge

// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.

use kvproto::raft_serverpb::RaftLocalState;
use raft::eraftpb::Entry;
use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize};
use rand::thread_rng;
use rand_distr::{Distribution, Normal};

#[derive(Clone)]
pub struct MessageExtTyped;

impl MessageExt for MessageExtTyped {
    type Entry = Entry;

    fn index(e: &Self::Entry) -> u64 {
        e.index
    }
}

// How to run the example:
// $ RUST_LOG=debug cargo run --release --example append-compact-purge
fn main() {
    env_logger::init();

    let config = Config {
        dir: "append-compact-purge-data".to_owned(),
        purge_threshold: ReadableSize::gb(2),
        batch_compression_threshold: ReadableSize::kb(0),
        ..Default::default()
    };
    let engine = Engine::open(config).expect("Open raft engine");

    let compact_offset = 32; // In src/purge.rs, it's the limit for rewrite.

    let mut rand_regions = Normal::new(128.0, 96.0)
        .unwrap()
        .sample_iter(thread_rng())
        .map(|x| x as u64);
    let mut rand_compacts = Normal::new(compact_offset as f64, 16.0)
        .unwrap()
        .sample_iter(thread_rng())
        .map(|x| x as u64);

    let mut batch = LogBatch::with_capacity(256);
    let mut entry = Entry::new();
    entry.set_data(vec![b'x'; 1024 * 32].into());
    let init_state = RaftLocalState {
        last_index: 0,
        ..Default::default()
    };
    loop {
        for _ in 0..1024 {
            let region = rand_regions.next().unwrap();
            let mut state = engine
                .get_message::<RaftLocalState>(region, b"last_index")
                .unwrap()
                .unwrap_or_else(|| init_state.clone());

            // Advance the persisted index so that each new entry gets the
            // next log index for its region.
            state.last_index += 1;
            let mut e = entry.clone();
            e.index = state.last_index;
            batch.add_entries::<MessageExtTyped>(region, &[e]).unwrap();
            batch
                .put_message(region, b"last_index".to_vec(), &state)
                .unwrap();
            engine.write(&mut batch, false).unwrap();

            if state.last_index % compact_offset == 0 {
                let rand_compact_offset = rand_compacts.next().unwrap();
                if state.last_index > rand_compact_offset {
                    let compact_to = state.last_index - rand_compact_offset;
                    engine.compact_to(region, compact_to);
                    println!("[EXAMPLE] compact {} to {}", region, compact_to);
                }
            }
        }
        for region in engine.purge_expired_files().unwrap() {
            let state = engine
                .get_message::<RaftLocalState>(region, b"last_index")
                .unwrap()
                .unwrap();
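            // Saturate at zero so that a region with fewer than 7 entries
            // does not underflow the unsigned index.
            let compact_to = state.last_index.saturating_sub(7);
            engine.compact_to(region, compact_to);
            println!(
                "[EXAMPLE] force compact {} to {}",
                region, compact_to
            );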
        }
    }
}
