refactor: convert to mysql values directly from arrow (#7096)

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
LFC
2025-10-20 19:09:24 +08:00
committed by GitHub
parent 3119464ff9
commit 855eb54ded
5 changed files with 418 additions and 59 deletions

2
Cargo.lock generated
View File

@@ -2478,6 +2478,7 @@ dependencies = [
"common-macro",
"common-telemetry",
"common-time",
"criterion 0.7.0",
"datafusion",
"datafusion-common",
"datatypes",
@@ -11557,6 +11558,7 @@ dependencies = [
"client",
"common-base",
"common-catalog",
"common-decimal",
"common-error",
"common-frontend",
"common-grpc",

View File

@@ -27,4 +27,9 @@ snafu.workspace = true
tokio.workspace = true
[dev-dependencies]
criterion = "0.7.0"
tokio.workspace = true
[[bench]]
name = "iter_record_batch_rows"
harness = false

View File

@@ -0,0 +1,179 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::hint::black_box;
use std::sync::Arc;
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use datafusion::arrow::array::{Int32Array, TimestampMillisecondArray};
use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
use datafusion_common::arrow::array::{ArrayRef, RecordBatch, StringArray};
use datafusion_common::arrow::datatypes::Schema;
use datafusion_common::{ScalarValue, utils};
use datatypes::arrow::array::AsArray;
use datatypes::arrow::datatypes::{
Int32Type, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType,
};
use datatypes::schema::SchemaRef;
fn prepare_record_batch(rows: usize) -> RecordBatch {
let schema = Schema::new(vec![
Field::new(
"ts",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new("i", DataType::Int32, true),
Field::new("s", DataType::Utf8, true),
]);
let columns: Vec<ArrayRef> = vec![
Arc::new(TimestampMillisecondArray::from_iter_values(
(0..rows).map(|x| (1760313600000 + x) as i64),
)),
Arc::new(Int32Array::from_iter_values((0..rows).map(|x| x as i32))),
Arc::new(StringArray::from_iter((0..rows).map(|x| {
if x % 2 == 0 {
Some(format!("s_{x}"))
} else {
None
}
}))),
];
RecordBatch::try_new(Arc::new(schema), columns).unwrap()
}
fn iter_by_greptimedb_values(schema: SchemaRef, record_batch: RecordBatch) {
let record_batch =
common_recordbatch::RecordBatch::try_from_df_record_batch(schema, record_batch).unwrap();
for row in record_batch.rows() {
black_box(row);
}
}
fn iter_by_loop_rows_and_columns(record_batch: RecordBatch) {
for i in 0..record_batch.num_rows() {
for column in record_batch.columns() {
match column.data_type() {
DataType::Timestamp(time_unit, _) => {
let v = match time_unit {
TimeUnit::Second => {
let array = column.as_primitive::<TimestampSecondType>();
array.value(i)
}
TimeUnit::Millisecond => {
let array = column.as_primitive::<TimestampMillisecondType>();
array.value(i)
}
TimeUnit::Microsecond => {
let array = column.as_primitive::<TimestampMicrosecondType>();
array.value(i)
}
TimeUnit::Nanosecond => {
let array = column.as_primitive::<TimestampNanosecondType>();
array.value(i)
}
};
black_box(v);
}
DataType::Int32 => {
let array = column.as_primitive::<Int32Type>();
let v = array.value(i);
black_box(v);
}
DataType::Utf8 => {
let array = column.as_string::<i32>();
let v = array.value(i);
black_box(v);
}
_ => unreachable!(),
}
}
}
}
fn iter_by_datafusion_scalar_values(record_batch: RecordBatch) {
let columns = record_batch.columns();
for i in 0..record_batch.num_rows() {
let row = utils::get_row_at_idx(columns, i).unwrap();
black_box(row);
}
}
fn iter_by_datafusion_scalar_values_with_buf(record_batch: RecordBatch) {
let columns = record_batch.columns();
let mut buf = vec![ScalarValue::Null; columns.len()];
for i in 0..record_batch.num_rows() {
utils::extract_row_at_idx_to_buf(columns, i, &mut buf).unwrap();
}
}
pub fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("iter_record_batch");
for rows in [1usize, 10, 100, 1_000, 10_000] {
group.bench_with_input(
BenchmarkId::new("by_greptimedb_values", rows),
&rows,
|b, rows| {
let record_batch = prepare_record_batch(*rows);
let schema =
Arc::new(datatypes::schema::Schema::try_from(record_batch.schema()).unwrap());
b.iter(|| {
iter_by_greptimedb_values(schema.clone(), record_batch.clone());
})
},
);
group.bench_with_input(
BenchmarkId::new("by_loop_rows_and_columns", rows),
&rows,
|b, rows| {
let record_batch = prepare_record_batch(*rows);
b.iter(|| {
iter_by_loop_rows_and_columns(record_batch.clone());
})
},
);
group.bench_with_input(
BenchmarkId::new("by_datafusion_scalar_values", rows),
&rows,
|b, rows| {
let record_batch = prepare_record_batch(*rows);
b.iter(|| {
iter_by_datafusion_scalar_values(record_batch.clone());
})
},
);
group.bench_with_input(
BenchmarkId::new("by_datafusion_scalar_values_with_buf", rows),
&rows,
|b, rows| {
let record_batch = prepare_record_batch(*rows);
b.iter(|| {
iter_by_datafusion_scalar_values_with_buf(record_batch.clone());
})
},
);
}
group.finish();
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

View File

@@ -35,6 +35,7 @@ catalog.workspace = true
chrono.workspace = true
common-base.workspace = true
common-catalog.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-frontend.workspace = true
common-grpc.workspace = true

View File

@@ -12,18 +12,31 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Deref;
use std::time::Duration;
use arrow::array::{Array, AsArray};
use arrow::datatypes::{
Date32Type, Decimal128Type, DurationMicrosecondType, DurationMillisecondType,
DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int8Type, Int16Type,
Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
};
use arrow_schema::{DataType, IntervalUnit, TimeUnit};
use common_decimal::Decimal128;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::{Output, OutputData};
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::{debug, error};
use datatypes::prelude::{ConcreteDataType, Value};
use common_time::time::Time;
use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp};
use datafusion_common::ScalarValue;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::SchemaRef;
use datatypes::types::jsonb_to_string;
use futures::StreamExt;
use itertools::Itertools;
use opensrv_mysql::{
Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
};
@@ -31,7 +44,7 @@ use session::context::QueryContextRef;
use snafu::prelude::*;
use tokio::io::AsyncWrite;
use crate::error::{self, ConvertSqlValueSnafu, Result, ToJsonSnafu};
use crate::error::{self, ConvertSqlValueSnafu, DataFusionSnafu, NotSupportedSnafu, Result};
use crate::metrics::*;
/// Try to write multiple output to the writer if possible.
@@ -168,7 +181,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
Ok(record_batch) => {
Self::write_recordbatch(
&mut row_writer,
&record_batch,
record_batch,
query_context.clone(),
&query_result.schema,
)
@@ -192,69 +205,228 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
async fn write_recordbatch(
row_writer: &mut RowWriter<'_, W>,
recordbatch: &RecordBatch,
record_batch: RecordBatch,
query_context: QueryContextRef,
schema: &SchemaRef,
) -> Result<()> {
for row in recordbatch.rows() {
for (value, column) in row.into_iter().zip(schema.column_schemas().iter()) {
match value {
Value::Null => row_writer.write_col(None::<u8>)?,
Value::Boolean(v) => row_writer.write_col(v as i8)?,
Value::UInt8(v) => row_writer.write_col(v)?,
Value::UInt16(v) => row_writer.write_col(v)?,
Value::UInt32(v) => row_writer.write_col(v)?,
Value::UInt64(v) => row_writer.write_col(v)?,
Value::Int8(v) => row_writer.write_col(v)?,
Value::Int16(v) => row_writer.write_col(v)?,
Value::Int32(v) => row_writer.write_col(v)?,
Value::Int64(v) => row_writer.write_col(v)?,
Value::Float32(v) => row_writer.write_col(v.0)?,
Value::Float64(v) => row_writer.write_col(v.0)?,
Value::String(v) => row_writer.write_col(v.as_utf8())?,
Value::Binary(v) => match &column.data_type {
ConcreteDataType::Json(_j) => {
let s = jsonb_to_string(&v).context(ConvertSqlValueSnafu)?;
let record_batch = record_batch.into_df_record_batch();
for i in 0..record_batch.num_rows() {
for (j, column) in record_batch.columns().iter().enumerate() {
if column.is_null(i) {
row_writer.write_col(None::<u8>)?;
continue;
}
match column.data_type() {
DataType::Null => {
row_writer.write_col(None::<u8>)?;
}
DataType::Boolean => {
let array = column.as_boolean();
row_writer.write_col(array.value(i) as i8)?;
}
DataType::UInt8 => {
let array = column.as_primitive::<UInt8Type>();
row_writer.write_col(array.value(i))?;
}
DataType::UInt16 => {
let array = column.as_primitive::<UInt16Type>();
row_writer.write_col(array.value(i))?;
}
DataType::UInt32 => {
let array = column.as_primitive::<UInt32Type>();
row_writer.write_col(array.value(i))?;
}
DataType::UInt64 => {
let array = column.as_primitive::<UInt64Type>();
row_writer.write_col(array.value(i))?;
}
DataType::Int8 => {
let array = column.as_primitive::<Int8Type>();
row_writer.write_col(array.value(i))?;
}
DataType::Int16 => {
let array = column.as_primitive::<Int16Type>();
row_writer.write_col(array.value(i))?;
}
DataType::Int32 => {
let array = column.as_primitive::<Int32Type>();
row_writer.write_col(array.value(i))?;
}
DataType::Int64 => {
let array = column.as_primitive::<Int64Type>();
row_writer.write_col(array.value(i))?;
}
DataType::Float32 => {
let array = column.as_primitive::<Float32Type>();
row_writer.write_col(array.value(i))?;
}
DataType::Float64 => {
let array = column.as_primitive::<Float64Type>();
row_writer.write_col(array.value(i))?;
}
DataType::Utf8 => {
let array = column.as_string::<i32>();
row_writer.write_col(array.value(i))?;
}
DataType::Utf8View => {
let array = column.as_string_view();
row_writer.write_col(array.value(i))?;
}
DataType::LargeUtf8 => {
let array = column.as_string::<i64>();
row_writer.write_col(array.value(i))?;
}
DataType::Binary => {
let array = column.as_binary::<i32>();
let v = array.value(i);
if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
row_writer.write_col(s)?;
} else {
row_writer.write_col(v)?;
}
_ => {
row_writer.write_col(v.deref())?;
}
DataType::BinaryView => {
let array = column.as_binary_view();
let v = array.value(i);
if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
row_writer.write_col(s)?;
} else {
row_writer.write_col(v)?;
}
}
DataType::LargeBinary => {
let array = column.as_binary::<i64>();
let v = array.value(i);
if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
row_writer.write_col(s)?;
} else {
row_writer.write_col(v)?;
}
}
DataType::Date32 => {
let array = column.as_primitive::<Date32Type>();
let v = Date::new(array.value(i));
row_writer.write_col(v.to_chrono_date())?;
}
DataType::Timestamp(time_unit, _) => {
let v = match time_unit {
TimeUnit::Second => {
let array = column.as_primitive::<TimestampSecondType>();
array.value(i)
}
TimeUnit::Millisecond => {
let array = column.as_primitive::<TimestampMillisecondType>();
array.value(i)
}
TimeUnit::Microsecond => {
let array = column.as_primitive::<TimestampMicrosecondType>();
array.value(i)
}
TimeUnit::Nanosecond => {
let array = column.as_primitive::<TimestampNanosecondType>();
array.value(i)
}
};
let v = Timestamp::new(v, time_unit.into());
let v = v.to_chrono_datetime_with_timezone(Some(&query_context.timezone()));
row_writer.write_col(v)?;
}
DataType::Interval(interval_unit) => match interval_unit {
IntervalUnit::YearMonth => {
let array = column.as_primitive::<IntervalYearMonthType>();
let v: IntervalYearMonth = array.value(i).into();
row_writer.write_col(v.to_iso8601_string())?;
}
IntervalUnit::DayTime => {
let array = column.as_primitive::<IntervalDayTimeType>();
let v: IntervalDayTime = array.value(i).into();
row_writer.write_col(v.to_iso8601_string())?;
}
IntervalUnit::MonthDayNano => {
let array = column.as_primitive::<IntervalMonthDayNanoType>();
let v: IntervalMonthDayNano = array.value(i).into();
row_writer.write_col(v.to_iso8601_string())?;
}
},
Value::Date(v) => row_writer.write_col(v.to_chrono_date())?,
// convert timestamp to timezone of current connection
Value::Timestamp(v) => row_writer.write_col(
v.to_chrono_datetime_with_timezone(Some(&query_context.timezone())),
)?,
Value::IntervalYearMonth(v) => row_writer.write_col(v.to_iso8601_string())?,
Value::IntervalDayTime(v) => row_writer.write_col(v.to_iso8601_string())?,
Value::IntervalMonthDayNano(v) => {
row_writer.write_col(v.to_iso8601_string())?
DataType::Duration(time_unit) => match time_unit {
TimeUnit::Second => {
let array = column.as_primitive::<DurationSecondType>();
let v = array.value(i);
row_writer.write_col(Duration::from_secs(v as u64))?;
}
TimeUnit::Millisecond => {
let array = column.as_primitive::<DurationMillisecondType>();
let v = array.value(i);
row_writer.write_col(Duration::from_millis(v as u64))?;
}
TimeUnit::Microsecond => {
let array = column.as_primitive::<DurationMicrosecondType>();
let v = array.value(i);
row_writer.write_col(Duration::from_micros(v as u64))?;
}
TimeUnit::Nanosecond => {
let array = column.as_primitive::<DurationNanosecondType>();
let v = array.value(i);
row_writer.write_col(Duration::from_nanos(v as u64))?;
}
},
DataType::List(_) => {
let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
row_writer.write_col(v.to_string())?;
}
Value::Duration(v) => row_writer.write_col(v.to_std_duration())?,
Value::List(v) => row_writer.write_col(format!(
"[{}]",
v.items().iter().map(|x| x.to_string()).join(", ")
))?,
Value::Struct(struct_value) => row_writer.write_col(format!(
"{{{}}}",
struct_value
.struct_type()
.fields()
.iter()
.map(|f| f.name())
.zip(struct_value.items().iter())
.map(|(k, v)| format!("{k}: {v}"))
.join(", ")
))?,
Value::Json(inner) => {
let json_value =
serde_json::Value::try_from(*inner).context(ToJsonSnafu)?;
row_writer.write_col(json_value.to_string())?
DataType::Struct(_) => {
let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
row_writer.write_col(v.to_string())?;
}
DataType::Time32(time_unit) => {
let time = match time_unit {
TimeUnit::Second => {
let array = column.as_primitive::<Time32SecondType>();
Time::new_second(array.value(i) as i64)
}
TimeUnit::Millisecond => {
let array = column.as_primitive::<Time32MillisecondType>();
Time::new_millisecond(array.value(i) as i64)
}
_ => unreachable!(
"`DataType::Time32` has only second and millisecond time units"
),
};
let v = time.to_timezone_aware_string(Some(&query_context.timezone()));
row_writer.write_col(v)?;
}
DataType::Time64(time_unit) => {
let time = match time_unit {
TimeUnit::Microsecond => {
let array = column.as_primitive::<Time64MicrosecondType>();
Time::new_microsecond(array.value(i))
}
TimeUnit::Nanosecond => {
let array = column.as_primitive::<Time64NanosecondType>();
Time::new_nanosecond(array.value(i))
}
_ => unreachable!(
"`DataType::Time64` has only microsecond and nanosecond time units"
),
};
let v = time.to_timezone_aware_string(Some(&query_context.timezone()));
row_writer.write_col(v)?;
}
DataType::Decimal128(precision, scale) => {
let array = column.as_primitive::<Decimal128Type>();
let v = Decimal128::new(array.value(i), *precision, *scale);
row_writer.write_col(v.to_string())?;
}
_ => {
return NotSupportedSnafu {
feat: format!("convert {} to MySQL value", column.data_type()),
}
.fail();
}
Value::Time(v) => row_writer
.write_col(v.to_timezone_aware_string(Some(&query_context.timezone())))?,
Value::Decimal128(v) => row_writer.write_col(v.to_string())?,
}
}
row_writer.end_row().await?;