-
Notifications
You must be signed in to change notification settings - Fork 411
Closed
Labels
type/enhancementThe issue or PR belongs to an enhancement.The issue or PR belongs to an enhancement.
Description
Enhancement
In a high density query workload, we could spot that appending data to output may also introduce some latency.

This can be improved by hinting the compiler about the fast path of appending small data, where buffer overflow is unlikely to happen.
template <class T>
__attribute__((always_inline)) void writeFixed(const T * __restrict from)
{
if (likely(working_buffer.end() - pos >= static_cast<ptrdiff_t>(sizeof(T))))
{
tiflash_compiler_builtin_memcpy(pos, from, sizeof(T));
pos += sizeof(T);
}
else
{
[&]() __attribute__((noinline))
{
write(reinterpret_cast<const char *>(from), sizeof(T));
}
();
}
}The flamegraph is now:
Workload Generator
use rayon::prelude::*;
use clap::Parser;
use mysql::prelude::Queryable;
use rand::Rng;
#[derive(Parser, Debug)]
/// Delta tree benchmark toolset.
struct Cli {
/// database url
#[clap(short, long)]
url: String,
/// number of execution threads
#[clap(short, long, default_value = "8")]
thread: usize,
/// initial database size
#[clap(short, long, default_value = "1000")]
initial_size: usize,
/// relative portion of insertion
#[clap(short = 'I', long, default_value = "0.1")]
insert: f64,
#[clap(short, long, default_value = "0.1")]
/// relative portion of deletion
delete: f64,
#[clap(short, long, default_value = "0.8")]
/// relative portion of query
query: f64,
#[clap(short, long, default_value = "10000000")]
/// count of operations
count: usize,
}
fn insert(pool: &mysql::Pool, rng: &mut rand::rngs::ThreadRng) {
let data = rng.gen_range(-100f64..100f64);
pool
.get_conn()
.unwrap()
.exec_drop("INSERT INTO tiflash_deltatree_test (data) VALUES (?)", (data, ))
.unwrap();
log::info!("inserted 1 record from {:?}, data: {}", std::thread::current().id(), data);
}
fn delete(pool: &mysql::Pool, rng: &mut rand::rngs::ThreadRng) {
let center = rng.gen_range(-90f64..90f64);
pool
.get_conn()
.unwrap()
.exec_drop("DELETE FROM tiflash_deltatree_test WHERE (data >= ?) and (data <= ?) LIMIT 1", (center - 10f64, center + 10f64))
.unwrap();
log::info!("deleted records from {:?}, center: {}", std::thread::current().id(), center);
}
fn query(pool: &mysql::Pool, rng: &mut rand::rngs::ThreadRng) {
let center = rng.gen_range(-70f64..70f64);
pool
.get_conn()
.unwrap()
.exec_drop("SELECT COUNT(*) FROM tiflash_deltatree_test WHERE (data >= ?) AND (data <= ?)", (center - 30f64, center + 30f64))
.unwrap();
log::info!("queried records from {:?}, center: {}", std::thread::current().id(), center);
}
fn main() {
env_logger::init();
let cli : Cli = Cli::parse();
rayon::ThreadPoolBuilder::new().num_threads(cli.thread).build_global().unwrap();
let db_opts = mysql::Opts::from_url(cli.url.as_str()).unwrap();
let pool = mysql::Pool::new(db_opts).unwrap();
pool
.get_conn()
.unwrap()
.query_drop(
r#"CREATE TABLE IF NOT EXISTS tiflash_deltatree_test (
data DECIMAL NOT NULL
)"#)
.unwrap();
(0..cli.initial_size).into_par_iter()
.for_each(|_| {
let mut rng = rand::thread_rng();
insert(&pool, &mut rng);
});
log::info!("initialization finished");
pool
.get_conn()
.unwrap()
.query_drop("ALTER TABLE tiflash_deltatree_test SET TIFLASH REPLICA 2")
.unwrap();
loop {
let status = pool
.get_conn()
.unwrap()
.query_first("select AVAILABLE from information_schema.tiflash_replica WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'tiflash_deltatree_test'")
.unwrap();
log::info!("replica status: {}", status.unwrap_or(0));
if let Some(1) = status {
break;
}
}
for _ in 0..10 {
pool
.get_conn()
.unwrap()
.query_drop("ANALYZE TABLE tiflash_deltatree_test")
.unwrap();
log::info!("analyze table tiflash_deltatree_test");
}
let insert_range = 0f64..cli.insert;
let delete_range = cli.insert..(cli.insert + cli.delete);
let query_range = (cli.insert + cli.delete)..(cli.insert + cli.delete + cli.query);
let total_range = 0f64..(cli.insert + cli.delete + cli.query);
(0..cli.count).into_par_iter()
.for_each(|_| {
let mut rng = rand::thread_rng();
let select = rng.gen_range(total_range.clone());
if insert_range.contains(&select) {
insert(&pool, &mut rng);
} else if delete_range.contains(&select) {
delete(&pool, &mut rng);
} else if query_range.contains(&select) {
query(&pool, &mut rng);
}
});
}Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
type/enhancementThe issue or PR belongs to an enhancement.The issue or PR belongs to an enhancement.
