feat: benchmark suit based on nyc taxi dataset (#384)

* solve dep conflict

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: nyc taxi dataset writer

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix some literals

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add some queries

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix progress bar

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* able to skip write or read

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename to nyc-taxi.rs

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove main.rs

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* adapt new client api

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* allow stdout output in this cli

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* some default values

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2022-11-04 14:13:17 +08:00
committed by GitHub
parent fc6d73b06b
commit bfcd74fd16
8 changed files with 625 additions and 7 deletions

3
.gitignore vendored
View File

@@ -28,3 +28,6 @@ logs/
# cpython's generated python byte code
**/__pycache__/
# Benchmark dataset
benchmarks/data

162
Cargo.lock generated
View File

@@ -158,6 +158,30 @@ version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
[[package]]
name = "arrow"
version = "10.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1328dbc6d5d76a08b13df3ac630f61a6a31276d9e9d08eb813e98efa624c2382"
dependencies = [
"bitflags",
"chrono",
"csv",
"flatbuffers",
"half",
"hex",
"indexmap",
"lazy_static",
"lexical-core",
"multiversion",
"num",
"rand 0.8.5",
"regex",
"serde",
"serde_derive",
"serde_json",
]
[[package]]
name = "arrow-format"
version = "0.4.0"
@@ -461,6 +485,19 @@ version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "benchmarks"
version = "0.1.0"
dependencies = [
"arrow",
"clap 4.0.18",
"client",
"indicatif",
"itertools",
"parquet",
"tokio",
]
[[package]]
name = "bigdecimal"
version = "0.3.0"
@@ -794,8 +831,8 @@ checksum = "86447ad904c7fb335a790c9d7fe3d0d971dc523b8ccd1561a520de9a85302750"
dependencies = [
"atty",
"bitflags",
"clap_derive",
"clap_lex",
"clap_derive 3.2.18",
"clap_lex 0.2.4",
"indexmap",
"once_cell",
"strsim 0.10.0",
@@ -803,6 +840,21 @@ dependencies = [
"textwrap 0.15.1",
]
[[package]]
name = "clap"
version = "4.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "335867764ed2de42325fafe6d18b8af74ba97ee0c590fa016f157535b42ab04b"
dependencies = [
"atty",
"bitflags",
"clap_derive 4.0.18",
"clap_lex 0.3.0",
"once_cell",
"strsim 0.10.0",
"termcolor",
]
[[package]]
name = "clap_derive"
version = "3.2.18"
@@ -816,6 +868,19 @@ dependencies = [
"syn",
]
[[package]]
name = "clap_derive"
version = "4.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16a1b0f6422af32d5da0c58e2703320f379216ee70198241c84173a8c5ac28f3"
dependencies = [
"heck 0.4.0",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "clap_lex"
version = "0.2.4"
@@ -825,6 +890,15 @@ dependencies = [
"os_str_bytes",
]
[[package]]
name = "clap_lex"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d4198f73e42b4936b35b5bb248d81d2b595ecb170da0bac7655c54eedfa8da8"
dependencies = [
"os_str_bytes",
]
[[package]]
name = "client"
version = "0.1.0"
@@ -1764,6 +1838,17 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda653ca797810c02f7ca4b804b40b8b95ae046eb989d356bce17919a8c25499"
[[package]]
name = "flatbuffers"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ea97b4fe4b84e2f2765449bcea21cbdb3ee28cecb88afbf38a0c2e1639f5eb5"
dependencies = [
"bitflags",
"smallvec",
"thiserror",
]
[[package]]
name = "flate2"
version = "1.0.24"
@@ -2291,6 +2376,17 @@ dependencies = [
"serde",
]
[[package]]
name = "indicatif"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfddc9561e8baf264e0e45e197fd7696320026eb10a8180340debc27b18f535b"
dependencies = [
"console",
"number_prefix",
"unicode-width",
]
[[package]]
name = "influxdb_line_protocol"
version = "0.1.0"
@@ -2311,6 +2407,12 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "integer-encoding"
version = "1.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48dc51180a9b377fd75814d0cc02199c20f8e99433d6762f650d39cdbbd3b56f"
[[package]]
name = "integer-encoding"
version = "3.0.4"
@@ -3129,6 +3231,12 @@ dependencies = [
"libc",
]
[[package]]
name = "number_prefix"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
[[package]]
name = "object"
version = "0.29.0"
@@ -3256,7 +3364,7 @@ dependencies = [
"opentelemetry",
"opentelemetry-semantic-conventions",
"thiserror",
"thrift",
"thrift 0.15.0",
"tokio",
]
@@ -3345,6 +3453,37 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "parquet"
version = "10.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53e9c8fc20af9b92d85d42ec86e5217b2eaf1340fbba75c4b4296de764ea7921"
dependencies = [
"arrow",
"base64",
"brotli",
"byteorder",
"chrono",
"flate2",
"lz4",
"num",
"num-bigint",
"parquet-format",
"rand 0.8.5",
"snap",
"thrift 0.13.0",
"zstd",
]
[[package]]
name = "parquet-format"
version = "4.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f0c06cdcd5460967c485f9c40a821746f5955ad81990533c7fae95dbd9bc0b5"
dependencies = [
"thrift 0.13.0",
]
[[package]]
name = "parquet-format-async-temp"
version = "0.2.0"
@@ -3354,7 +3493,7 @@ dependencies = [
"async-trait",
"byteorder",
"futures",
"integer-encoding",
"integer-encoding 3.0.4",
"ordered-float 1.1.1",
]
@@ -5454,6 +5593,19 @@ dependencies = [
"num_cpus",
]
[[package]]
name = "thrift"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c6d965454947cc7266d22716ebfd07b18d84ebaf35eec558586bbb2a8cb6b5b"
dependencies = [
"byteorder",
"integer-encoding 1.1.7",
"log",
"ordered-float 1.1.1",
"threadpool",
]
[[package]]
name = "thrift"
version = "0.15.0"
@@ -5461,7 +5613,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b82ca8f46f95b3ce96081fe3dd89160fdea970c254bb72925255d1b62aae692e"
dependencies = [
"byteorder",
"integer-encoding",
"integer-encoding 3.0.4",
"log",
"ordered-float 1.1.1",
"threadpool",

View File

@@ -1,5 +1,6 @@
[workspace]
members = [
"benchmarks",
"src/api",
"src/catalog",
"src/client",
@@ -32,3 +33,6 @@ members = [
"src/table",
"src/table-engine",
]
[profile.release]
debug = true

15
benchmarks/Cargo.toml Normal file
View File

@@ -0,0 +1,15 @@
[package]
name = "benchmarks"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
arrow = "10"
clap = { version = "4.0", features = ["derive"] }
client = { path = "../src/client" }
itertools = "0.10.5"
indicatif = "0.17.1"
parquet = { version = "*" }
tokio = { version = "1.21", features = ["full"] }

View File

@@ -0,0 +1,439 @@
//! Use the taxi trip records from New York City dataset to bench. You can download the dataset from
//! [here](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page).
#![feature(once_cell)]
#![allow(clippy::print_stdout)]
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::Arc,
time::Instant,
};
use arrow::{
array::{ArrayRef, PrimitiveArray, StringArray, TimestampNanosecondArray},
datatypes::{DataType, Float64Type, Int64Type},
record_batch::RecordBatch,
};
use clap::Parser;
use client::{
admin::Admin,
api::v1::{
codec::InsertBatch, column::Values, insert_expr, Column, ColumnDataType, ColumnDef,
CreateExpr, InsertExpr,
},
Client, Database, Select,
};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use parquet::{
arrow::{ArrowReader, ParquetFileArrowReader},
file::{reader::FileReader, serialized_reader::SerializedFileReader},
};
use tokio::task::JoinSet;
const DATABASE_NAME: &str = "greptime";
const CATALOG_NAME: &str = "greptime";
const SCHEMA_NAME: &str = "public";
const TABLE_NAME: &str = "nyc_taxi";
#[derive(Parser)]
#[command(name = "NYC benchmark runner")]
struct Args {
/// Path to the dataset
#[arg(short, long)]
path: Option<String>,
/// Batch size of insert request.
#[arg(short = 's', long = "batch-size", default_value_t = 4096)]
batch_size: usize,
/// Number of client threads on write (parallel on file level)
#[arg(short = 't', long = "thread-num", default_value_t = 4)]
thread_num: usize,
/// Number of query iteration
#[arg(short = 'i', long = "iter-num", default_value_t = 3)]
iter_num: usize,
#[arg(long = "skip-write")]
skip_write: bool,
#[arg(long = "skip-read")]
skip_read: bool,
#[arg(short, long, default_value_t = String::from("127.0.0.1:3001"))]
endpoint: String,
}
fn get_file_list<P: AsRef<Path>>(path: P) -> Vec<PathBuf> {
std::fs::read_dir(path)
.unwrap()
.map(|dir| dir.unwrap().path().canonicalize().unwrap())
.collect()
}
async fn write_data(
batch_size: usize,
db: &Database,
path: PathBuf,
mpb: MultiProgress,
pb_style: ProgressStyle,
) -> u128 {
let file = std::fs::File::open(&path).unwrap();
let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
let row_num = file_reader.metadata().file_metadata().num_rows();
let record_batch_reader = ParquetFileArrowReader::new(file_reader)
.get_record_reader(batch_size)
.unwrap();
let progress_bar = mpb.add(ProgressBar::new(row_num as _));
progress_bar.set_style(pb_style);
progress_bar.set_message(format!("{:?}", path));
let mut total_rpc_elapsed_ms = 0;
for record_batch in record_batch_reader {
let record_batch = record_batch.unwrap();
let row_count = record_batch.num_rows();
let insert_batch = convert_record_batch(record_batch).into();
let insert_expr = InsertExpr {
table_name: TABLE_NAME.to_string(),
expr: Some(insert_expr::Expr::Values(insert_expr::Values {
values: vec![insert_batch],
})),
options: HashMap::default(),
};
let now = Instant::now();
db.insert(insert_expr).await.unwrap();
let elapsed = now.elapsed();
total_rpc_elapsed_ms += elapsed.as_millis();
progress_bar.inc(row_count as _);
}
progress_bar.finish_with_message(format!(
"file {:?} done in {}ms",
path, total_rpc_elapsed_ms
));
total_rpc_elapsed_ms
}
fn convert_record_batch(record_batch: RecordBatch) -> InsertBatch {
let schema = record_batch.schema();
let fields = schema.fields();
let row_count = record_batch.num_rows();
let mut columns = vec![];
for (array, field) in record_batch.columns().iter().zip(fields.iter()) {
let values = build_values(array);
let column = Column {
column_name: field.name().to_owned(),
values: Some(values),
null_mask: vec![],
// datatype and semantic_type are set to default
..Default::default()
};
columns.push(column);
}
InsertBatch {
columns,
row_count: row_count as _,
}
}
fn build_values(column: &ArrayRef) -> Values {
match column.data_type() {
DataType::Int64 => {
let array = column
.as_any()
.downcast_ref::<PrimitiveArray<Int64Type>>()
.unwrap();
let values = array.values();
Values {
i64_values: values.to_vec(),
..Default::default()
}
}
DataType::Float64 => {
let array = column
.as_any()
.downcast_ref::<PrimitiveArray<Float64Type>>()
.unwrap();
let values = array.values();
Values {
f64_values: values.to_vec(),
..Default::default()
}
}
DataType::Timestamp(_, _) => {
let array = column
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap();
let values = array.values();
Values {
i64_values: values.to_vec(),
..Default::default()
}
}
DataType::Utf8 => {
let array = column.as_any().downcast_ref::<StringArray>().unwrap();
let values = array.iter().filter_map(|s| s.map(String::from)).collect();
Values {
string_values: values,
..Default::default()
}
}
DataType::Null
| DataType::Boolean
| DataType::Int8
| DataType::Int16
| DataType::Int32
| DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64
| DataType::Float16
| DataType::Float32
| DataType::Date32
| DataType::Date64
| DataType::Time32(_)
| DataType::Time64(_)
| DataType::Duration(_)
| DataType::Interval(_)
| DataType::Binary
| DataType::FixedSizeBinary(_)
| DataType::LargeBinary
| DataType::LargeUtf8
| DataType::List(_)
| DataType::FixedSizeList(_, _)
| DataType::LargeList(_)
| DataType::Struct(_)
| DataType::Union(_, _)
| DataType::Dictionary(_, _)
| DataType::Decimal(_, _)
| DataType::Map(_, _) => todo!(),
}
}
fn create_table_expr() -> CreateExpr {
CreateExpr {
catalog_name: Some(CATALOG_NAME.to_string()),
schema_name: Some(SCHEMA_NAME.to_string()),
table_name: TABLE_NAME.to_string(),
desc: None,
column_defs: vec![
ColumnDef {
name: "VendorID".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "tpep_pickup_datetime".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "tpep_dropoff_datetime".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "passenger_count".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "trip_distance".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "RatecodeID".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "store_and_fwd_flag".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "PULocationID".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "DOLocationID".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "payment_type".to_string(),
datatype: ColumnDataType::Int64 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "fare_amount".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "extra".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "mta_tax".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "tip_amount".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "tolls_amount".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "improvement_surcharge".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "total_amount".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "congestion_surcharge".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "airport_fee".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
},
],
time_index: "tpep_pickup_datetime".to_string(),
primary_keys: vec!["VendorID".to_string()],
create_if_not_exists: false,
table_options: Default::default(),
}
}
fn query_set() -> HashMap<String, String> {
let mut ret = HashMap::new();
ret.insert(
"count_all".to_string(),
format!("SELECT COUNT(*) FROM {};", TABLE_NAME),
);
ret.insert(
"fare_amt_by_passenger".to_string(),
format!("SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), SUM(fare_amount) FROM {} GROUP BY passenger_count",TABLE_NAME)
);
ret
}
async fn do_write(args: &Args, client: &Client) {
let admin = Admin::new("admin", client.clone());
let mut file_list = get_file_list(args.path.clone().expect("Specify data path in argument"));
let mut write_jobs = JoinSet::new();
let create_table_result = admin.create(create_table_expr()).await;
println!("Create table result: {:?}", create_table_result);
let progress_bar_style = ProgressStyle::with_template(
"[{elapsed_precise}] {bar:60.cyan/blue} {pos:>7}/{len:7} {msg}",
)
.unwrap()
.progress_chars("##-");
let multi_progress_bar = MultiProgress::new();
let file_progress = multi_progress_bar.add(ProgressBar::new(file_list.len() as _));
file_progress.inc(0);
let batch_size = args.batch_size;
for _ in 0..args.thread_num {
if let Some(path) = file_list.pop() {
let db = Database::new(DATABASE_NAME, client.clone());
let mpb = multi_progress_bar.clone();
let pb_style = progress_bar_style.clone();
write_jobs.spawn(async move { write_data(batch_size, &db, path, mpb, pb_style).await });
}
}
while write_jobs.join_next().await.is_some() {
file_progress.inc(1);
if let Some(path) = file_list.pop() {
let db = Database::new(DATABASE_NAME, client.clone());
let mpb = multi_progress_bar.clone();
let pb_style = progress_bar_style.clone();
write_jobs.spawn(async move { write_data(batch_size, &db, path, mpb, pb_style).await });
}
}
}
async fn do_query(num_iter: usize, db: &Database) {
for (query_name, query) in query_set() {
println!("Running query: {}", query);
for i in 0..num_iter {
let now = Instant::now();
let _res = db.select(Select::Sql(query.clone())).await.unwrap();
let elapsed = now.elapsed();
println!(
"query {}, iteration {}: {}ms",
query_name,
i,
elapsed.as_millis()
);
}
}
}
fn main() {
let args = Args::parse();
tokio::runtime::Builder::new_multi_thread()
.worker_threads(args.thread_num)
.enable_all()
.build()
.unwrap()
.block_on(async {
let client = Client::with_urls(vec![&args.endpoint]);
if !args.skip_write {
do_write(&args, &client).await;
}
if !args.skip_read {
let db = Database::new(DATABASE_NAME, client.clone());
do_query(args.iter_num, &db).await;
}
})
}

View File

@@ -4,6 +4,8 @@ mod database;
mod error;
pub mod load_balance;
pub use api;
pub use self::{
client::Client,
database::{Database, ObjectResult, Select},

View File

@@ -2,6 +2,7 @@
name = "cmd"
version = "0.1.0"
edition = "2021"
default-run = "greptime"
[[bin]]
name = "greptime"
@@ -10,7 +11,9 @@ path = "src/bin/greptime.rs"
[dependencies]
clap = { version = "3.1", features = ["derive"] }
common-error = { path = "../common/error" }
common-telemetry = { path = "../common/telemetry", features = ["deadlock_detection"] }
common-telemetry = { path = "../common/telemetry", features = [
"deadlock_detection",
] }
datanode = { path = "../datanode" }
frontend = { path = "../frontend" }
futures = "0.3"

View File

@@ -74,7 +74,7 @@ pub trait BucketAligned {
impl<T: Into<i64>> BucketAligned for T {
fn align_by_bucket(self, bucket_duration: i64) -> Option<TimestampMillis> {
assert!(bucket_duration > 0);
assert!(bucket_duration > 0, "{}", bucket_duration);
self.into()
.checked_div_euclid(bucket_duration)
.and_then(|val| val.checked_mul(bucket_duration))