refactor: convert to influxdb values directly from arrow (#7163)

* refactor: convert to influxdb values directly from arrow

Signed-off-by: luofucong <luofc@foxmail.com>

* resolve PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

* resolve PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
LFC
2025-11-03 15:52:37 +08:00
committed by GitHub
parent 5eab9a1be3
commit b7e834ab92
5 changed files with 303 additions and 168 deletions
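At a glance, the refactor drops the row-by-row `Value` materialization (`RecordBatch::rows()` and its `RecordBatchRowIterator`, removed below) and reads influxdb output values straight from the arrow arrays. A minimal sketch of the access pattern the code moves to, written against plain arrow-rs; the `sum_u32_direct` helper is illustrative, not part of this commit:

use std::sync::Arc;

use arrow::array::{ArrayRef, AsArray, UInt32Array};
use arrow::datatypes::UInt32Type;

// Downcast the column once, then read native values without boxing
// every cell into an intermediate `Value` enum.
fn sum_u32_direct(column: &ArrayRef) -> u64 {
    let array = column.as_primitive::<UInt32Type>();
    array.iter().flatten().map(u64::from).sum()
}

fn main() {
    let column: ArrayRef = Arc::new(UInt32Array::from(vec![1u32, 2, 3]));
    assert_eq!(6, sum_u32_direct(&column));
}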

View File

@@ -26,7 +26,6 @@ use datatypes::arrow::datatypes::{
Int32Type, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType,
};
use datatypes::schema::SchemaRef;
fn prepare_record_batch(rows: usize) -> RecordBatch {
let schema = Schema::new(vec![
@@ -56,14 +55,6 @@ fn prepare_record_batch(rows: usize) -> RecordBatch {
RecordBatch::try_new(Arc::new(schema), columns).unwrap()
}
fn iter_by_greptimedb_values(schema: SchemaRef, record_batch: RecordBatch) {
let record_batch =
common_recordbatch::RecordBatch::try_from_df_record_batch(schema, record_batch).unwrap();
for row in record_batch.rows() {
black_box(row);
}
}
fn iter_by_loop_rows_and_columns(record_batch: RecordBatch) {
for i in 0..record_batch.num_rows() {
for column in record_batch.columns() {
@@ -125,19 +116,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("iter_record_batch");
for rows in [1usize, 10, 100, 1_000, 10_000] {
group.bench_with_input(
BenchmarkId::new("by_greptimedb_values", rows),
&rows,
|b, rows| {
let record_batch = prepare_record_batch(*rows);
let schema =
Arc::new(datatypes::schema::Schema::try_from(record_batch.schema()).unwrap());
b.iter(|| {
iter_by_greptimedb_values(schema.clone(), record_batch.clone());
})
},
);
group.bench_with_input(
BenchmarkId::new("by_loop_rows_and_columns", rows),
&rows,

View File

@@ -23,7 +23,6 @@ use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef a
use datatypes::arrow::array::RecordBatchOptions;
use datatypes::prelude::DataType;
use datatypes::schema::SchemaRef;
use datatypes::value::Value;
use datatypes::vectors::{Helper, VectorRef};
use serde::ser::{Error, SerializeStruct};
use serde::{Serialize, Serializer};
@@ -194,11 +193,6 @@ impl RecordBatch {
self.df_record_batch.num_rows()
}
/// Create an iterator to traverse the data by row
pub fn rows(&self) -> RecordBatchRowIterator<'_> {
RecordBatchRowIterator::new(self)
}
pub fn column_vectors(
&self,
table_name: &str,
@@ -277,44 +271,6 @@ impl Serialize for RecordBatch {
}
}
pub struct RecordBatchRowIterator<'a> {
record_batch: &'a RecordBatch,
rows: usize,
columns: usize,
row_cursor: usize,
}
impl<'a> RecordBatchRowIterator<'a> {
fn new(record_batch: &'a RecordBatch) -> RecordBatchRowIterator<'a> {
RecordBatchRowIterator {
record_batch,
rows: record_batch.df_record_batch.num_rows(),
columns: record_batch.df_record_batch.num_columns(),
row_cursor: 0,
}
}
}
impl Iterator for RecordBatchRowIterator<'_> {
type Item = Vec<Value>;
fn next(&mut self) -> Option<Self::Item> {
if self.row_cursor == self.rows {
None
} else {
let mut row = Vec::with_capacity(self.columns);
for col in 0..self.columns {
let column = self.record_batch.column(col);
row.push(column.get(self.row_cursor));
}
self.row_cursor += 1;
Some(row)
}
}
}
/// Merges multiple record batches into a single one.
pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Result<RecordBatch> {
let batches_len = batches.len();
@@ -349,7 +305,9 @@ pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Result<RecordBatch> {
mod tests {
use std::sync::Arc;
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datatypes::arrow::array::{AsArray, UInt32Array};
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, UInt32Type};
use datatypes::arrow_array::StringArray;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{StringVector, UInt32Vector};
@@ -407,64 +365,6 @@ mod tests {
);
}
#[test]
fn test_record_batch_visitor() {
let column_schemas = vec![
ColumnSchema::new("numbers", ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
];
let schema = Arc::new(Schema::new(column_schemas));
let columns: Vec<VectorRef> = vec![
Arc::new(UInt32Vector::from_slice(vec![1, 2, 3, 4])),
Arc::new(StringVector::from(vec![
None,
Some("hello"),
Some("greptime"),
None,
])),
];
let recordbatch = RecordBatch::new(schema, columns).unwrap();
let mut record_batch_iter = recordbatch.rows();
assert_eq!(
vec![Value::UInt32(1), Value::Null],
record_batch_iter
.next()
.unwrap()
.into_iter()
.collect::<Vec<Value>>()
);
assert_eq!(
vec![Value::UInt32(2), Value::String("hello".into())],
record_batch_iter
.next()
.unwrap()
.into_iter()
.collect::<Vec<Value>>()
);
assert_eq!(
vec![Value::UInt32(3), Value::String("greptime".into())],
record_batch_iter
.next()
.unwrap()
.into_iter()
.collect::<Vec<Value>>()
);
assert_eq!(
vec![Value::UInt32(4), Value::Null],
record_batch_iter
.next()
.unwrap()
.into_iter()
.collect::<Vec<Value>>()
);
assert!(record_batch_iter.next().is_none());
}
#[test]
fn test_record_batch_slice() {
let column_schemas = vec![
@@ -483,26 +383,16 @@ mod tests {
];
let recordbatch = RecordBatch::new(schema, columns).unwrap();
let recordbatch = recordbatch.slice(1, 2).expect("recordbatch slice");
let mut record_batch_iter = recordbatch.rows();
assert_eq!(
vec![Value::UInt32(2), Value::String("hello".into())],
record_batch_iter
.next()
.unwrap()
.into_iter()
.collect::<Vec<Value>>()
);
assert_eq!(
vec![Value::UInt32(3), Value::String("greptime".into())],
record_batch_iter
.next()
.unwrap()
.into_iter()
.collect::<Vec<Value>>()
);
let expected = &UInt32Array::from_iter_values([2u32, 3]);
let array = recordbatch.column(0).to_arrow_array();
let actual = array.as_primitive::<UInt32Type>();
assert_eq!(expected, actual);
assert!(record_batch_iter.next().is_none());
let expected = &StringArray::from(vec!["hello", "greptime"]);
let array = recordbatch.column(1).to_arrow_array();
let actual = array.as_string::<i32>();
assert_eq!(expected, actual);
assert!(recordbatch.slice(1, 5).is_err());
}
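The rewritten `test_record_batch_slice` above asserts on arrow arrays directly instead of walking rows. The same downcast-and-compare pattern, reduced to a self-contained sketch against plain arrow-rs (the literal values mirror the test):

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, AsArray, UInt32Array};
use arrow::datatypes::UInt32Type;

fn main() {
    let numbers: ArrayRef = Arc::new(UInt32Array::from(vec![1u32, 2, 3, 4]));
    // Zero-copy view of two rows starting at offset 1, akin to `recordbatch.slice(1, 2)`.
    let sliced = numbers.slice(1, 2);

    let expected = UInt32Array::from(vec![2u32, 3]);
    assert_eq!(&expected, sliced.as_primitive::<UInt32Type>());
}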

View File

@@ -873,6 +873,12 @@ impl From<&[u8]> for Value {
}
}
impl From<()> for Value {
fn from(_: ()) -> Self {
Value::Null
}
}
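With this impl, any `impl Into<Value>` call site (such as `RowWriter::push` in the http handler below) can presumably pass the unit value to mean a null: `Value::from(())` yields `Value::Null`.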
impl TryFrom<Value> for serde_json::Value {
type Error = serde_json::Error;

View File

@@ -12,16 +12,34 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow::array::AsArray;
use arrow::datatypes::{
Date32Type, Date64Type, Decimal128Type, DurationMicrosecondType, DurationMillisecondType,
DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int8Type, Int16Type,
Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
};
use arrow_schema::{DataType, IntervalUnit, TimeUnit};
use axum::Json;
use axum::http::HeaderValue;
use axum::response::{IntoResponse, Response};
use common_decimal::Decimal128;
use common_query::{Output, OutputData};
use common_recordbatch::{RecordBatch, util};
use common_time::time::Time;
use common_time::{
Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp,
};
use datafusion_common::ScalarValue;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use snafu::ResultExt;
use crate::error::{Error, ToJsonSnafu};
use crate::error::{
ConvertScalarValueSnafu, DataFusionSnafu, Error, NotSupportedSnafu, Result, ToJsonSnafu,
};
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
use crate::http::result::error_result::ErrorResponse;
use crate::http::{Epoch, HttpResponse, ResponseFormat};
@@ -77,27 +95,8 @@ impl TryFrom<(Option<Epoch>, Vec<RecordBatch>)> for InfluxdbRecordsOutput {
Vec::with_capacity(recordbatches.iter().map(|r| r.num_rows()).sum::<usize>());
for recordbatch in recordbatches {
for row in recordbatch.rows() {
let value_row = row
.into_iter()
.map(|value| {
let value = match (epoch, &value) {
(Some(epoch), datatypes::value::Value::Timestamp(ts)) => {
if let Some(timestamp) = epoch.convert_timestamp(*ts) {
datatypes::value::Value::Timestamp(timestamp)
} else {
value
}
}
_ => value,
};
Value::try_from(value)
})
.collect::<Result<Vec<Value>, _>>()
.context(ToJsonSnafu)?;
rows.push(value_row);
}
let mut writer = RowWriter::new(epoch, recordbatch.num_columns());
writer.write(recordbatch, &mut rows)?;
}
Ok(InfluxdbRecordsOutput::new(columns, rows))
@@ -105,6 +104,266 @@ impl TryFrom<(Option<Epoch>, Vec<RecordBatch>)> for InfluxdbRecordsOutput {
}
}
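/// Converts the rows of a record batch into influxdb JSON values, reading
/// directly from the underlying arrow arrays. Timestamps are converted to the
/// client-requested `epoch` precision when one is set.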
struct RowWriter {
epoch: Option<Epoch>,
columns: usize,
current: Option<Vec<Value>>,
}
impl RowWriter {
fn new(epoch: Option<Epoch>, columns: usize) -> Self {
Self {
epoch,
columns,
current: None,
}
}
fn push(&mut self, value: impl Into<datatypes::value::Value>) -> Result<()> {
let value = value.into();
let current = self
.current
.get_or_insert_with(|| Vec::with_capacity(self.columns));
let value = Value::try_from(value).context(ToJsonSnafu)?;
current.push(value);
Ok(())
}
fn finish(&mut self) -> Vec<Value> {
self.current.take().unwrap_or_default()
}
fn write(&mut self, record_batch: RecordBatch, rows: &mut Vec<Vec<Value>>) -> Result<()> {
let record_batch = record_batch.into_df_record_batch();
for i in 0..record_batch.num_rows() {
for array in record_batch.columns().iter() {
if array.is_null(i) {
self.push(datatypes::value::Value::Null)?;
continue;
}
match array.data_type() {
DataType::Null => {
self.push(datatypes::value::Value::Null)?;
}
DataType::Boolean => {
let array = array.as_boolean();
let v = array.value(i);
self.push(v)?;
}
DataType::UInt8 => {
let array = array.as_primitive::<UInt8Type>();
let v = array.value(i);
self.push(v)?;
}
DataType::UInt16 => {
let array = array.as_primitive::<UInt16Type>();
let v = array.value(i);
self.push(v)?;
}
DataType::UInt32 => {
let array = array.as_primitive::<UInt32Type>();
let v = array.value(i);
self.push(v)?;
}
DataType::UInt64 => {
let array = array.as_primitive::<UInt64Type>();
let v = array.value(i);
self.push(v)?;
}
DataType::Int8 => {
let array = array.as_primitive::<Int8Type>();
let v = array.value(i);
self.push(v)?;
}
DataType::Int16 => {
let array = array.as_primitive::<Int16Type>();
let v = array.value(i);
self.push(v)?;
}
DataType::Int32 => {
let array = array.as_primitive::<Int32Type>();
let v = array.value(i);
self.push(v)?;
}
DataType::Int64 => {
let array = array.as_primitive::<Int64Type>();
let v = array.value(i);
self.push(v)?;
}
DataType::Float32 => {
let array = array.as_primitive::<Float32Type>();
let v = array.value(i);
self.push(v)?;
}
DataType::Float64 => {
let array = array.as_primitive::<Float64Type>();
let v = array.value(i);
self.push(v)?;
}
DataType::Utf8 => {
let array = array.as_string::<i32>();
let v = array.value(i);
self.push(v)?;
}
DataType::LargeUtf8 => {
let array = array.as_string::<i64>();
let v = array.value(i);
self.push(v)?;
}
DataType::Utf8View => {
let array = array.as_string_view();
let v = array.value(i);
self.push(v)?;
}
DataType::Binary => {
let array = array.as_binary::<i32>();
let v = array.value(i);
self.push(v)?;
}
DataType::LargeBinary => {
let array = array.as_binary::<i64>();
let v = array.value(i);
self.push(v)?;
}
DataType::BinaryView => {
let array = array.as_binary_view();
let v = array.value(i);
self.push(v)?;
}
DataType::Date32 => {
let array = array.as_primitive::<Date32Type>();
let v = Date::new(array.value(i));
self.push(v)?;
}
DataType::Date64 => {
let array = array.as_primitive::<Date64Type>();
                        // `Date64` values are the milliseconds representation of `Date32`
                        // values (one day = 24 * 60 * 60 * 1000 = 86_400_000 ms), according
                        // to the arrow specification. So we convert the `Date64` value to a
                        // `Date32` day count here to process both types uniformly.
let v = Date::new((array.value(i) / 86_400_000) as i32);
self.push(v)?;
}
DataType::Timestamp(time_unit, _) => {
let v = match time_unit {
TimeUnit::Second => {
let array = array.as_primitive::<TimestampSecondType>();
array.value(i)
}
TimeUnit::Millisecond => {
let array = array.as_primitive::<TimestampMillisecondType>();
array.value(i)
}
TimeUnit::Microsecond => {
let array = array.as_primitive::<TimestampMicrosecondType>();
array.value(i)
}
TimeUnit::Nanosecond => {
let array = array.as_primitive::<TimestampNanosecondType>();
array.value(i)
}
};
let mut ts = Timestamp::new(v, time_unit.into());
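                        // Honor the client-requested `epoch` precision when set;
                        // keep the original timestamp if the conversion fails.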
if let Some(epoch) = self.epoch
&& let Some(converted) = epoch.convert_timestamp(ts)
{
ts = converted;
}
self.push(ts)?;
}
DataType::Time32(time_unit) | DataType::Time64(time_unit) => {
let v = match time_unit {
TimeUnit::Second => {
let array = array.as_primitive::<Time32SecondType>();
Time::new_second(array.value(i) as i64)
}
TimeUnit::Millisecond => {
let array = array.as_primitive::<Time32MillisecondType>();
Time::new_millisecond(array.value(i) as i64)
}
TimeUnit::Microsecond => {
let array = array.as_primitive::<Time64MicrosecondType>();
Time::new_microsecond(array.value(i))
}
TimeUnit::Nanosecond => {
let array = array.as_primitive::<Time64NanosecondType>();
Time::new_nanosecond(array.value(i))
}
};
self.push(v)?;
}
DataType::Interval(interval_unit) => match interval_unit {
IntervalUnit::YearMonth => {
let array = array.as_primitive::<IntervalYearMonthType>();
let v: IntervalYearMonth = array.value(i).into();
self.push(v)?;
}
IntervalUnit::DayTime => {
let array = array.as_primitive::<IntervalDayTimeType>();
let v: IntervalDayTime = array.value(i).into();
self.push(v)?;
}
IntervalUnit::MonthDayNano => {
let array = array.as_primitive::<IntervalMonthDayNanoType>();
let v: IntervalMonthDayNano = array.value(i).into();
self.push(v)?;
}
},
DataType::Duration(time_unit) => {
let v = match time_unit {
TimeUnit::Second => {
let array = array.as_primitive::<DurationSecondType>();
array.value(i)
}
TimeUnit::Millisecond => {
let array = array.as_primitive::<DurationMillisecondType>();
array.value(i)
}
TimeUnit::Microsecond => {
let array = array.as_primitive::<DurationMicrosecondType>();
array.value(i)
}
TimeUnit::Nanosecond => {
let array = array.as_primitive::<DurationNanosecondType>();
array.value(i)
}
};
let d = Duration::new(v, time_unit.into());
self.push(d)?;
}
DataType::List(_) => {
let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
let v: datatypes::value::Value =
v.try_into().context(ConvertScalarValueSnafu)?;
self.push(v)?;
}
DataType::Struct(_) => {
let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
let v: datatypes::value::Value =
v.try_into().context(ConvertScalarValueSnafu)?;
self.push(v)?;
}
DataType::Decimal128(precision, scale) => {
let array = array.as_primitive::<Decimal128Type>();
let v = Decimal128::new(array.value(i), *precision, *scale);
self.push(v)?;
}
_ => {
return NotSupportedSnafu {
feat: format!("convert {} to influxdb value", array.data_type()),
}
.fail();
}
}
}
rows.push(self.finish())
}
Ok(())
}
}
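Schematically, the call site in the hunk above drives the writer once per record batch:

    let mut writer = RowWriter::new(epoch, recordbatch.num_columns());
    writer.write(recordbatch, &mut rows)?; // appends one Vec<Value> per row

`finish()` takes the buffered row out of `current`, so each row starts from a fresh buffer with a capacity hint of `columns`.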
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
pub struct InfluxdbOutput {
pub statement_id: u32,

View File

@@ -18,6 +18,8 @@ use std::sync::atomic::{AtomicU64, Ordering};
use client::DEFAULT_CATALOG_NAME;
use common_query::{Output, OutputData};
use datatypes::arrow::array::AsArray;
use datatypes::arrow::datatypes::TimestampMillisecondType;
use datatypes::vectors::{TimestampMillisecondVector, VectorRef};
use frontend::instance::Instance;
use itertools::Itertools;
@@ -224,9 +226,9 @@ async fn ensure_data_exists(tables: &[Table], instance: &Arc<Instance>) {
let queried = record_batches
.into_iter()
.flat_map(|rb| {
rb.rows()
.map(|row| row[0].as_timestamp().unwrap().value() as u64)
.collect::<Vec<_>>()
let array = rb.column(0).to_arrow_array();
let array = array.as_primitive::<TimestampMillisecondType>();
array.iter().flatten().map(|x| x as u64).collect::<Vec<_>>()
})
.collect::<Vec<_>>();
let inserted = table