Skip to content

Improve TiDBColumn output generation #5183

@SchrodingerZhu

Description

@SchrodingerZhu

Enhancement

In a high-density query workload, we can see that appending data to the output also introduces some latency.
image

This can be improved by hinting the compiler about the fast path of appending small data, where buffer overflow is unlikely to happen.

    /// Appends the raw bytes of one `T` (read from `from`) to the output buffer.
    ///
    /// Fast path: if at least sizeof(T) bytes remain between `pos` and
    /// `working_buffer.end()`, the bytes are copied directly and `pos` is advanced.
    /// Slow path: otherwise the call is forwarded to `write()` with the same bytes.
    template <class T>
    __attribute__((always_inline)) void writeFixed(const T * __restrict from)
    {
        // `likely` marks the "enough room" case as the expected branch
        // (presumably a branch-prediction hint macro; confirm against its definition).
        if (likely(working_buffer.end() - pos >= static_cast<ptrdiff_t>(sizeof(T))))
        {
            // sizeof(T) is a compile-time constant, so the copy length is fixed per instantiation.
            tiflash_compiler_builtin_memcpy(pos, from, sizeof(T));
            pos += sizeof(T);
        }
        else
        {
            // The slow path is wrapped in an immediately-invoked lambda marked noinline,
            // so the call to write() is kept out of the always-inlined fast path.
            [&]() __attribute__((noinline))
            {
                write(reinterpret_cast<const char *>(from), sizeof(T));
            }
            ();
        }
    }

The flamegraph is now:

image

Workload Generator

use rayon::prelude::*;
use clap::Parser;
use mysql::prelude::Queryable;
use rand::Rng;

// Command-line options of the workload generator, parsed via clap's derive API.
// NOTE: clap uses the `///` doc comments below as help text, so editing them
// changes the `--help` output.
#[derive(Parser, Debug)]
/// Delta tree benchmark toolset.
struct Cli {
    /// database url
    #[clap(short, long)]
    url: String,

    /// number of execution threads
    #[clap(short, long, default_value = "8")]
    thread: usize,

    /// initial database size
    #[clap(short, long, default_value = "1000")]
    initial_size: usize,

    // Explicit upper-case short flag; presumably chosen because the derived
    // default (`-i`) would collide with `initial_size`.
    /// relative portion of insertion
    #[clap(short = 'I', long, default_value = "0.1")]
    insert: f64,

    // `insert`, `delete` and `query` are relative weights: `main` lays them out
    // as consecutive intervals and samples uniformly over their sum, so they do
    // not have to add up to 1.
    #[clap(short, long, default_value = "0.1")]
    /// relative portion of deletion
    delete: f64,

    #[clap(short, long, default_value = "0.8")]
    /// relative portion of query
    query: f64,

    // Number of operations executed in the mixed-workload phase of `main`.
    #[clap(short, long, default_value = "10000000")]
    /// count of operations
    count: usize,
}

/// Inserts one row whose `data` value is drawn uniformly from `[-100, 100)`.
///
/// # Panics
/// Panics if a connection cannot be obtained from `pool` or the statement fails.
fn insert(pool: &mysql::Pool, rng: &mut rand::rngs::ThreadRng) {
    const STATEMENT: &str = "INSERT INTO tiflash_deltatree_test (data) VALUES (?)";

    let data = rng.gen_range(-100f64..100f64);
    {
        // Scoped so the connection goes back to the pool before logging.
        let mut conn = pool.get_conn().unwrap();
        conn.exec_drop(STATEMENT, (data,)).unwrap();
    }
    log::info!("inserted 1 record from {:?}, data: {}", std::thread::current().id(), data);
}

/// Deletes at most one row whose `data` lies within 10 of a random center
/// drawn uniformly from `[-90, 90)`.
///
/// # Panics
/// Panics if a connection cannot be obtained from `pool` or the statement fails.
fn delete(pool: &mysql::Pool, rng: &mut rand::rngs::ThreadRng) {
    const STATEMENT: &str =
        "DELETE FROM tiflash_deltatree_test WHERE (data >= ?) and (data <= ?) LIMIT 1";

    let center = rng.gen_range(-90f64..90f64);
    let (lower, upper) = (center - 10f64, center + 10f64);
    {
        // Scoped so the connection goes back to the pool before logging.
        let mut conn = pool.get_conn().unwrap();
        conn.exec_drop(STATEMENT, (lower, upper)).unwrap();
    }
    log::info!("deleted records from {:?}, center: {}", std::thread::current().id(), center);
}

/// Runs a `COUNT(*)` over rows whose `data` lies within 30 of a random center
/// drawn uniformly from `[-70, 70)`; the result set is discarded.
///
/// # Panics
/// Panics if a connection cannot be obtained from `pool` or the statement fails.
fn query(pool: &mysql::Pool, rng: &mut rand::rngs::ThreadRng) {
    const STATEMENT: &str =
        "SELECT COUNT(*) FROM tiflash_deltatree_test WHERE (data >= ?) AND (data <= ?)";

    let center = rng.gen_range(-70f64..70f64);
    let (lower, upper) = (center - 30f64, center + 30f64);
    {
        // Scoped so the connection goes back to the pool before logging.
        let mut conn = pool.get_conn().unwrap();
        conn.exec_drop(STATEMENT, (lower, upper)).unwrap();
    }
    log::info!("queried records from {:?}, center: {}", std::thread::current().id(), center);
}

/// Entry point of the workload generator.
///
/// Phases, in order:
/// 1. Parse the CLI, size the global rayon pool and open the MySQL connection pool.
/// 2. Create the test table if needed and insert `initial_size` rows in parallel.
/// 3. Request two TiFlash replicas and wait until the replica is reported available.
/// 4. Run `ANALYZE TABLE` ten times.
/// 5. Execute `count` operations in parallel, each picked at random as an
///    insert, delete or query in proportion to the configured weights.
///
/// # Panics
/// Panics on any setup or database error (every fallible call is unwrapped).
fn main() {
    // Pause between replica-status polls so the wait loop does not hammer the
    // cluster and flood the log while the replica is still being built.
    const REPLICA_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);

    env_logger::init();
    let cli: Cli = Cli::parse();
    rayon::ThreadPoolBuilder::new()
        .num_threads(cli.thread)
        .build_global()
        .unwrap();
    let db_opts = mysql::Opts::from_url(cli.url.as_str()).unwrap();
    let pool = mysql::Pool::new(db_opts).unwrap();

    pool
        .get_conn()
        .unwrap()
        .query_drop(
        r#"CREATE TABLE IF NOT EXISTS tiflash_deltatree_test (
                data DECIMAL NOT NULL
        )"#)
        .unwrap();

    // Seed the table; each rayon worker uses its own thread-local RNG.
    (0..cli.initial_size).into_par_iter()
        .for_each(|_| {
            let mut rng = rand::thread_rng();
            insert(&pool, &mut rng);
        });
    log::info!("initialization finished");

    pool
        .get_conn()
        .unwrap()
        .query_drop("ALTER TABLE tiflash_deltatree_test SET TIFLASH REPLICA 2")
        .unwrap();

    // Wait until the TiFlash replica is reported as available (AVAILABLE = 1).
    loop {
        let status: Option<i32> = pool
            .get_conn()
            .unwrap()
            .query_first("select AVAILABLE from information_schema.tiflash_replica WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'tiflash_deltatree_test'")
            .unwrap();
        log::info!("replica status: {}", status.unwrap_or(0));
        if status == Some(1) {
            break;
        }
        std::thread::sleep(REPLICA_POLL_INTERVAL);
    }

    for _ in 0..10 {
        pool
            .get_conn()
            .unwrap()
            .query_drop("ANALYZE TABLE tiflash_deltatree_test")
            .unwrap();
        log::info!("analyze table tiflash_deltatree_test");
    }

    // Lay the three weights out as consecutive half-open intervals; a uniform
    // sample over their union then selects an operation proportionally.
    let insert_range = 0f64..cli.insert;
    let delete_range = cli.insert..(cli.insert + cli.delete);
    let query_range = (cli.insert + cli.delete)..(cli.insert + cli.delete + cli.query);
    let total_range = 0f64..(cli.insert + cli.delete + cli.query);
    (0..cli.count).into_par_iter()
        .for_each(|_| {
            let mut rng = rand::thread_rng();
            let select = rng.gen_range(total_range.clone());
            if insert_range.contains(&select) {
                insert(&pool, &mut rng);
            } else if delete_range.contains(&select) {
                delete(&pool, &mut rng);
            } else if query_range.contains(&select) {
                query(&pool, &mut rng);
            }
        });
}

Metadata

Metadata

Labels

type/enhancement: The issue or PR belongs to an enhancement.

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions