From 0a22375ac15bc28d0053ce99607a9008da89c14f Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 20 Mar 2023 21:53:01 +0800 Subject: [PATCH] fix: nyc-taxi bench suite (#1204) Signed-off-by: Ruihang Xia --- benchmarks/src/bin/nyc-taxi.rs | 72 ++++++++++++++++++++----------- src/common/grpc-expr/src/error.rs | 2 +- 2 files changed, 49 insertions(+), 25 deletions(-) diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs index bb8be2a2b0..adf8fb6fb4 100644 --- a/benchmarks/src/bin/nyc-taxi.rs +++ b/benchmarks/src/bin/nyc-taxi.rs @@ -21,12 +21,12 @@ use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::time::Instant; -use arrow::array::{ArrayRef, PrimitiveArray, StringArray, TimestampNanosecondArray}; +use arrow::array::{ArrayRef, PrimitiveArray, StringArray, TimestampMicrosecondArray}; use arrow::datatypes::{DataType, Float64Type, Int64Type}; use arrow::record_batch::RecordBatch; use clap::Parser; use client::api::v1::column::Values; -use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, TableId}; +use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest}; use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; @@ -61,7 +61,7 @@ struct Args { #[arg(long = "skip-read")] skip_read: bool, - #[arg(short, long, default_value_t = String::from("127.0.0.1:3001"))] + #[arg(short, long, default_value_t = String::from("127.0.0.1:4001"))] endpoint: String, } @@ -97,6 +97,9 @@ async fn write_data( for record_batch in record_batch_reader { let record_batch = record_batch.unwrap(); + if !is_record_batch_full(&record_batch) { + continue; + } let (columns, row_count) = convert_record_batch(record_batch); let request = InsertRequest { table_name: TABLE_NAME.to_string(), @@ -122,11 +125,16 @@ fn convert_record_batch(record_batch: 
RecordBatch) -> (Vec<Column>, u32) { let mut columns = vec![]; for (array, field) in record_batch.columns().iter().zip(fields.iter()) { - let values = build_values(array); + let (values, datatype) = build_values(array); let column = Column { column_name: field.name().to_owned(), values: Some(values), - null_mask: vec![], + null_mask: array + .data() + .null_bitmap() + .map(|bitmap| bitmap.buffer().as_slice().to_vec()) + .unwrap_or_default(), + datatype: datatype.into(), // datatype and semantic_type are set to default ..Default::default() }; @@ -136,7 +144,7 @@ fn convert_record_batch(record_batch: RecordBatch) -> (Vec<Column>, u32) { (columns, row_count as _) } -fn build_values(column: &ArrayRef) -> Values { +fn build_values(column: &ArrayRef) -> (Values, ColumnDataType) { match column.data_type() { DataType::Int64 => { let array = column @@ -144,10 +152,13 @@ fn build_values(column: &ArrayRef) -> Values { .downcast_ref::<PrimitiveArray<Int64Type>>() .unwrap(); let values = array.values(); - Values { - i64_values: values.to_vec(), - ..Default::default() - } + ( + Values { + i64_values: values.to_vec(), + ..Default::default() + }, + ColumnDataType::Int64, + ) } DataType::Float64 => { let array = column @@ -155,29 +166,38 @@ fn build_values(column: &ArrayRef) -> Values { .downcast_ref::<PrimitiveArray<Float64Type>>() .unwrap(); let values = array.values(); - Values { - f64_values: values.to_vec(), - ..Default::default() - } + ( + Values { + f64_values: values.to_vec(), + ..Default::default() + }, + ColumnDataType::Float64, + ) } DataType::Timestamp(_, _) => { let array = column .as_any() - .downcast_ref::<TimestampNanosecondArray>() + .downcast_ref::<TimestampMicrosecondArray>() .unwrap(); let values = array.values(); - Values { - i64_values: values.to_vec(), - ..Default::default() - } + ( + Values { + i64_values: values.to_vec(), + ..Default::default() + }, + ColumnDataType::Int64, + ) } DataType::Utf8 => { let array = column.as_any().downcast_ref::<StringArray>().unwrap(); let values = array.iter().filter_map(|s| s.map(String::from)).collect(); - Values { - string_values: values, - 
..Default::default() - } + ( + Values { + string_values: values, + ..Default::default() + }, + ColumnDataType::String, + ) } DataType::Null | DataType::Boolean @@ -213,6 +233,10 @@ fn build_values(column: &ArrayRef) -> Values { } } +fn is_record_batch_full(batch: &RecordBatch) -> bool { + batch.columns().iter().all(|col| col.null_count() == 0) +} + fn create_table_expr() -> CreateTableExpr { CreateTableExpr { catalog_name: CATALOG_NAME.to_string(), @@ -340,7 +364,7 @@ fn create_table_expr() -> CreateTableExpr { create_if_not_exists: false, table_options: Default::default(), region_ids: vec![0], - table_id: Some(TableId { id: 0 }), + table_id: None, } } diff --git a/src/common/grpc-expr/src/error.rs b/src/common/grpc-expr/src/error.rs index eec2c9511e..7cbb39bb26 100644 --- a/src/common/grpc-expr/src/error.rs +++ b/src/common/grpc-expr/src/error.rs @@ -32,7 +32,7 @@ pub enum Error { DecodeInsert { source: DecodeError }, #[snafu(display("Illegal insert data"))] - IllegalInsertData, + IllegalInsertData { backtrace: Backtrace }, #[snafu(display("Column datatype error, source: {}", source))] ColumnDataType {