diff --git a/.gitignore b/.gitignore index 304df68683..84cdd03cf0 100644 --- a/.gitignore +++ b/.gitignore @@ -28,3 +28,6 @@ logs/ # cpython's generated python byte code **/__pycache__/ + +# Benchmark dataset +benchmarks/data diff --git a/Cargo.lock b/Cargo.lock index 5ceb3fcfcb..b768070c61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -158,6 +158,30 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" +[[package]] +name = "arrow" +version = "10.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1328dbc6d5d76a08b13df3ac630f61a6a31276d9e9d08eb813e98efa624c2382" +dependencies = [ + "bitflags", + "chrono", + "csv", + "flatbuffers", + "half", + "hex", + "indexmap", + "lazy_static", + "lexical-core", + "multiversion", + "num", + "rand 0.8.5", + "regex", + "serde", + "serde_derive", + "serde_json", +] + [[package]] name = "arrow-format" version = "0.4.0" @@ -461,6 +485,19 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" +[[package]] +name = "benchmarks" +version = "0.1.0" +dependencies = [ + "arrow", + "clap 4.0.18", + "client", + "indicatif", + "itertools", + "parquet", + "tokio", +] + [[package]] name = "bigdecimal" version = "0.3.0" @@ -794,8 +831,8 @@ checksum = "86447ad904c7fb335a790c9d7fe3d0d971dc523b8ccd1561a520de9a85302750" dependencies = [ "atty", "bitflags", - "clap_derive", - "clap_lex", + "clap_derive 3.2.18", + "clap_lex 0.2.4", "indexmap", "once_cell", "strsim 0.10.0", @@ -803,6 +840,21 @@ dependencies = [ "textwrap 0.15.1", ] +[[package]] +name = "clap" +version = "4.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "335867764ed2de42325fafe6d18b8af74ba97ee0c590fa016f157535b42ab04b" +dependencies = [ + "atty", + "bitflags", + "clap_derive 4.0.18", + "clap_lex 0.3.0", + "once_cell", + "strsim 0.10.0", + "termcolor", +] + [[package]] name = "clap_derive" version = "3.2.18" @@ -816,6 +868,19 @@ dependencies = [ "syn", ] +[[package]] +name = "clap_derive" +version = "4.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16a1b0f6422af32d5da0c58e2703320f379216ee70198241c84173a8c5ac28f3" +dependencies = [ + "heck 0.4.0", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "clap_lex" version = "0.2.4" @@ -825,6 +890,15 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "clap_lex" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d4198f73e42b4936b35b5bb248d81d2b595ecb170da0bac7655c54eedfa8da8" +dependencies = [ + "os_str_bytes", +] + [[package]] name = "client" version = "0.1.0" @@ -1764,6 +1838,17 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cda653ca797810c02f7ca4b804b40b8b95ae046eb989d356bce17919a8c25499" +[[package]] +name = "flatbuffers" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ea97b4fe4b84e2f2765449bcea21cbdb3ee28cecb88afbf38a0c2e1639f5eb5" +dependencies = [ + "bitflags", + "smallvec", + "thiserror", +] + [[package]] name = "flate2" version = "1.0.24" @@ -2291,6 +2376,17 @@ dependencies = [ "serde", ] +[[package]] +name = "indicatif" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfddc9561e8baf264e0e45e197fd7696320026eb10a8180340debc27b18f535b" +dependencies = [ + "console", + "number_prefix", + "unicode-width", +] + [[package]] name = "influxdb_line_protocol" version = "0.1.0" @@ -2311,6 +2407,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "integer-encoding" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48dc51180a9b377fd75814d0cc02199c20f8e99433d6762f650d39cdbbd3b56f" + [[package]] name = "integer-encoding" version = "3.0.4" @@ -3129,6 +3231,12 @@ dependencies = [ "libc", ] +[[package]] +name = "number_prefix" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" + [[package]] name = "object" version = "0.29.0" @@ -3256,7 +3364,7 @@ dependencies = [ "opentelemetry", "opentelemetry-semantic-conventions", "thiserror", - "thrift", + "thrift 0.15.0", "tokio", ] @@ -3345,6 +3453,37 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "parquet" +version = "10.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53e9c8fc20af9b92d85d42ec86e5217b2eaf1340fbba75c4b4296de764ea7921" +dependencies = [ + "arrow", + "base64", + "brotli", + "byteorder", + "chrono", + "flate2", + "lz4", + "num", + "num-bigint", + "parquet-format", + "rand 0.8.5", + "snap", + "thrift 0.13.0", + "zstd", +] + +[[package]] +name = "parquet-format" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f0c06cdcd5460967c485f9c40a821746f5955ad81990533c7fae95dbd9bc0b5" +dependencies = [ + "thrift 0.13.0", +] + [[package]] name = "parquet-format-async-temp" version = "0.2.0" @@ -3354,7 +3493,7 @@ dependencies = [ "async-trait", "byteorder", "futures", - "integer-encoding", + "integer-encoding 3.0.4", "ordered-float 1.1.1", ] @@ -5454,6 +5593,19 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "thrift" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6d965454947cc7266d22716ebfd07b18d84ebaf35eec558586bbb2a8cb6b5b" +dependencies = [ + "byteorder", + "integer-encoding 1.1.7", + "log", + "ordered-float 1.1.1", + "threadpool", +] + [[package]] name = "thrift" version = "0.15.0" @@ -5461,7 +5613,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b82ca8f46f95b3ce96081fe3dd89160fdea970c254bb72925255d1b62aae692e" dependencies = [ "byteorder", - "integer-encoding", + "integer-encoding 3.0.4", "log", "ordered-float 1.1.1", "threadpool", diff --git a/Cargo.toml b/Cargo.toml index 1c039e17d0..c84de7f8b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,6 @@ [workspace] members = [ + "benchmarks", "src/api", "src/catalog", "src/client", @@ -32,3 +33,6 @@ members = [ "src/table", "src/table-engine", ] + +[profile.release] +debug = true diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml new file mode 100644 index 0000000000..311f8867ac --- /dev/null +++ b/benchmarks/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "benchmarks" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +arrow = "10" +clap = { version = "4.0", features = ["derive"] } +client = { path = "../src/client" } +itertools = "0.10.5" +indicatif = "0.17.1" +parquet = { version = "*" } +tokio = { version = "1.21", features = ["full"] } diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs new file mode 100644 index 0000000000..02b5a5a8e1 --- /dev/null +++ b/benchmarks/src/bin/nyc-taxi.rs @@ -0,0 +1,439 @@ +//! Use the taxi trip records from New York City dataset to bench. You can download the dataset from +//! [here](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page). + +#![feature(once_cell)] +#![allow(clippy::print_stdout)] + +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::Arc, + time::Instant, +}; + +use arrow::{ + array::{ArrayRef, PrimitiveArray, StringArray, TimestampNanosecondArray}, + datatypes::{DataType, Float64Type, Int64Type}, + record_batch::RecordBatch, +}; +use clap::Parser; +use client::{ + admin::Admin, + api::v1::{ + codec::InsertBatch, column::Values, insert_expr, Column, ColumnDataType, ColumnDef, + CreateExpr, InsertExpr, + }, + Client, Database, Select, +}; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; +use parquet::{ + arrow::{ArrowReader, ParquetFileArrowReader}, + file::{reader::FileReader, serialized_reader::SerializedFileReader}, +}; +use tokio::task::JoinSet; + +const DATABASE_NAME: &str = "greptime"; +const CATALOG_NAME: &str = "greptime"; +const SCHEMA_NAME: &str = "public"; +const TABLE_NAME: &str = "nyc_taxi"; + +#[derive(Parser)] +#[command(name = "NYC benchmark runner")] +struct Args { + /// Path to the dataset + #[arg(short, long)] + path: Option, + + /// Batch size of insert request. + #[arg(short = 's', long = "batch-size", default_value_t = 4096)] + batch_size: usize, + + /// Number of client threads on write (parallel on file level) + #[arg(short = 't', long = "thread-num", default_value_t = 4)] + thread_num: usize, + + /// Number of query iteration + #[arg(short = 'i', long = "iter-num", default_value_t = 3)] + iter_num: usize, + + #[arg(long = "skip-write")] + skip_write: bool, + + #[arg(long = "skip-read")] + skip_read: bool, + + #[arg(short, long, default_value_t = String::from("127.0.0.1:3001"))] + endpoint: String, +} + +fn get_file_list>(path: P) -> Vec { + std::fs::read_dir(path) + .unwrap() + .map(|dir| dir.unwrap().path().canonicalize().unwrap()) + .collect() +} + +async fn write_data( + batch_size: usize, + db: &Database, + path: PathBuf, + mpb: MultiProgress, + pb_style: ProgressStyle, +) -> u128 { + let file = std::fs::File::open(&path).unwrap(); + let file_reader = Arc::new(SerializedFileReader::new(file).unwrap()); + let row_num = file_reader.metadata().file_metadata().num_rows(); + let record_batch_reader = ParquetFileArrowReader::new(file_reader) + .get_record_reader(batch_size) + .unwrap(); + let progress_bar = mpb.add(ProgressBar::new(row_num as _)); + progress_bar.set_style(pb_style); + progress_bar.set_message(format!("{:?}", path)); + + let mut total_rpc_elapsed_ms = 0; + + for record_batch in record_batch_reader { + let record_batch = record_batch.unwrap(); + let row_count = record_batch.num_rows(); + let insert_batch = convert_record_batch(record_batch).into(); + let insert_expr = InsertExpr { + table_name: TABLE_NAME.to_string(), + expr: Some(insert_expr::Expr::Values(insert_expr::Values { + values: vec![insert_batch], + })), + options: HashMap::default(), + }; + let now = Instant::now(); + db.insert(insert_expr).await.unwrap(); + let elapsed = now.elapsed(); + total_rpc_elapsed_ms += elapsed.as_millis(); + progress_bar.inc(row_count as _); + } + + progress_bar.finish_with_message(format!( + "file {:?} done in {}ms", + path, total_rpc_elapsed_ms + )); + total_rpc_elapsed_ms +} + +fn convert_record_batch(record_batch: RecordBatch) -> InsertBatch { + let schema = record_batch.schema(); + let fields = schema.fields(); + let row_count = record_batch.num_rows(); + let mut columns = vec![]; + + for (array, field) in record_batch.columns().iter().zip(fields.iter()) { + let values = build_values(array); + let column = Column { + column_name: field.name().to_owned(), + values: Some(values), + null_mask: vec![], + // datatype and semantic_type are set to default + ..Default::default() + }; + columns.push(column); + } + + InsertBatch { + columns, + row_count: row_count as _, + } +} + +fn build_values(column: &ArrayRef) -> Values { + match column.data_type() { + DataType::Int64 => { + let array = column + .as_any() + .downcast_ref::>() + .unwrap(); + let values = array.values(); + Values { + i64_values: values.to_vec(), + ..Default::default() + } + } + DataType::Float64 => { + let array = column + .as_any() + .downcast_ref::>() + .unwrap(); + let values = array.values(); + Values { + f64_values: values.to_vec(), + ..Default::default() + } + } + DataType::Timestamp(_, _) => { + let array = column + .as_any() + .downcast_ref::() + .unwrap(); + let values = array.values(); + Values { + i64_values: values.to_vec(), + ..Default::default() + } + } + DataType::Utf8 => { + let array = column.as_any().downcast_ref::().unwrap(); + let values = array.iter().filter_map(|s| s.map(String::from)).collect(); + Values { + string_values: values, + ..Default::default() + } + } + DataType::Null + | DataType::Boolean + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Float16 + | DataType::Float32 + | DataType::Date32 + | DataType::Date64 + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Duration(_) + | DataType::Interval(_) + | DataType::Binary + | DataType::FixedSizeBinary(_) + | DataType::LargeBinary + | DataType::LargeUtf8 + | DataType::List(_) + | DataType::FixedSizeList(_, _) + | DataType::LargeList(_) + | DataType::Struct(_) + | DataType::Union(_, _) + | DataType::Dictionary(_, _) + | DataType::Decimal(_, _) + | DataType::Map(_, _) => todo!(), + } +} + +fn create_table_expr() -> CreateExpr { + CreateExpr { + catalog_name: Some(CATALOG_NAME.to_string()), + schema_name: Some(SCHEMA_NAME.to_string()), + table_name: TABLE_NAME.to_string(), + desc: None, + column_defs: vec![ + ColumnDef { + name: "VendorID".to_string(), + datatype: ColumnDataType::Int64 as i32, + is_nullable: true, + default_constraint: None, + }, + ColumnDef { + name: "tpep_pickup_datetime".to_string(), + datatype: ColumnDataType::Int64 as i32, + is_nullable: true, + default_constraint: None, + }, + ColumnDef { + name: "tpep_dropoff_datetime".to_string(), + datatype: ColumnDataType::Int64 as i32, + is_nullable: true, + default_constraint: None, + }, + ColumnDef { + name: "passenger_count".to_string(), + datatype: ColumnDataType::Float64 as i32, + is_nullable: true, + default_constraint: None, + }, + ColumnDef { + name: "trip_distance".to_string(), + datatype: ColumnDataType::Float64 as i32, + is_nullable: true, + default_constraint: None, + }, + ColumnDef { + name: "RatecodeID".to_string(), + datatype: ColumnDataType::Float64 as i32, + is_nullable: true, + default_constraint: None, + }, + ColumnDef { + name: "store_and_fwd_flag".to_string(), + datatype: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: None, + }, + ColumnDef { + name: "PULocationID".to_string(), + datatype: ColumnDataType::Int64 as i32, + is_nullable: true, + default_constraint: None, + }, + ColumnDef { + name: "DOLocationID".to_string(), + datatype: ColumnDataType::Int64 as i32, + is_nullable: true, + default_constraint: None, + }, + ColumnDef { + name: "payment_type".to_string(), + datatype: ColumnDataType::Int64 as i32, + is_nullable: true, + default_constraint: None, + }, + ColumnDef { + name: "fare_amount".to_string(), + datatype: ColumnDataType::Float64 as i32, + is_nullable: true, + default_constraint: None, + }, + ColumnDef { + name: "extra".to_string(), + datatype: ColumnDataType::Float64 as i32, + is_nullable: true, + default_constraint: None, + }, + ColumnDef { + name: "mta_tax".to_string(), + datatype: ColumnDataType::Float64 as i32, + is_nullable: true, + default_constraint: None, + }, + ColumnDef { + name: "tip_amount".to_string(), + datatype: ColumnDataType::Float64 as i32, + is_nullable: true, + default_constraint: None, + }, + ColumnDef { + name: "tolls_amount".to_string(), + datatype: ColumnDataType::Float64 as i32, + is_nullable: true, + default_constraint: None, + }, + ColumnDef { + name: "improvement_surcharge".to_string(), + datatype: ColumnDataType::Float64 as i32, + is_nullable: true, + default_constraint: None, + }, + ColumnDef { + name: "total_amount".to_string(), + datatype: ColumnDataType::Float64 as i32, + is_nullable: true, + default_constraint: None, + }, + ColumnDef { + name: "congestion_surcharge".to_string(), + datatype: ColumnDataType::Float64 as i32, + is_nullable: true, + default_constraint: None, + }, + ColumnDef { + name: "airport_fee".to_string(), + datatype: ColumnDataType::Float64 as i32, + is_nullable: true, + default_constraint: None, + }, + ], + time_index: "tpep_pickup_datetime".to_string(), + primary_keys: vec!["VendorID".to_string()], + create_if_not_exists: false, + table_options: Default::default(), + } +} + +fn query_set() -> HashMap { + let mut ret = HashMap::new(); + + ret.insert( + "count_all".to_string(), + format!("SELECT COUNT(*) FROM {};", TABLE_NAME), + ); + + ret.insert( + "fare_amt_by_passenger".to_string(), + format!("SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), SUM(fare_amount) FROM {} GROUP BY passenger_count",TABLE_NAME) + ); + + ret +} + +async fn do_write(args: &Args, client: &Client) { + let admin = Admin::new("admin", client.clone()); + + let mut file_list = get_file_list(args.path.clone().expect("Specify data path in argument")); + let mut write_jobs = JoinSet::new(); + + let create_table_result = admin.create(create_table_expr()).await; + println!("Create table result: {:?}", create_table_result); + + let progress_bar_style = ProgressStyle::with_template( + "[{elapsed_precise}] {bar:60.cyan/blue} {pos:>7}/{len:7} {msg}", + ) + .unwrap() + .progress_chars("##-"); + let multi_progress_bar = MultiProgress::new(); + let file_progress = multi_progress_bar.add(ProgressBar::new(file_list.len() as _)); + file_progress.inc(0); + + let batch_size = args.batch_size; + for _ in 0..args.thread_num { + if let Some(path) = file_list.pop() { + let db = Database::new(DATABASE_NAME, client.clone()); + let mpb = multi_progress_bar.clone(); + let pb_style = progress_bar_style.clone(); + write_jobs.spawn(async move { write_data(batch_size, &db, path, mpb, pb_style).await }); + } + } + while write_jobs.join_next().await.is_some() { + file_progress.inc(1); + if let Some(path) = file_list.pop() { + let db = Database::new(DATABASE_NAME, client.clone()); + let mpb = multi_progress_bar.clone(); + let pb_style = progress_bar_style.clone(); + write_jobs.spawn(async move { write_data(batch_size, &db, path, mpb, pb_style).await }); + } + } +} + +async fn do_query(num_iter: usize, db: &Database) { + for (query_name, query) in query_set() { + println!("Running query: {}", query); + for i in 0..num_iter { + let now = Instant::now(); + let _res = db.select(Select::Sql(query.clone())).await.unwrap(); + let elapsed = now.elapsed(); + println!( + "query {}, iteration {}: {}ms", + query_name, + i, + elapsed.as_millis() + ); + } + } +} + +fn main() { + let args = Args::parse(); + + tokio::runtime::Builder::new_multi_thread() + .worker_threads(args.thread_num) + .enable_all() + .build() + .unwrap() + .block_on(async { + let client = Client::with_urls(vec![&args.endpoint]); + + if !args.skip_write { + do_write(&args, &client).await; + } + + if !args.skip_read { + let db = Database::new(DATABASE_NAME, client.clone()); + do_query(args.iter_num, &db).await; + } + }) +} diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index 5eea78394b..c3d02a3c6b 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -4,6 +4,8 @@ mod database; mod error; pub mod load_balance; +pub use api; + pub use self::{ client::Client, database::{Database, ObjectResult, Select}, diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index fc13e7daf7..03a3bfcac2 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -2,6 +2,7 @@ name = "cmd" version = "0.1.0" edition = "2021" +default-run = "greptime" [[bin]] name = "greptime" @@ -10,7 +11,9 @@ path = "src/bin/greptime.rs" [dependencies] clap = { version = "3.1", features = ["derive"] } common-error = { path = "../common/error" } -common-telemetry = { path = "../common/telemetry", features = ["deadlock_detection"] } +common-telemetry = { path = "../common/telemetry", features = [ + "deadlock_detection", +] } datanode = { path = "../datanode" } frontend = { path = "../frontend" } futures = "0.3" diff --git a/src/common/time/src/timestamp_millis.rs b/src/common/time/src/timestamp_millis.rs index ac31789303..939bb8bbd1 100644 --- a/src/common/time/src/timestamp_millis.rs +++ b/src/common/time/src/timestamp_millis.rs @@ -74,7 +74,7 @@ pub trait BucketAligned { impl> BucketAligned for T { fn align_by_bucket(self, bucket_duration: i64) -> Option { - assert!(bucket_duration > 0); + assert!(bucket_duration > 0, "{}", bucket_duration); self.into() .checked_div_euclid(bucket_duration) .and_then(|val| val.checked_mul(bucket_duration))