diff --git a/Cargo.lock b/Cargo.lock
index b85c873d76..94ce5f4ad5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2478,6 +2478,7 @@ dependencies = [
  "common-macro",
  "common-telemetry",
  "common-time",
+ "criterion 0.7.0",
  "datafusion",
  "datafusion-common",
  "datatypes",
@@ -11557,6 +11558,7 @@ dependencies = [
  "client",
  "common-base",
  "common-catalog",
+ "common-decimal",
  "common-error",
  "common-frontend",
  "common-grpc",
diff --git a/src/common/recordbatch/Cargo.toml b/src/common/recordbatch/Cargo.toml
index 70db559e50..a8cf6a5a7d 100644
--- a/src/common/recordbatch/Cargo.toml
+++ b/src/common/recordbatch/Cargo.toml
@@ -27,4 +27,9 @@ snafu.workspace = true
 tokio.workspace = true
 
 [dev-dependencies]
+criterion = "0.7.0"
 tokio.workspace = true
+
+[[bench]]
+name = "iter_record_batch_rows"
+harness = false
diff --git a/src/common/recordbatch/benches/iter_record_batch_rows.rs b/src/common/recordbatch/benches/iter_record_batch_rows.rs
new file mode 100644
index 0000000000..b819a4658e
--- /dev/null
+++ b/src/common/recordbatch/benches/iter_record_batch_rows.rs
@@ -0,0 +1,179 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::hint::black_box;
+use std::sync::Arc;
+
+use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
+use datafusion::arrow::array::{Int32Array, TimestampMillisecondArray};
+use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
+use datafusion_common::arrow::array::{ArrayRef, RecordBatch, StringArray};
+use datafusion_common::arrow::datatypes::Schema;
+use datafusion_common::{ScalarValue, utils};
+use datatypes::arrow::array::AsArray;
+use datatypes::arrow::datatypes::{
+    Int32Type, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
+    TimestampSecondType,
+};
+use datatypes::schema::SchemaRef;
+
+fn prepare_record_batch(rows: usize) -> RecordBatch {
+    let schema = Schema::new(vec![
+        Field::new(
+            "ts",
+            DataType::Timestamp(TimeUnit::Millisecond, None),
+            false,
+        ),
+        Field::new("i", DataType::Int32, true),
+        Field::new("s", DataType::Utf8, true),
+    ]);
+
+    let columns: Vec<ArrayRef> = vec![
+        Arc::new(TimestampMillisecondArray::from_iter_values(
+            (0..rows).map(|x| (1760313600000 + x) as i64),
+        )),
+        Arc::new(Int32Array::from_iter_values((0..rows).map(|x| x as i32))),
+        Arc::new(StringArray::from_iter((0..rows).map(|x| {
+            if x % 2 == 0 {
+                Some(format!("s_{x}"))
+            } else {
+                None
+            }
+        }))),
+    ];
+
+    RecordBatch::try_new(Arc::new(schema), columns).unwrap()
+}
+
+fn iter_by_greptimedb_values(schema: SchemaRef, record_batch: RecordBatch) {
+    let record_batch =
+        common_recordbatch::RecordBatch::try_from_df_record_batch(schema, record_batch).unwrap();
+    for row in record_batch.rows() {
+        black_box(row);
+    }
+}
+
+fn iter_by_loop_rows_and_columns(record_batch: RecordBatch) {
+    for i in 0..record_batch.num_rows() {
+        for column in record_batch.columns() {
+            match column.data_type() {
+                DataType::Timestamp(time_unit, _) => {
+                    let v = match time_unit {
+                        TimeUnit::Second => {
+                            let array = column.as_primitive::<TimestampSecondType>();
+                            array.value(i)
+                        }
+                        TimeUnit::Millisecond => {
+                            let array = column.as_primitive::<TimestampMillisecondType>();
+                            array.value(i)
+                        }
+                        TimeUnit::Microsecond => {
+                            let array = column.as_primitive::<TimestampMicrosecondType>();
+                            array.value(i)
+                        }
+                        TimeUnit::Nanosecond => {
+                            let array = column.as_primitive::<TimestampNanosecondType>();
+                            array.value(i)
+                        }
+                    };
+                    black_box(v);
+                }
+                DataType::Int32 => {
+                    let array = column.as_primitive::<Int32Type>();
+                    let v = array.value(i);
+                    black_box(v);
+                }
+                DataType::Utf8 => {
+                    let array = column.as_string::<i32>();
+                    let v = array.value(i);
+                    black_box(v);
+                }
+                _ => unreachable!(),
+            }
+        }
+    }
+}
+
+fn iter_by_datafusion_scalar_values(record_batch: RecordBatch) {
+    let columns = record_batch.columns();
+    for i in 0..record_batch.num_rows() {
+        let row = utils::get_row_at_idx(columns, i).unwrap();
+        black_box(row);
+    }
+}
+
+fn iter_by_datafusion_scalar_values_with_buf(record_batch: RecordBatch) {
+    let columns = record_batch.columns();
+    let mut buf = vec![ScalarValue::Null; columns.len()];
+    for i in 0..record_batch.num_rows() {
+        utils::extract_row_at_idx_to_buf(columns, i, &mut buf).unwrap();
+    }
+}
+
+pub fn criterion_benchmark(c: &mut Criterion) {
+    let mut group = c.benchmark_group("iter_record_batch");
+
+    for rows in [1usize, 10, 100, 1_000, 10_000] {
+        group.bench_with_input(
+            BenchmarkId::new("by_greptimedb_values", rows),
+            &rows,
+            |b, rows| {
+                let record_batch = prepare_record_batch(*rows);
+                let schema =
+                    Arc::new(datatypes::schema::Schema::try_from(record_batch.schema()).unwrap());
+                b.iter(|| {
+                    iter_by_greptimedb_values(schema.clone(), record_batch.clone());
+                })
+            },
+        );
+
+        group.bench_with_input(
+            BenchmarkId::new("by_loop_rows_and_columns", rows),
+            &rows,
+            |b, rows| {
+                let record_batch = prepare_record_batch(*rows);
+                b.iter(|| {
+                    iter_by_loop_rows_and_columns(record_batch.clone());
+                })
+            },
+        );
+
+        group.bench_with_input(
+            BenchmarkId::new("by_datafusion_scalar_values", rows),
+            &rows,
+            |b, rows| {
+                let record_batch = prepare_record_batch(*rows);
+                b.iter(|| {
+                    iter_by_datafusion_scalar_values(record_batch.clone());
+                })
+            },
+        );
+
+        group.bench_with_input(
+            BenchmarkId::new("by_datafusion_scalar_values_with_buf", rows),
+            &rows,
+            |b, rows| {
+                let record_batch = prepare_record_batch(*rows);
+                b.iter(|| {
+                    iter_by_datafusion_scalar_values_with_buf(record_batch.clone());
+                })
+            },
+        );
+    }
+
+    group.finish();
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml
index 6fb0b1f1d3..6442a3533b 100644
--- a/src/servers/Cargo.toml
+++ b/src/servers/Cargo.toml
@@ -35,6 +35,7 @@ catalog.workspace = true
 chrono.workspace = true
 common-base.workspace = true
 common-catalog.workspace = true
+common-decimal.workspace = true
 common-error.workspace = true
 common-frontend.workspace = true
 common-grpc.workspace = true
diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs
index 58cd3900a2..2b8495074a 100644
--- a/src/servers/src/mysql/writer.rs
+++ b/src/servers/src/mysql/writer.rs
@@ -12,18 +12,31 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::ops::Deref;
+use std::time::Duration;
 
+use arrow::array::{Array, AsArray};
+use arrow::datatypes::{
+    Date32Type, Decimal128Type, DurationMicrosecondType, DurationMillisecondType,
+    DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int8Type, Int16Type,
+    Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
+    Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
+    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
+    TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
+};
+use arrow_schema::{DataType, IntervalUnit, TimeUnit};
+use common_decimal::Decimal128;
 use common_error::ext::ErrorExt;
 use common_error::status_code::StatusCode;
 use common_query::{Output, OutputData};
 use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
 use common_telemetry::{debug, error};
-use datatypes::prelude::{ConcreteDataType, Value};
+use common_time::time::Time;
+use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp};
+use datafusion_common::ScalarValue;
+use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::SchemaRef;
 use datatypes::types::jsonb_to_string;
 use futures::StreamExt;
-use itertools::Itertools;
 use opensrv_mysql::{
     Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
 };
@@ -31,7 +44,7 @@ use session::context::QueryContextRef;
 use snafu::prelude::*;
 use tokio::io::AsyncWrite;
 
-use crate::error::{self, ConvertSqlValueSnafu, Result, ToJsonSnafu};
+use crate::error::{self, ConvertSqlValueSnafu, DataFusionSnafu, NotSupportedSnafu, Result};
 use crate::metrics::*;
 
 /// Try to write multiple output to the writer if possible.
@@ -168,7 +181,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
             Ok(record_batch) => {
                 Self::write_recordbatch(
                     &mut row_writer,
-                    &record_batch,
+                    record_batch,
                     query_context.clone(),
                     &query_result.schema,
                 )
@@ -192,69 +205,228 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
 
     async fn write_recordbatch(
         row_writer: &mut RowWriter<'_, W>,
-        recordbatch: &RecordBatch,
+        record_batch: RecordBatch,
         query_context: QueryContextRef,
         schema: &SchemaRef,
     ) -> Result<()> {
-        for row in recordbatch.rows() {
-            for (value, column) in row.into_iter().zip(schema.column_schemas().iter()) {
-                match value {
-                    Value::Null => row_writer.write_col(None::<u8>)?,
-                    Value::Boolean(v) => row_writer.write_col(v as i8)?,
-                    Value::UInt8(v) => row_writer.write_col(v)?,
-                    Value::UInt16(v) => row_writer.write_col(v)?,
-                    Value::UInt32(v) => row_writer.write_col(v)?,
-                    Value::UInt64(v) => row_writer.write_col(v)?,
-                    Value::Int8(v) => row_writer.write_col(v)?,
-                    Value::Int16(v) => row_writer.write_col(v)?,
-                    Value::Int32(v) => row_writer.write_col(v)?,
-                    Value::Int64(v) => row_writer.write_col(v)?,
-                    Value::Float32(v) => row_writer.write_col(v.0)?,
-                    Value::Float64(v) => row_writer.write_col(v.0)?,
-                    Value::String(v) => row_writer.write_col(v.as_utf8())?,
-                    Value::Binary(v) => match &column.data_type {
-                        ConcreteDataType::Json(_j) => {
-                            let s = jsonb_to_string(&v).context(ConvertSqlValueSnafu)?;
+        let record_batch = record_batch.into_df_record_batch();
+        for i in 0..record_batch.num_rows() {
+            for (j, column) in record_batch.columns().iter().enumerate() {
+                if column.is_null(i) {
+                    row_writer.write_col(None::<u8>)?;
+                    continue;
+                }
+
+                match column.data_type() {
+                    DataType::Null => {
+                        row_writer.write_col(None::<u8>)?;
+                    }
+                    DataType::Boolean => {
+                        let array = column.as_boolean();
+                        row_writer.write_col(array.value(i) as i8)?;
+                    }
+                    DataType::UInt8 => {
+                        let array = column.as_primitive::<UInt8Type>();
+                        row_writer.write_col(array.value(i))?;
+                    }
+                    DataType::UInt16 => {
+                        let array = column.as_primitive::<UInt16Type>();
+                        row_writer.write_col(array.value(i))?;
+                    }
+                    DataType::UInt32 => {
+                        let array = column.as_primitive::<UInt32Type>();
+                        row_writer.write_col(array.value(i))?;
+                    }
+                    DataType::UInt64 => {
+                        let array = column.as_primitive::<UInt64Type>();
+                        row_writer.write_col(array.value(i))?;
+                    }
+                    DataType::Int8 => {
+                        let array = column.as_primitive::<Int8Type>();
+                        row_writer.write_col(array.value(i))?;
+                    }
+                    DataType::Int16 => {
+                        let array = column.as_primitive::<Int16Type>();
+                        row_writer.write_col(array.value(i))?;
+                    }
+                    DataType::Int32 => {
+                        let array = column.as_primitive::<Int32Type>();
+                        row_writer.write_col(array.value(i))?;
+                    }
+                    DataType::Int64 => {
+                        let array = column.as_primitive::<Int64Type>();
+                        row_writer.write_col(array.value(i))?;
+                    }
+                    DataType::Float32 => {
+                        let array = column.as_primitive::<Float32Type>();
+                        row_writer.write_col(array.value(i))?;
+                    }
+                    DataType::Float64 => {
+                        let array = column.as_primitive::<Float64Type>();
+                        row_writer.write_col(array.value(i))?;
+                    }
+                    DataType::Utf8 => {
+                        let array = column.as_string::<i32>();
+                        row_writer.write_col(array.value(i))?;
+                    }
+                    DataType::Utf8View => {
+                        let array = column.as_string_view();
+                        row_writer.write_col(array.value(i))?;
+                    }
+                    DataType::LargeUtf8 => {
+                        let array = column.as_string::<i64>();
+                        row_writer.write_col(array.value(i))?;
+                    }
+                    DataType::Binary => {
+                        let array = column.as_binary::<i32>();
+                        let v = array.value(i);
+                        if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
+                            let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
                             row_writer.write_col(s)?;
+                        } else {
+                            row_writer.write_col(v)?;
                         }
-                        _ => {
-                            row_writer.write_col(v.deref())?;
+                    }
+                    DataType::BinaryView => {
+                        let array = column.as_binary_view();
+                        let v = array.value(i);
+                        if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
+                            let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
+                            row_writer.write_col(s)?;
+                        } else {
+                            row_writer.write_col(v)?;
+                        }
+                    }
+                    DataType::LargeBinary => {
+                        let array = column.as_binary::<i64>();
+                        let v = array.value(i);
+                        if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
+                            let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
+                            row_writer.write_col(s)?;
+                        } else {
+                            row_writer.write_col(v)?;
+                        }
+                    }
+                    DataType::Date32 => {
+                        let array = column.as_primitive::<Date32Type>();
+                        let v = Date::new(array.value(i));
+                        row_writer.write_col(v.to_chrono_date())?;
+                    }
+                    DataType::Timestamp(time_unit, _) => {
+                        let v = match time_unit {
+                            TimeUnit::Second => {
+                                let array = column.as_primitive::<TimestampSecondType>();
+                                array.value(i)
+                            }
+                            TimeUnit::Millisecond => {
+                                let array = column.as_primitive::<TimestampMillisecondType>();
+                                array.value(i)
+                            }
+                            TimeUnit::Microsecond => {
+                                let array = column.as_primitive::<TimestampMicrosecondType>();
+                                array.value(i)
+                            }
+                            TimeUnit::Nanosecond => {
+                                let array = column.as_primitive::<TimestampNanosecondType>();
+                                array.value(i)
+                            }
+                        };
+                        let v = Timestamp::new(v, time_unit.into());
+                        let v =
+                            v.to_chrono_datetime_with_timezone(Some(&query_context.timezone()));
+                        row_writer.write_col(v)?;
+                    }
+                    DataType::Interval(interval_unit) => match interval_unit {
+                        IntervalUnit::YearMonth => {
+                            let array = column.as_primitive::<IntervalYearMonthType>();
+                            let v: IntervalYearMonth = array.value(i).into();
+                            row_writer.write_col(v.to_iso8601_string())?;
+                        }
+                        IntervalUnit::DayTime => {
+                            let array = column.as_primitive::<IntervalDayTimeType>();
+                            let v: IntervalDayTime = array.value(i).into();
+                            row_writer.write_col(v.to_iso8601_string())?;
+                        }
+                        IntervalUnit::MonthDayNano => {
+                            let array = column.as_primitive::<IntervalMonthDayNanoType>();
+                            let v: IntervalMonthDayNano = array.value(i).into();
+                            row_writer.write_col(v.to_iso8601_string())?;
                         }
                     },
-                    Value::Date(v) => row_writer.write_col(v.to_chrono_date())?,
-                    // convert timestamp to timezone of current connection
-                    Value::Timestamp(v) => row_writer.write_col(
-                        v.to_chrono_datetime_with_timezone(Some(&query_context.timezone())),
-                    )?,
-                    Value::IntervalYearMonth(v) => row_writer.write_col(v.to_iso8601_string())?,
-                    Value::IntervalDayTime(v) => row_writer.write_col(v.to_iso8601_string())?,
-                    Value::IntervalMonthDayNano(v) => {
-                        row_writer.write_col(v.to_iso8601_string())?
+                    DataType::Duration(time_unit) => match time_unit {
+                        TimeUnit::Second => {
+                            let array = column.as_primitive::<DurationSecondType>();
+                            let v = array.value(i);
+                            row_writer.write_col(Duration::from_secs(v as u64))?;
+                        }
+                        TimeUnit::Millisecond => {
+                            let array = column.as_primitive::<DurationMillisecondType>();
+                            let v = array.value(i);
+                            row_writer.write_col(Duration::from_millis(v as u64))?;
+                        }
+                        TimeUnit::Microsecond => {
+                            let array = column.as_primitive::<DurationMicrosecondType>();
+                            let v = array.value(i);
+                            row_writer.write_col(Duration::from_micros(v as u64))?;
+                        }
+                        TimeUnit::Nanosecond => {
+                            let array = column.as_primitive::<DurationNanosecondType>();
+                            let v = array.value(i);
+                            row_writer.write_col(Duration::from_nanos(v as u64))?;
+                        }
+                    },
+                    DataType::List(_) => {
+                        let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
+                        row_writer.write_col(v.to_string())?;
                     }
-                    Value::Duration(v) => row_writer.write_col(v.to_std_duration())?,
-                    Value::List(v) => row_writer.write_col(format!(
-                        "[{}]",
-                        v.items().iter().map(|x| x.to_string()).join(", ")
-                    ))?,
-                    Value::Struct(struct_value) => row_writer.write_col(format!(
-                        "{{{}}}",
-                        struct_value
-                            .struct_type()
-                            .fields()
-                            .iter()
-                            .map(|f| f.name())
-                            .zip(struct_value.items().iter())
-                            .map(|(k, v)| format!("{k}: {v}"))
-                            .join(", ")
-                    ))?,
-                    Value::Json(inner) => {
-                        let json_value =
-                            serde_json::Value::try_from(*inner).context(ToJsonSnafu)?;
-                        row_writer.write_col(json_value.to_string())?
+                    DataType::Struct(_) => {
+                        let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
+                        row_writer.write_col(v.to_string())?;
+                    }
+                    DataType::Time32(time_unit) => {
+                        let time = match time_unit {
+                            TimeUnit::Second => {
+                                let array = column.as_primitive::<Time32SecondType>();
+                                Time::new_second(array.value(i) as i64)
+                            }
+                            TimeUnit::Millisecond => {
+                                let array = column.as_primitive::<Time32MillisecondType>();
+                                Time::new_millisecond(array.value(i) as i64)
+                            }
+                            _ => unreachable!(
+                                "`DataType::Time32` has only second and millisecond time units"
+                            ),
+                        };
+                        let v = time.to_timezone_aware_string(Some(&query_context.timezone()));
+                        row_writer.write_col(v)?;
+                    }
+                    DataType::Time64(time_unit) => {
+                        let time = match time_unit {
+                            TimeUnit::Microsecond => {
+                                let array = column.as_primitive::<Time64MicrosecondType>();
+                                Time::new_microsecond(array.value(i))
+                            }
+                            TimeUnit::Nanosecond => {
+                                let array = column.as_primitive::<Time64NanosecondType>();
+                                Time::new_nanosecond(array.value(i))
+                            }
+                            _ => unreachable!(
+                                "`DataType::Time64` has only microsecond and nanosecond time units"
+                            ),
+                        };
+                        let v = time.to_timezone_aware_string(Some(&query_context.timezone()));
+                        row_writer.write_col(v)?;
+                    }
+                    DataType::Decimal128(precision, scale) => {
+                        let array = column.as_primitive::<Decimal128Type>();
+                        let v = Decimal128::new(array.value(i), *precision, *scale);
+                        row_writer.write_col(v.to_string())?;
+                    }
+                    _ => {
+                        return NotSupportedSnafu {
+                            feat: format!("convert {} to MySQL value", column.data_type()),
+                        }
+                        .fail();
                     }
-                    Value::Time(v) => row_writer
-                        .write_col(v.to_timezone_aware_string(Some(&query_context.timezone())))?,
-                    Value::Decimal128(v) => row_writer.write_col(v.to_string())?,
                 }
             }
             row_writer.end_row().await?;