// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Benchmark using the New York City taxi trip records dataset. You can download the
//! dataset from [here](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page).
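//!
//! Example invocation (the binary name `nyc-taxi` is an assumption; adjust it to your
//! build target):
//!
//! ```text
//! nyc-taxi --path /path/to/parquet/dir --endpoint 127.0.0.1:4001
//! ```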

#![allow(clippy::print_stdout)]

use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::Instant;

use arrow::array::{ArrayRef, PrimitiveArray, StringArray, TimestampMicrosecondArray};
use arrow::datatypes::{DataType, Float64Type, Int64Type};
use arrow::record_batch::RecordBatch;
use clap::Parser;
use client::api::v1::column::Values;
use client::api::v1::{
    Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, InsertRequests, SemanticType,
};
use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use futures_util::TryStreamExt;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use tokio::task::JoinSet;

const CATALOG_NAME: &str = "greptime";
const SCHEMA_NAME: &str = "public";
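
// Command line arguments for the benchmark runner (parsed by clap).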
#[derive(Parser)]
#[command(name = "NYC benchmark runner")]
struct Args {
    /// Path to the dataset directory.
    #[arg(short, long)]
    path: Option<String>,

    /// Batch size of each insert request.
    #[arg(short = 's', long = "batch-size", default_value_t = 4096)]
    batch_size: usize,

    /// Number of client threads on write (parallel at the file level).
    #[arg(short = 't', long = "thread-num", default_value_t = 4)]
    thread_num: usize,

    /// Number of query iterations.
    #[arg(short = 'i', long = "iter-num", default_value_t = 3)]
    iter_num: usize,

    /// Skip the write phase.
    #[arg(long = "skip-write")]
    skip_write: bool,

    /// Skip the read (query) phase.
    #[arg(long = "skip-read")]
    skip_read: bool,

    /// Address of the GreptimeDB gRPC endpoint.
    #[arg(short, long, default_value_t = String::from("127.0.0.1:4001"))]
    endpoint: String,
}
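
/// Lists every entry under `path` as an absolute (canonicalized) path.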
fn get_file_list<P: AsRef<Path>>(path: P) -> Vec<PathBuf> {
    std::fs::read_dir(path)
        .unwrap()
        .map(|dir| dir.unwrap().path().canonicalize().unwrap())
        .collect()
}
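
/// Derives a fresh table name from the current Unix timestamp, e.g. `nyc_taxi_1700000000`.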
fn new_table_name() -> String {
    format!("nyc_taxi_{}", chrono::Utc::now().timestamp())
}
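
/// Streams one Parquet file into the database in batches of `batch_size` rows and returns
/// the total time spent in `insert` RPCs, in milliseconds.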
async fn write_data(
    table_name: &str,
    batch_size: usize,
    db: &Database,
    path: PathBuf,
    mpb: MultiProgress,
    pb_style: ProgressStyle,
) -> u128 {
    let file = std::fs::File::open(&path).unwrap();
    let record_batch_reader_builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let row_num = record_batch_reader_builder
        .metadata()
        .file_metadata()
        .num_rows();
    let record_batch_reader = record_batch_reader_builder
        .with_batch_size(batch_size)
        .build()
        .unwrap();
    let progress_bar = mpb.add(ProgressBar::new(row_num as _));
    progress_bar.set_style(pb_style);
    progress_bar.set_message(format!("{path:?}"));

    let mut total_rpc_elapsed_ms = 0;

    for record_batch in record_batch_reader {
        let record_batch = record_batch.unwrap();
        // Skip batches containing nulls: `build_values` reads raw value buffers and does
        // not handle validity.
        if !is_record_batch_full(&record_batch) {
            continue;
        }
        let (columns, row_count) = convert_record_batch(record_batch);
        let request = InsertRequest {
            table_name: table_name.to_string(),
            columns,
            row_count,
        };
        let requests = InsertRequests {
            inserts: vec![request],
        };

        // Only the RPC round-trip is timed; Parquet decoding is excluded.
        let now = Instant::now();
        db.insert(requests).await.unwrap();
        let elapsed = now.elapsed();
        total_rpc_elapsed_ms += elapsed.as_millis();
        progress_bar.inc(row_count as _);
    }

    progress_bar.finish_with_message(format!("file {path:?} done in {total_rpc_elapsed_ms}ms"));
    total_rpc_elapsed_ms
}
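
/// Converts an Arrow [`RecordBatch`] into GreptimeDB gRPC columns plus the row count.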
fn convert_record_batch(record_batch: RecordBatch) -> (Vec<Column>, u32) {
    let schema = record_batch.schema();
    let fields = schema.fields();
    let row_count = record_batch.num_rows();
    let mut columns = vec![];

    for (array, field) in record_batch.columns().iter().zip(fields.iter()) {
        let (values, datatype) = build_values(array);
        // `VendorID` is the tag (primary key) and `tpep_pickup_datetime` the time index;
        // everything else is a plain field. This mirrors `create_table_expr` below.
        let semantic_type = match field.name().as_str() {
            "VendorID" => SemanticType::Tag,
            "tpep_pickup_datetime" => SemanticType::Timestamp,
            _ => SemanticType::Field,
        };

        let column = Column {
            column_name: field.name().clone(),
            values: Some(values),
            null_mask: array
                .to_data()
                .nulls()
                .map(|bitmap| bitmap.buffer().as_slice().to_vec())
                .unwrap_or_default(),
            datatype: datatype.into(),
            semantic_type: semantic_type as i32,
            ..Default::default()
        };
        columns.push(column);
    }

    (columns, row_count as _)
}
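
/// Copies an Arrow array's values into the gRPC [`Values`] representation and reports the
/// matching [`ColumnDataType`].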
fn build_values(column: &ArrayRef) -> (Values, ColumnDataType) {
    match column.data_type() {
        DataType::Int64 => {
            let array = column
                .as_any()
                .downcast_ref::<PrimitiveArray<Int64Type>>()
                .unwrap();
            let values = array.values();
            (
                Values {
                    i64_values: values.to_vec(),
                    ..Default::default()
                },
                ColumnDataType::Int64,
            )
        }
        DataType::Float64 => {
            let array = column
                .as_any()
                .downcast_ref::<PrimitiveArray<Float64Type>>()
                .unwrap();
            let values = array.values();
            (
                Values {
                    f64_values: values.to_vec(),
                    ..Default::default()
                },
                ColumnDataType::Float64,
            )
        }
        DataType::Timestamp(_, _) => {
            // The pattern matches any timestamp unit, but the downcast assumes microsecond
            // precision and panics for other units.
            let array = column
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .unwrap();
            let values = array.values();
            (
                Values {
                    timestamp_microsecond_values: values.to_vec(),
                    ..Default::default()
                },
                ColumnDataType::TimestampMicrosecond,
            )
        }
        DataType::Utf8 => {
            let array = column.as_any().downcast_ref::<StringArray>().unwrap();
            let values = array.iter().filter_map(|s| s.map(String::from)).collect();
            (
                Values {
                    string_values: values,
                    ..Default::default()
                },
                ColumnDataType::String,
            )
        }
        _ => unimplemented!(),
    }
}
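
/// Returns `true` when no column in the batch contains a null value.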
fn is_record_batch_full(batch: &RecordBatch) -> bool {
    batch.columns().iter().all(|col| col.null_count() == 0)
}
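
/// Builds the `CREATE TABLE` expression for the NYC taxi schema: `VendorID` is the primary
/// key tag and `tpep_pickup_datetime` the time index.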
fn create_table_expr(table_name: &str) -> CreateTableExpr {
    // Helper for the nullable field columns, which differ only in name and data type.
    let field = |name: &str, data_type: ColumnDataType| ColumnDef {
        name: name.to_string(),
        data_type: data_type as i32,
        is_nullable: true,
        default_constraint: vec![],
        semantic_type: SemanticType::Field as i32,
        comment: String::new(),
        ..Default::default()
    };

    CreateTableExpr {
        catalog_name: CATALOG_NAME.to_string(),
        schema_name: SCHEMA_NAME.to_string(),
        table_name: table_name.to_string(),
        desc: String::default(),
        column_defs: vec![
            ColumnDef {
                name: "VendorID".to_string(),
                data_type: ColumnDataType::Int64 as i32,
                is_nullable: true,
                default_constraint: vec![],
                semantic_type: SemanticType::Tag as i32,
                comment: String::new(),
                ..Default::default()
            },
            ColumnDef {
                name: "tpep_pickup_datetime".to_string(),
                data_type: ColumnDataType::TimestampMicrosecond as i32,
                is_nullable: false,
                default_constraint: vec![],
                semantic_type: SemanticType::Timestamp as i32,
                comment: String::new(),
                ..Default::default()
            },
            field("tpep_dropoff_datetime", ColumnDataType::TimestampMicrosecond),
            field("passenger_count", ColumnDataType::Float64),
            field("trip_distance", ColumnDataType::Float64),
            field("RatecodeID", ColumnDataType::Float64),
            field("store_and_fwd_flag", ColumnDataType::String),
            field("PULocationID", ColumnDataType::Int64),
            field("DOLocationID", ColumnDataType::Int64),
            field("payment_type", ColumnDataType::Int64),
            field("fare_amount", ColumnDataType::Float64),
            field("extra", ColumnDataType::Float64),
            field("mta_tax", ColumnDataType::Float64),
            field("tip_amount", ColumnDataType::Float64),
            field("tolls_amount", ColumnDataType::Float64),
            field("improvement_surcharge", ColumnDataType::Float64),
            field("total_amount", ColumnDataType::Float64),
            field("congestion_surcharge", ColumnDataType::Float64),
            field("airport_fee", ColumnDataType::Float64),
        ],
        time_index: "tpep_pickup_datetime".to_string(),
        primary_keys: vec!["VendorID".to_string()],
        create_if_not_exists: true,
        table_options: Default::default(),
        table_id: None,
        engine: "mito".to_string(),
    }
}
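
/// The benchmark queries, keyed by a short name used in the latency report.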
fn query_set(table_name: &str) -> HashMap<String, String> {
    HashMap::from([
        (
            "count_all".to_string(),
            format!("SELECT COUNT(*) FROM {table_name};"),
        ),
        (
            "fare_amt_by_passenger".to_string(),
            format!("SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), SUM(fare_amount) FROM {table_name} GROUP BY passenger_count"),
        ),
    ])
}
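
/// Creates the table, then writes every file in the dataset directory, keeping at most
/// `thread_num` files in flight at a time.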
async fn do_write(args: &Args, db: &Database, table_name: &str) {
    let mut file_list = get_file_list(args.path.clone().expect("Specify data path in argument"));
    let mut write_jobs = JoinSet::new();

    let create_table_result = db.create(create_table_expr(table_name)).await;
    println!("Create table result: {create_table_result:?}");

    let progress_bar_style = ProgressStyle::with_template(
        "[{elapsed_precise}] {bar:60.cyan/blue} {pos:>7}/{len:7} {msg}",
    )
    .unwrap()
    .progress_chars("##-");
    let multi_progress_bar = MultiProgress::new();
    let file_progress = multi_progress_bar.add(ProgressBar::new(file_list.len() as _));
    file_progress.inc(0);

    let batch_size = args.batch_size;
    // Fill the initial pool with up to `thread_num` concurrent file writers.
    for _ in 0..args.thread_num {
        if let Some(path) = file_list.pop() {
            let db = db.clone();
            let mpb = multi_progress_bar.clone();
            let pb_style = progress_bar_style.clone();
            let table_name = table_name.to_string();
            let _ = write_jobs.spawn(async move {
                write_data(&table_name, batch_size, &db, path, mpb, pb_style).await
            });
        }
    }
    // Whenever a writer finishes, start the next file until the list is drained.
    while write_jobs.join_next().await.is_some() {
        file_progress.inc(1);
        if let Some(path) = file_list.pop() {
            let db = db.clone();
            let mpb = multi_progress_bar.clone();
            let pb_style = progress_bar_style.clone();
            let table_name = table_name.to_string();
            let _ = write_jobs.spawn(async move {
                write_data(&table_name, batch_size, &db, path, mpb, pb_style).await
            });
        }
    }
}
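
/// Runs each query in [`query_set`] `num_iter` times and prints the latency of every
/// iteration.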
async fn do_query(num_iter: usize, db: &Database, table_name: &str) {
    for (query_name, query) in query_set(table_name) {
        println!("Running query: {query}");
        for i in 0..num_iter {
            let now = Instant::now();
            let res = db.sql(&query).await.unwrap();
            match res.data {
                OutputData::AffectedRows(_) | OutputData::RecordBatches(_) => (),
                // Drain streaming results so the timing covers the full response.
                OutputData::Stream(stream) => {
                    stream.try_collect::<Vec<_>>().await.unwrap();
                }
            }
            let elapsed = now.elapsed();
            println!(
                "query {}, iteration {}: {}ms",
                query_name,
                i,
                elapsed.as_millis(),
            );
        }
    }
}
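
// Entry point: builds a multi-threaded Tokio runtime sized by `--thread-num`, then runs the
// write and/or read phases against the configured endpoint.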
fn main() {
    let args = Args::parse();

    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(args.thread_num)
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            let client = Client::with_urls(vec![&args.endpoint]);
            let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
            let table_name = new_table_name();

            if !args.skip_write {
                do_write(&args, &db, &table_name).await;
            }

            if !args.skip_read {
                do_query(args.iter_num, &db, &table_name).await;
            }
        })
}