From b7e834ab929dadbc65c80ad7dbc596b4757b3808 Mon Sep 17 00:00:00 2001
From: LFC <990479+MichaelScofield@users.noreply.github.com>
Date: Mon, 3 Nov 2025 15:52:37 +0800
Subject: [PATCH] refactor: convert to influxdb values directly from arrow
 (#7163)

* refactor: convert to influxdb values directly from arrow

Signed-off-by: luofucong

* resolve PR comments

Signed-off-by: luofucong

* resolve PR comments

Signed-off-by: luofucong

---------

Signed-off-by: luofucong
---
 .../benches/iter_record_batch_rows.rs         |  22 --
 src/common/recordbatch/src/recordbatch.rs     | 132 +-------
 src/datatypes/src/value.rs                    |   6 +
 .../src/http/result/influxdb_result_v1.rs     | 303 ++++++++++++++++--
 .../src/tests/instance_kafka_wal_test.rs      |   8 +-
 5 files changed, 303 insertions(+), 168 deletions(-)

diff --git a/src/common/recordbatch/benches/iter_record_batch_rows.rs b/src/common/recordbatch/benches/iter_record_batch_rows.rs
index b819a4658e..7b95189550 100644
--- a/src/common/recordbatch/benches/iter_record_batch_rows.rs
+++ b/src/common/recordbatch/benches/iter_record_batch_rows.rs
@@ -26,7 +26,6 @@ use datatypes::arrow::datatypes::{
     Int32Type, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
     TimestampSecondType,
 };
-use datatypes::schema::SchemaRef;
 
 fn prepare_record_batch(rows: usize) -> RecordBatch {
     let schema = Schema::new(vec![
@@ -56,14 +55,6 @@ fn prepare_record_batch(rows: usize) -> RecordBatch {
     RecordBatch::try_new(Arc::new(schema), columns).unwrap()
 }
 
-fn iter_by_greptimedb_values(schema: SchemaRef, record_batch: RecordBatch) {
-    let record_batch =
-        common_recordbatch::RecordBatch::try_from_df_record_batch(schema, record_batch).unwrap();
-    for row in record_batch.rows() {
-        black_box(row);
-    }
-}
-
 fn iter_by_loop_rows_and_columns(record_batch: RecordBatch) {
     for i in 0..record_batch.num_rows() {
         for column in record_batch.columns() {
@@ -125,19 +116,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
     let mut group = c.benchmark_group("iter_record_batch");
 
     for rows in [1usize, 10, 100, 1_000, 10_000] {
-        group.bench_with_input(
-            BenchmarkId::new("by_greptimedb_values", rows),
-            &rows,
-            |b, rows| {
-                let record_batch = prepare_record_batch(*rows);
-                let schema =
-                    Arc::new(datatypes::schema::Schema::try_from(record_batch.schema()).unwrap());
-                b.iter(|| {
-                    iter_by_greptimedb_values(schema.clone(), record_batch.clone());
-                })
-            },
-        );
-
         group.bench_with_input(
             BenchmarkId::new("by_loop_rows_and_columns", rows),
             &rows,
diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs
index 3cc30ce1ba..727950495a 100644
--- a/src/common/recordbatch/src/recordbatch.rs
+++ b/src/common/recordbatch/src/recordbatch.rs
@@ -23,7 +23,6 @@ use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef a
 use datatypes::arrow::array::RecordBatchOptions;
 use datatypes::prelude::DataType;
 use datatypes::schema::SchemaRef;
-use datatypes::value::Value;
 use datatypes::vectors::{Helper, VectorRef};
 use serde::ser::{Error, SerializeStruct};
 use serde::{Serialize, Serializer};
@@ -194,11 +193,6 @@ impl RecordBatch {
         self.df_record_batch.num_rows()
     }
 
-    /// Create an iterator to traverse the data by row
-    pub fn rows(&self) -> RecordBatchRowIterator<'_> {
-        RecordBatchRowIterator::new(self)
-    }
-
     pub fn column_vectors(
         &self,
         table_name: &str,
@@ -277,44 +271,6 @@ impl Serialize for RecordBatch {
     }
 }
 
-pub struct RecordBatchRowIterator<'a> {
-    record_batch: &'a RecordBatch,
-    rows: usize,
-    columns: usize,
-    row_cursor: usize,
-}
-
-impl<'a> RecordBatchRowIterator<'a> {
-    fn new(record_batch: &'a RecordBatch) -> RecordBatchRowIterator<'a> {
-        RecordBatchRowIterator {
-            record_batch,
-            rows: record_batch.df_record_batch.num_rows(),
-            columns: record_batch.df_record_batch.num_columns(),
-            row_cursor: 0,
-        }
-    }
-}
-
-impl Iterator for RecordBatchRowIterator<'_> {
-    type Item = Vec<Value>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        if self.row_cursor == self.rows {
-            None
-        } else {
-            let mut row = Vec::with_capacity(self.columns);
-
-            for col in 0..self.columns {
-                let column = self.record_batch.column(col);
-                row.push(column.get(self.row_cursor));
-            }
-
-            self.row_cursor += 1;
-            Some(row)
-        }
-    }
-}
-
 /// merge multiple recordbatch into a single
 pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Result<RecordBatch> {
     let batches_len = batches.len();
@@ -349,7 +305,9 @@
 mod tests {
     use std::sync::Arc;
 
-    use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
+    use datatypes::arrow::array::{AsArray, UInt32Array};
+    use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, UInt32Type};
+    use datatypes::arrow_array::StringArray;
     use datatypes::data_type::ConcreteDataType;
     use datatypes::schema::{ColumnSchema, Schema};
     use datatypes::vectors::{StringVector, UInt32Vector};
@@ -407,64 +365,6 @@ mod tests {
         );
     }
 
-    #[test]
-    fn test_record_batch_visitor() {
-        let column_schemas = vec![
-            ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
-            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
-        ];
-        let schema = Arc::new(Schema::new(column_schemas));
-        let columns: Vec<VectorRef> = vec![
-            Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
-            Arc::new(StringVector::from(vec![
-                None,
-                Some("hello"),
-                Some("greptime"),
-                None,
-            ])),
-        ];
-        let recordbatch = RecordBatch::new(schema, columns).unwrap();
-
-        let mut record_batch_iter = recordbatch.rows();
-        assert_eq!(
-            vec![Value::UInt32(1), Value::Null],
-            record_batch_iter
-                .next()
-                .unwrap()
-                .into_iter()
-                .collect::<Vec<Value>>()
-        );
-
-        assert_eq!(
-            vec![Value::UInt32(2), Value::String("hello".into())],
-            record_batch_iter
-                .next()
-                .unwrap()
-                .into_iter()
-                .collect::<Vec<Value>>()
-        );
-
-        assert_eq!(
-            vec![Value::UInt32(3), Value::String("greptime".into())],
-            record_batch_iter
-                .next()
-                .unwrap()
-                .into_iter()
-                .collect::<Vec<Value>>()
-        );
-
-        assert_eq!(
-            vec![Value::UInt32(4), Value::Null],
-            record_batch_iter
-                .next()
-                .unwrap()
-                .into_iter()
-                .collect::<Vec<Value>>()
-        );
-
-        assert!(record_batch_iter.next().is_none());
-    }
-
     #[test]
     fn test_record_batch_slice() {
         let column_schemas = vec![
@@ -483,26 +383,16 @@ mod tests {
         ];
         let recordbatch = RecordBatch::new(schema, columns).unwrap();
         let recordbatch = recordbatch.slice(1, 2).expect("recordbatch slice");
-        let mut record_batch_iter = recordbatch.rows();
-        assert_eq!(
-            vec![Value::UInt32(2), Value::String("hello".into())],
-            record_batch_iter
-                .next()
-                .unwrap()
-                .into_iter()
-                .collect::<Vec<Value>>()
-        );
 
-        assert_eq!(
-            vec![Value::UInt32(3), Value::String("greptime".into())],
-            record_batch_iter
-                .next()
-                .unwrap()
-                .into_iter()
-                .collect::<Vec<Value>>()
-        );
+        let expected = &UInt32Array::from_iter_values([2u32, 3]);
+        let array = recordbatch.column(0).to_arrow_array();
+        let actual = array.as_primitive::<UInt32Type>();
+        assert_eq!(expected, actual);
 
-        assert!(record_batch_iter.next().is_none());
+        let expected = &StringArray::from(vec!["hello", "greptime"]);
+        let array = recordbatch.column(1).to_arrow_array();
+        let actual = array.as_string::<i32>();
+        assert_eq!(expected, actual);
 
         assert!(recordbatch.slice(1, 5).is_err());
     }
diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs
index 7acc7073d4..90ed848b7d 100644
--- a/src/datatypes/src/value.rs
+++ b/src/datatypes/src/value.rs
@@ -873,6 +873,12 @@ impl From<&[u8]> for Value {
     }
 }
 
+impl From<()> for Value {
+    fn from(_: ()) -> Self {
+        Value::Null
+    }
+}
+
 impl TryFrom<Value> for serde_json::Value {
     type Error = serde_json::Error;
 
diff --git a/src/servers/src/http/result/influxdb_result_v1.rs b/src/servers/src/http/result/influxdb_result_v1.rs
index 36d59b9878..e5f11d8aba 100644
--- a/src/servers/src/http/result/influxdb_result_v1.rs
+++ b/src/servers/src/http/result/influxdb_result_v1.rs
@@ -12,16 +12,34 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use arrow::array::AsArray;
+use arrow::datatypes::{
+    Date32Type, Date64Type, Decimal128Type, DurationMicrosecondType, DurationMillisecondType,
+    DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int8Type, Int16Type,
+    Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
+    Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
+    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
+    TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
+};
+use arrow_schema::{DataType, IntervalUnit, TimeUnit};
 use axum::Json;
 use axum::http::HeaderValue;
 use axum::response::{IntoResponse, Response};
+use common_decimal::Decimal128;
 use common_query::{Output, OutputData};
 use common_recordbatch::{RecordBatch, util};
+use common_time::time::Time;
+use common_time::{
+    Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp,
+};
+use datafusion_common::ScalarValue;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use snafu::ResultExt;
 
-use crate::error::{Error, ToJsonSnafu};
+use crate::error::{
+    ConvertScalarValueSnafu, DataFusionSnafu, Error, NotSupportedSnafu, Result, ToJsonSnafu,
+};
 use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
 use crate::http::result::error_result::ErrorResponse;
 use crate::http::{Epoch, HttpResponse, ResponseFormat};
@@ -77,27 +95,8 @@ impl TryFrom<(Option<Epoch>, Vec<RecordBatch>)> for InfluxdbRecordsOutput {
             Vec::with_capacity(recordbatches.iter().map(|r| r.num_rows()).sum::<usize>());
 
         for recordbatch in recordbatches {
-            for row in recordbatch.rows() {
-                let value_row = row
-                    .into_iter()
-                    .map(|value| {
-                        let value = match (epoch, &value) {
-                            (Some(epoch), datatypes::value::Value::Timestamp(ts)) => {
-                                if let Some(timestamp) = epoch.convert_timestamp(*ts) {
-                                    datatypes::value::Value::Timestamp(timestamp)
-                                } else {
-                                    value
-                                }
-                            }
-                            _ => value,
-                        };
-                        Value::try_from(value)
-                    })
-                    .collect::<Result<Vec<Value>, _>>()
-                    .context(ToJsonSnafu)?;
-
-                rows.push(value_row);
-            }
+            let mut writer = RowWriter::new(epoch, recordbatch.num_columns());
+            writer.write(recordbatch, &mut rows)?;
         }
 
         Ok(InfluxdbRecordsOutput::new(columns, rows))
@@ -105,6 +104,266 @@ impl TryFrom<(Option<Epoch>, Vec<RecordBatch>)> for InfluxdbRecordsOutput {
     }
 }
 
+struct RowWriter {
+    epoch: Option<Epoch>,
+    columns: usize,
+    current: Option<Vec<Value>>,
+}
+
+impl RowWriter {
+    fn new(epoch: Option<Epoch>, columns: usize) -> Self {
+        Self {
+            epoch,
+            columns,
+            current: None,
+        }
+    }
+
+    fn push(&mut self, value: impl Into<datatypes::value::Value>) -> Result<()> {
+        let value = value.into();
+
+        let current = self
+            .current
+            .get_or_insert_with(|| Vec::with_capacity(self.columns));
+        let value = Value::try_from(value).context(ToJsonSnafu)?;
+        current.push(value);
+        Ok(())
+    }
+
+    fn finish(&mut self) -> Vec<Value> {
+        self.current.take().unwrap_or_default()
+    }
+
+    fn write(&mut self, record_batch: RecordBatch, rows: &mut Vec<Vec<Value>>) -> Result<()> {
+        let record_batch = record_batch.into_df_record_batch();
+        for i in 0..record_batch.num_rows() {
+            for array in record_batch.columns().iter() {
+                if array.is_null(i) {
+                    self.push(datatypes::value::Value::Null)?;
+                    continue;
+                }
+
+                match array.data_type() {
+                    DataType::Null => {
+                        self.push(datatypes::value::Value::Null)?;
+                    }
+                    DataType::Boolean => {
+                        let array = array.as_boolean();
+                        let v = array.value(i);
+                        self.push(v)?;
+                    }
+                    DataType::UInt8 => {
+                        let array = array.as_primitive::<UInt8Type>();
+                        let v = array.value(i);
+                        self.push(v)?;
+                    }
+                    DataType::UInt16 => {
+                        let array = array.as_primitive::<UInt16Type>();
+                        let v = array.value(i);
+                        self.push(v)?;
+                    }
+                    DataType::UInt32 => {
+                        let array = array.as_primitive::<UInt32Type>();
+                        let v = array.value(i);
+                        self.push(v)?;
+                    }
+                    DataType::UInt64 => {
+                        let array = array.as_primitive::<UInt64Type>();
+                        let v = array.value(i);
+                        self.push(v)?;
+                    }
+                    DataType::Int8 => {
+                        let array = array.as_primitive::<Int8Type>();
+                        let v = array.value(i);
+                        self.push(v)?;
+                    }
+                    DataType::Int16 => {
+                        let array = array.as_primitive::<Int16Type>();
+                        let v = array.value(i);
+                        self.push(v)?;
+                    }
+                    DataType::Int32 => {
+                        let array = array.as_primitive::<Int32Type>();
+                        let v = array.value(i);
+                        self.push(v)?;
+                    }
+                    DataType::Int64 => {
+                        let array = array.as_primitive::<Int64Type>();
+                        let v = array.value(i);
+                        self.push(v)?;
+                    }
+                    DataType::Float32 => {
+                        let array = array.as_primitive::<Float32Type>();
+                        let v = array.value(i);
+                        self.push(v)?;
+                    }
+                    DataType::Float64 => {
+                        let array = array.as_primitive::<Float64Type>();
+                        let v = array.value(i);
+                        self.push(v)?;
+                    }
+                    DataType::Utf8 => {
+                        let array = array.as_string::<i32>();
+                        let v = array.value(i);
+                        self.push(v)?;
+                    }
+                    DataType::LargeUtf8 => {
+                        let array = array.as_string::<i64>();
+                        let v = array.value(i);
+                        self.push(v)?;
+                    }
+                    DataType::Utf8View => {
+                        let array = array.as_string_view();
+                        let v = array.value(i);
+                        self.push(v)?;
+                    }
+                    DataType::Binary => {
+                        let array = array.as_binary::<i32>();
+                        let v = array.value(i);
+                        self.push(v)?;
+                    }
+                    DataType::LargeBinary => {
+                        let array = array.as_binary::<i64>();
+                        let v = array.value(i);
+                        self.push(v)?;
+                    }
+                    DataType::BinaryView => {
+                        let array = array.as_binary_view();
+                        let v = array.value(i);
+                        self.push(v)?;
+                    }
+                    DataType::Date32 => {
+                        let array = array.as_primitive::<Date32Type>();
+                        let v = Date::new(array.value(i));
+                        self.push(v)?;
+                    }
+                    DataType::Date64 => {
+                        let array = array.as_primitive::<Date64Type>();
+                        // `Date64` values are the milliseconds representation of `Date32` values,
+                        // according to its specification. So we convert a `Date64` value to the
+                        // corresponding `Date32` value here, to process them uniformly.
+                        let v = Date::new((array.value(i) / 86_400_000) as i32);
+                        self.push(v)?;
+                    }
+                    DataType::Timestamp(time_unit, _) => {
+                        let v = match time_unit {
+                            TimeUnit::Second => {
+                                let array = array.as_primitive::<TimestampSecondType>();
+                                array.value(i)
+                            }
+                            TimeUnit::Millisecond => {
+                                let array = array.as_primitive::<TimestampMillisecondType>();
+                                array.value(i)
+                            }
+                            TimeUnit::Microsecond => {
+                                let array = array.as_primitive::<TimestampMicrosecondType>();
+                                array.value(i)
+                            }
+                            TimeUnit::Nanosecond => {
+                                let array = array.as_primitive::<TimestampNanosecondType>();
+                                array.value(i)
+                            }
+                        };
+                        let mut ts = Timestamp::new(v, time_unit.into());
+                        if let Some(epoch) = self.epoch
+                            && let Some(converted) = epoch.convert_timestamp(ts)
+                        {
+                            ts = converted;
+                        }
+                        self.push(ts)?;
+                    }
+                    DataType::Time32(time_unit) | DataType::Time64(time_unit) => {
+                        let v = match time_unit {
+                            TimeUnit::Second => {
+                                let array = array.as_primitive::<Time32SecondType>();
+                                Time::new_second(array.value(i) as i64)
+                            }
+                            TimeUnit::Millisecond => {
+                                let array = array.as_primitive::<Time32MillisecondType>();
+                                Time::new_millisecond(array.value(i) as i64)
+                            }
+                            TimeUnit::Microsecond => {
+                                let array = array.as_primitive::<Time64MicrosecondType>();
+                                Time::new_microsecond(array.value(i))
+                            }
+                            TimeUnit::Nanosecond => {
+                                let array = array.as_primitive::<Time64NanosecondType>();
+                                Time::new_nanosecond(array.value(i))
+                            }
+                        };
+                        self.push(v)?;
+                    }
+                    DataType::Interval(interval_unit) => match interval_unit {
+                        IntervalUnit::YearMonth => {
+                            let array = array.as_primitive::<IntervalYearMonthType>();
+                            let v: IntervalYearMonth = array.value(i).into();
+                            self.push(v)?;
+                        }
+                        IntervalUnit::DayTime => {
+                            let array = array.as_primitive::<IntervalDayTimeType>();
+                            let v: IntervalDayTime = array.value(i).into();
+                            self.push(v)?;
+                        }
+                        IntervalUnit::MonthDayNano => {
+                            let array = array.as_primitive::<IntervalMonthDayNanoType>();
+                            let v: IntervalMonthDayNano = array.value(i).into();
+                            self.push(v)?;
+                        }
+                    },
+                    DataType::Duration(time_unit) => {
+                        let v = match time_unit {
+                            TimeUnit::Second => {
+                                let array = array.as_primitive::<DurationSecondType>();
+                                array.value(i)
+                            }
+                            TimeUnit::Millisecond => {
+                                let array = array.as_primitive::<DurationMillisecondType>();
+                                array.value(i)
+                            }
+                            TimeUnit::Microsecond => {
+                                let array = array.as_primitive::<DurationMicrosecondType>();
+                                array.value(i)
+                            }
+                            TimeUnit::Nanosecond => {
+                                let array = array.as_primitive::<DurationNanosecondType>();
+                                array.value(i)
+                            }
+                        };
+                        let d = Duration::new(v, time_unit.into());
+                        self.push(d)?;
+                    }
+                    DataType::List(_) => {
+                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
+                        let v: datatypes::value::Value =
+                            v.try_into().context(ConvertScalarValueSnafu)?;
+                        self.push(v)?;
+                    }
+                    DataType::Struct(_) => {
+                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
+                        let v: datatypes::value::Value =
+                            v.try_into().context(ConvertScalarValueSnafu)?;
+                        self.push(v)?;
+                    }
+                    DataType::Decimal128(precision, scale) => {
+                        let array = array.as_primitive::<Decimal128Type>();
+                        let v = Decimal128::new(array.value(i), *precision, *scale);
+                        self.push(v)?;
+                    }
+                    _ => {
+                        return NotSupportedSnafu {
+                            feat: format!("convert {} to influxdb value", array.data_type()),
+                        }
+                        .fail();
+                    }
+                }
+            }
+
+            rows.push(self.finish())
+        }
+        Ok(())
+    }
+}
+
 #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
 pub struct InfluxdbOutput {
     pub statement_id: u32,
diff --git a/tests-integration/src/tests/instance_kafka_wal_test.rs b/tests-integration/src/tests/instance_kafka_wal_test.rs
index d019a42387..053521c86f 100644
--- a/tests-integration/src/tests/instance_kafka_wal_test.rs
+++ b/tests-integration/src/tests/instance_kafka_wal_test.rs
@@ -18,6 +18,8 @@ use std::sync::atomic::{AtomicU64, Ordering};
 
 use client::DEFAULT_CATALOG_NAME;
 use common_query::{Output, OutputData};
+use datatypes::arrow::array::AsArray;
+use datatypes::arrow::datatypes::TimestampMillisecondType;
 use datatypes::vectors::{TimestampMillisecondVector, VectorRef};
 use frontend::instance::Instance;
 use itertools::Itertools;
@@ -224,9 +226,9 @@ async fn ensure_data_exists(tables: &[Table], instance: &Arc<Instance>) {
         let queried = record_batches
             .into_iter()
             .flat_map(|rb| {
-                rb.rows()
-                    .map(|row| row[0].as_timestamp().unwrap().value() as u64)
-                    .collect::<Vec<_>>()
+                let array = rb.column(0).to_arrow_array();
+                let array = array.as_primitive::<TimestampMillisecondType>();
+                array.iter().flatten().map(|x| x as u64).collect::<Vec<_>>()
             })
             .collect::>();
         let inserted = table