diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 0646c3e2a3..239f3fe3f9 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -435,10 +435,10 @@ impl Database { .context(ExternalSnafu)?; match flight_message { FlightMessage::RecordBatch(arrow_batch) => { - yield RecordBatch::try_from_df_record_batch( + yield Ok(RecordBatch::from_df_record_batch( schema_cloned.clone(), arrow_batch, - ) + )) } FlightMessage::Metrics(_) => {} FlightMessage::AffectedRows(_) | FlightMessage::Schema(_) => { diff --git a/src/client/src/region.rs b/src/client/src/region.rs index 6e5a286083..3e80b83cec 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -182,10 +182,8 @@ impl RegionRequester { match flight_message { FlightMessage::RecordBatch(record_batch) => { - let result_to_yield = RecordBatch::try_from_df_record_batch( - schema_cloned.clone(), - record_batch, - ); + let result_to_yield = + RecordBatch::from_df_record_batch(schema_cloned.clone(), record_batch); // get the next message from the stream. normally it should be a metrics message. if let Some(next_flight_message_result) = flight_message_stream.next().await @@ -219,7 +217,7 @@ impl RegionRequester { stream_ended = true; } - yield result_to_yield; + yield Ok(result_to_yield); } FlightMessage::Metrics(s) => { // just a branch in case of some metrics message comes after other things. diff --git a/src/common/query/src/error.rs b/src/common/query/src/error.rs index 618795bb4a..e70b9f4833 100644 --- a/src/common/query/src/error.rs +++ b/src/common/query/src/error.rs @@ -52,9 +52,6 @@ pub enum Error { data_type: ArrowDatatype, }, - #[snafu(display("Failed to downcast vector: {}", err_msg))] - DowncastVector { err_msg: String }, - #[snafu(display("Invalid input type: {}", err_msg))] InvalidInputType { #[snafu(implicit)] @@ -209,8 +206,7 @@ pub type Result = std::result::Result; impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { - Error::DowncastVector { .. } - | Error::InvalidInputState { .. } + Error::InvalidInputState { .. } | Error::ToScalarValue { .. } | Error::GetScalarVector { .. } | Error::ArrowCompute { .. } diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs index fdec79fdef..7e504559b6 100644 --- a/src/common/recordbatch/src/adapter.rs +++ b/src/common/recordbatch/src/adapter.rs @@ -314,10 +314,10 @@ impl Stream for RecordBatchStreamAdapter { metric_collector.record_batch_metrics, ); } - Poll::Ready(Some(RecordBatch::try_from_df_record_batch( + Poll::Ready(Some(Ok(RecordBatch::from_df_record_batch( self.schema(), df_record_batch, - ))) + )))) } Poll::Ready(None) => { if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) = diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs index 2584b41b25..00d4291ead 100644 --- a/src/common/recordbatch/src/error.rs +++ b/src/common/recordbatch/src/error.rs @@ -133,18 +133,6 @@ pub enum Error { source: datatypes::error::Error, }, - #[snafu(display( - "Failed to downcast vector of type '{:?}' to type '{:?}'", - from_type, - to_type - ))] - DowncastVector { - from_type: ConcreteDataType, - to_type: ConcreteDataType, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Error occurs when performing arrow computation"))] ArrowCompute { #[snafu(source)] @@ -217,8 +205,6 @@ impl ErrorExt for Error { | Error::PhysicalExpr { .. } | Error::RecordBatchSliceIndexOverflow { .. 
} => StatusCode::Internal, - Error::DowncastVector { .. } => StatusCode::Unexpected, - Error::PollStream { .. } => StatusCode::EngineExecuteQuery, Error::ArrowCompute { .. } => StatusCode::IllegalState, diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index 1f5f28e87f..4804e9040e 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -30,19 +30,20 @@ use adapter::RecordBatchMetrics; use arc_swap::ArcSwapOption; use common_base::readable_size::ReadableSize; pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; +use datatypes::arrow::array::{ArrayRef, AsArray, StringBuilder}; use datatypes::arrow::compute::SortOptions; pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch; use datatypes::arrow::util::pretty; use datatypes::prelude::{ConcreteDataType, VectorRef}; -use datatypes::scalars::{ScalarVector, ScalarVectorBuilder}; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::types::{JsonFormat, jsonb_to_string}; -use datatypes::vectors::{BinaryVector, StringVectorBuilder}; use error::Result; use futures::task::{Context, Poll}; use futures::{Stream, TryStreamExt}; pub use recordbatch::RecordBatch; -use snafu::{OptionExt, ResultExt, ensure}; +use snafu::{ResultExt, ensure}; + +use crate::error::NewDfRecordBatchSnafu; pub trait RecordBatchStream: Stream> { fn name(&self) -> &str { @@ -92,20 +93,14 @@ pub fn map_json_type_to_string( mapped_schema: &SchemaRef, ) -> Result { let mut vectors = Vec::with_capacity(original_schema.column_schemas().len()); - for (vector, schema) in batch.columns.iter().zip(original_schema.column_schemas()) { + for (vector, schema) in batch.columns().iter().zip(original_schema.column_schemas()) { if let ConcreteDataType::Json(j) = &schema.data_type { if matches!(&j.format, JsonFormat::Jsonb) { - let mut string_vector_builder = StringVectorBuilder::with_capacity(vector.len()); - let binary_vector = vector - .as_any() - .downcast_ref::() - .with_context(|| error::DowncastVectorSnafu { - from_type: schema.data_type.clone(), - to_type: ConcreteDataType::binary_datatype(), - })?; - for value in binary_vector.iter_data() { + let mut string_vector_builder = StringBuilder::new(); + let binary_vector = vector.as_binary::(); + for value in binary_vector.iter() { let Some(value) = value else { - string_vector_builder.push(None); + string_vector_builder.append_null(); continue; }; let string_value = @@ -113,11 +108,11 @@ pub fn map_json_type_to_string( from_type: schema.data_type.clone(), to_type: ConcreteDataType::string_datatype(), })?; - string_vector_builder.push(Some(string_value.as_str())); + string_vector_builder.append_value(string_value); } let string_vector = string_vector_builder.finish(); - vectors.push(Arc::new(string_vector) as VectorRef); + vectors.push(Arc::new(string_vector) as ArrayRef); } else { vectors.push(vector.clone()); } @@ -126,7 +121,15 @@ pub fn map_json_type_to_string( } } - RecordBatch::new(mapped_schema.clone(), vectors) + let record_batch = datatypes::arrow::record_batch::RecordBatch::try_new( + mapped_schema.arrow_schema().clone(), + vectors, + ) + .context(NewDfRecordBatchSnafu)?; + Ok(RecordBatch::from_df_record_batch( + mapped_schema.clone(), + record_batch, + )) } /// Maps the json type to string in the schema. 
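The lib.rs hunk above swaps GreptimeDB's vector builders (BinaryVector downcast plus StringVectorBuilder) for Arrow-native accessors: as_binary on the column and a StringBuilder for the output. A minimal sketch of that pattern on plain arrow-rs types, with jsonb_to_string stood in by a lossy UTF-8 decode (the real conversion lives in datatypes::types and is not reproduced here):

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, StringBuilder};

// Stand-in for `datatypes::types::jsonb_to_string`; a lossy UTF-8 decode is enough for the sketch.
fn jsonb_to_string(bytes: &[u8]) -> String {
    String::from_utf8_lossy(bytes).into_owned()
}

// Rewrite a binary (JSONB) column as a UTF-8 column, preserving nulls.
fn binary_to_string_column(column: &ArrayRef) -> ArrayRef {
    let binary = column.as_binary::<i32>();
    let mut builder = StringBuilder::new();
    for value in binary.iter() {
        match value {
            None => builder.append_null(),
            Some(bytes) => builder.append_value(jsonb_to_string(bytes)),
        }
    }
    Arc::new(builder.finish()) as ArrayRef
}

fn main() {
    let column: ArrayRef = Arc::new(BinaryArray::from_opt_vec(vec![Some(b"{}".as_ref()), None]));
    let strings = binary_to_string_column(&column);
    assert_eq!(strings.len(), 2);
    assert!(strings.is_null(1));
}

The converted columns can then be assembled into an Arrow RecordBatch and wrapped with the new non-fallible from_df_record_batch constructor, as the hunk does.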
@@ -755,11 +758,7 @@ impl Stream for MemoryTrackedStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match Pin::new(&mut self.inner).poll_next(cx) { Poll::Ready(Some(Ok(batch))) => { - let additional = batch - .columns() - .iter() - .map(|c| c.memory_size()) - .sum::(); + let additional = batch.buffer_memory_size(); if let Err(e) = self.permit.track(additional, self.total_tracked) { return Poll::Ready(Some(Err(e))); diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index 727950495a..f04fd49dd2 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -20,7 +20,7 @@ use datafusion::arrow::util::pretty::pretty_format_batches; use datafusion_common::arrow::array::ArrayRef; use datafusion_common::arrow::compute; use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef}; -use datatypes::arrow::array::RecordBatchOptions; +use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions}; use datatypes::prelude::DataType; use datatypes::schema::SchemaRef; use datatypes::vectors::{Helper, VectorRef}; @@ -30,15 +30,14 @@ use snafu::{OptionExt, ResultExt, ensure}; use crate::DfRecordBatch; use crate::error::{ - self, ArrowComputeSnafu, CastVectorSnafu, ColumnNotExistsSnafu, DataTypesSnafu, - ProjectArrowRecordBatchSnafu, Result, + self, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu, ProjectArrowRecordBatchSnafu, + Result, }; /// A two-dimensional batch of column-oriented data with a defined schema. #[derive(Clone, Debug, PartialEq)] pub struct RecordBatch { pub schema: SchemaRef, - pub columns: Vec, df_record_batch: DfRecordBatch, } @@ -65,7 +64,6 @@ impl RecordBatch { Ok(RecordBatch { schema, - columns, df_record_batch, }) } @@ -91,14 +89,8 @@ impl RecordBatch { /// Create an empty [`RecordBatch`] from `schema`. pub fn new_empty(schema: SchemaRef) -> RecordBatch { let df_record_batch = DfRecordBatch::new_empty(schema.arrow_schema().clone()); - let columns = schema - .column_schemas() - .iter() - .map(|col| col.data_type.create_mutable_vector(0).to_vector()) - .collect(); RecordBatch { schema, - columns, df_record_batch, } } @@ -113,17 +105,12 @@ impl RecordBatch { .context(error::NewDfRecordBatchSnafu)?; Ok(RecordBatch { schema, - columns: vec![], df_record_batch, }) } pub fn try_project(&self, indices: &[usize]) -> Result { let schema = Arc::new(self.schema.try_project(indices).context(DataTypesSnafu)?); - let mut columns = Vec::with_capacity(indices.len()); - for index in indices { - columns.push(self.columns[*index].clone()); - } let df_record_batch = self.df_record_batch.project(indices).with_context(|_| { ProjectArrowRecordBatchSnafu { schema: self.schema.clone(), @@ -133,7 +120,6 @@ impl RecordBatch { Ok(Self { schema, - columns, df_record_batch, }) } @@ -141,21 +127,11 @@ impl RecordBatch { /// Create a new [`RecordBatch`] from `schema` and `df_record_batch`. /// /// This method doesn't check the schema. 
- pub fn try_from_df_record_batch( - schema: SchemaRef, - df_record_batch: DfRecordBatch, - ) -> Result { - let columns = df_record_batch - .columns() - .iter() - .map(|c| Helper::try_into_vector(c.clone()).context(error::DataTypesSnafu)) - .collect::>>()?; - - Ok(RecordBatch { + pub fn from_df_record_batch(schema: SchemaRef, df_record_batch: DfRecordBatch) -> RecordBatch { + RecordBatch { schema, - columns, df_record_batch, - }) + } } #[inline] @@ -169,23 +145,22 @@ impl RecordBatch { } #[inline] - pub fn columns(&self) -> &[VectorRef] { - &self.columns + pub fn columns(&self) -> &[ArrayRef] { + self.df_record_batch.columns() } #[inline] - pub fn column(&self, idx: usize) -> &VectorRef { - &self.columns[idx] + pub fn column(&self, idx: usize) -> &ArrayRef { + self.df_record_batch.column(idx) } - pub fn column_by_name(&self, name: &str) -> Option<&VectorRef> { - let idx = self.schema.column_index_by_name(name)?; - Some(&self.columns[idx]) + pub fn column_by_name(&self, name: &str) -> Option<&ArrayRef> { + self.df_record_batch.column_by_name(name) } #[inline] pub fn num_columns(&self) -> usize { - self.columns.len() + self.df_record_batch.num_columns() } #[inline] @@ -201,9 +176,14 @@ impl RecordBatch { let mut vectors = HashMap::with_capacity(self.num_columns()); // column schemas in recordbatch must match its vectors, otherwise it's corrupted - for (vector_schema, vector) in self.schema.column_schemas().iter().zip(self.columns.iter()) + for (field, array) in self + .df_record_batch + .schema() + .fields() + .iter() + .zip(self.df_record_batch.columns().iter()) { - let column_name = &vector_schema.name; + let column_name = field.name(); let column_schema = table_schema .column_schema_by_name(column_name) @@ -211,15 +191,12 @@ impl RecordBatch { table_name, column_name, })?; - let vector = if vector_schema.data_type != column_schema.data_type { - vector - .cast(&column_schema.data_type) - .with_context(|_| CastVectorSnafu { - from_type: vector.data_type(), - to_type: column_schema.data_type.clone(), - })? + let vector = if field.data_type() != &column_schema.data_type.as_arrow_type() { + let array = compute::cast(array, &column_schema.data_type.as_arrow_type()) + .context(ArrowComputeSnafu)?; + Helper::try_into_vector(array).context(DataTypesSnafu)? } else { - vector.clone() + Helper::try_into_vector(array).context(DataTypesSnafu)? }; let _ = vectors.insert(column_name.clone(), vector); @@ -244,8 +221,69 @@ impl RecordBatch { visit_index: offset + len } ); - let columns = self.columns.iter().map(|vector| vector.slice(offset, len)); - RecordBatch::new(self.schema.clone(), columns) + let sliced = self.df_record_batch.slice(offset, len); + Ok(RecordBatch::from_df_record_batch( + self.schema.clone(), + sliced, + )) + } + + /// Returns the total number of bytes of memory pointed to by the arrays in this `RecordBatch`. + /// + /// The buffers store bytes in the Arrow memory format, and include the data as well as the validity map. + /// Note that this does not always correspond to the exact memory usage of an array, + /// since multiple arrays can share the same buffers or slices thereof. + pub fn buffer_memory_size(&self) -> usize { + self.df_record_batch + .columns() + .iter() + .map(|array| array.get_buffer_memory_size()) + .sum() + } + + /// Iterate the values as strings in the column at index `i`. + /// + /// Note that if the underlying array is not a valid GreptimeDB vector, an empty iterator is + /// returned. + /// + /// # Panics + /// if index `i` is out of bound. 
+ pub fn iter_column_as_string(&self, i: usize) -> Box> + '_> { + macro_rules! iter { + ($column: ident) => { + Box::new( + (0..$column.len()) + .map(|i| $column.is_valid(i).then(|| $column.value(i).to_string())), + ) + }; + } + + let column = self.df_record_batch.column(i); + match column.data_type() { + ArrowDataType::Utf8 => { + let column = column.as_string::(); + let iter = iter!(column); + iter as _ + } + ArrowDataType::LargeUtf8 => { + let column = column.as_string::(); + iter!(column) + } + ArrowDataType::Utf8View => { + let column = column.as_string_view(); + iter!(column) + } + _ => { + if let Ok(column) = Helper::try_into_vector(column) { + Box::new( + (0..column.len()) + .map(move |i| (!column.is_null(i)).then(|| column.get(i).to_string())), + ) + } else { + Box::new(std::iter::empty()) + } + } + } } } @@ -259,8 +297,9 @@ impl Serialize for RecordBatch { let mut s = serializer.serialize_struct("record", 2)?; s.serialize_field("schema", &**self.schema.arrow_schema())?; - let vec = self - .columns + let columns = self.df_record_batch.columns(); + let columns = Helper::try_into_vectors(columns).map_err(Error::custom)?; + let vec = columns .iter() .map(|c| c.serialize_to_json()) .collect::, _>>() @@ -278,27 +317,14 @@ pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Resul return Ok(RecordBatch::new_empty(schema)); } - let n_rows = batches.iter().map(|b| b.num_rows()).sum(); - let n_columns = schema.num_columns(); - // Collect arrays from each batch - let mut merged_columns = Vec::with_capacity(n_columns); - - for col_idx in 0..n_columns { - let mut acc = schema.column_schemas()[col_idx] - .data_type - .create_mutable_vector(n_rows); - - for batch in batches { - let column = batch.column(col_idx); - acc.extend_slice_of(column.as_ref(), 0, column.len()) - .context(error::DataTypesSnafu)?; - } - - merged_columns.push(acc.to_vector()); - } + let record_batch = compute::concat_batches( + schema.arrow_schema(), + batches.iter().map(|x| x.df_record_batch()), + ) + .context(ArrowComputeSnafu)?; // Create a new RecordBatch with merged columns - RecordBatch::new(schema, merged_columns) + Ok(RecordBatch::from_df_record_batch(schema, record_batch)) } #[cfg(test)] @@ -326,21 +352,21 @@ mod tests { let c2 = Arc::new(UInt32Vector::from_slice([4, 5, 6])); let columns: Vec = vec![c1, c2]; + let expected = vec![ + Arc::new(UInt32Array::from_iter_values([1, 2, 3])) as ArrayRef, + Arc::new(UInt32Array::from_iter_values([4, 5, 6])), + ]; + let batch = RecordBatch::new(schema.clone(), columns.clone()).unwrap(); assert_eq!(3, batch.num_rows()); - assert_eq!(&columns, batch.columns()); - for (i, expect) in columns.iter().enumerate().take(batch.num_columns()) { - let column = batch.column(i); - assert_eq!(expect, column); - } + assert_eq!(expected, batch.df_record_batch().columns()); assert_eq!(schema, batch.schema); - assert_eq!(columns[0], *batch.column_by_name("c1").unwrap()); - assert_eq!(columns[1], *batch.column_by_name("c2").unwrap()); + assert_eq!(&expected[0], batch.column_by_name("c1").unwrap()); + assert_eq!(&expected[1], batch.column_by_name("c2").unwrap()); assert!(batch.column_by_name("c3").is_none()); - let converted = - RecordBatch::try_from_df_record_batch(schema, batch.df_record_batch().clone()).unwrap(); + let converted = RecordBatch::from_df_record_batch(schema, batch.df_record_batch().clone()); assert_eq!(batch, converted); assert_eq!(*batch.df_record_batch(), converted.into_df_record_batch()); } @@ -385,12 +411,12 @@ mod tests { let recordbatch = 
recordbatch.slice(1, 2).expect("recordbatch slice"); let expected = &UInt32Array::from_iter_values([2u32, 3]); - let array = recordbatch.column(0).to_arrow_array(); + let array = recordbatch.column(0); let actual = array.as_primitive::(); assert_eq!(expected, actual); let expected = &StringArray::from(vec!["hello", "greptime"]); - let array = recordbatch.column(1).to_arrow_array(); + let array = recordbatch.column(1); let actual = array.as_string::(); assert_eq!(expected, actual); diff --git a/src/datatypes/src/arrow_array.rs b/src/datatypes/src/arrow_array.rs index 97aa299fad..ac5e6444af 100644 --- a/src/datatypes/src/arrow_array.rs +++ b/src/datatypes/src/arrow_array.rs @@ -12,9 +12,117 @@ // See the License for the specific language governing permissions and // limitations under the License. +use arrow::array::{ArrayRef, AsArray}; +use arrow::datatypes::{ + DataType, DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType, + DurationSecondType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType, + Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, + TimestampNanosecondType, TimestampSecondType, +}; +use common_time::time::Time; +use common_time::{Duration, Timestamp}; + pub type BinaryArray = arrow::array::BinaryArray; pub type MutableBinaryArray = arrow::array::BinaryBuilder; pub type StringArray = arrow::array::StringArray; pub type MutableStringArray = arrow::array::StringBuilder; pub type LargeStringArray = arrow::array::LargeStringArray; pub type MutableLargeStringArray = arrow::array::LargeStringBuilder; + +/// Get the [Timestamp] value at index `i` of the timestamp array. +/// +/// Note: This method does not check for nulls and the value is arbitrary +/// if [`is_null`](arrow::array::Array::is_null) returns true for the index. +/// +/// # Panics +/// 1. if index `i` is out of bounds; +/// 2. or the array is not timestamp type. +pub fn timestamp_array_value(array: &ArrayRef, i: usize) -> Timestamp { + let DataType::Timestamp(time_unit, _) = &array.data_type() else { + unreachable!() + }; + let v = match time_unit { + TimeUnit::Second => { + let array = array.as_primitive::(); + array.value(i) + } + TimeUnit::Millisecond => { + let array = array.as_primitive::(); + array.value(i) + } + TimeUnit::Microsecond => { + let array = array.as_primitive::(); + array.value(i) + } + TimeUnit::Nanosecond => { + let array = array.as_primitive::(); + array.value(i) + } + }; + Timestamp::new(v, time_unit.into()) +} + +/// Get the [Time] value at index `i` of the time array. +/// +/// Note: This method does not check for nulls and the value is arbitrary +/// if [`is_null`](arrow::array::Array::is_null) returns true for the index. +/// +/// # Panics +/// 1. if index `i` is out of bounds; +/// 2. or the array is not `Time32` or `Time64` type. 
+pub fn time_array_value(array: &ArrayRef, i: usize) -> Time { + match array.data_type() { + DataType::Time32(time_unit) | DataType::Time64(time_unit) => match time_unit { + TimeUnit::Second => { + let array = array.as_primitive::(); + Time::new_second(array.value(i) as i64) + } + TimeUnit::Millisecond => { + let array = array.as_primitive::(); + Time::new_millisecond(array.value(i) as i64) + } + TimeUnit::Microsecond => { + let array = array.as_primitive::(); + Time::new_microsecond(array.value(i)) + } + TimeUnit::Nanosecond => { + let array = array.as_primitive::(); + Time::new_nanosecond(array.value(i)) + } + }, + _ => unreachable!(), + } +} + +/// Get the [Duration] value at index `i` of the duration array. +/// +/// Note: This method does not check for nulls and the value is arbitrary +/// if [`is_null`](arrow::array::Array::is_null) returns true for the index. +/// +/// # Panics +/// 1. if index `i` is out of bounds; +/// 2. or the array is not duration type. +pub fn duration_array_value(array: &ArrayRef, i: usize) -> Duration { + let DataType::Duration(time_unit) = array.data_type() else { + unreachable!(); + }; + let v = match time_unit { + TimeUnit::Second => { + let array = array.as_primitive::(); + array.value(i) + } + TimeUnit::Millisecond => { + let array = array.as_primitive::(); + array.value(i) + } + TimeUnit::Microsecond => { + let array = array.as_primitive::(); + array.value(i) + } + TimeUnit::Nanosecond => { + let array = array.as_primitive::(); + array.value(i) + } + }; + Duration::new(v, time_unit.into()) +} diff --git a/src/file-engine/src/query.rs b/src/file-engine/src/query.rs index b56777d43c..93ee8eed9d 100644 --- a/src/file-engine/src/query.rs +++ b/src/file-engine/src/query.rs @@ -22,13 +22,17 @@ use std::task::{Context, Poll}; use common_datasource::object_store::build_backend; use common_error::ext::BoxedError; use common_recordbatch::adapter::RecordBatchMetrics; -use common_recordbatch::error::{CastVectorSnafu, ExternalSnafu, Result as RecordBatchResult}; +use common_recordbatch::error::{ + CastVectorSnafu, DataTypesSnafu, ExternalSnafu, Result as RecordBatchResult, +}; use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream}; use datafusion::logical_expr::utils as df_logical_expr_utils; use datafusion_expr::expr::Expr; +use datatypes::arrow::array::ArrayRef; +use datatypes::data_type::DataType; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; -use datatypes::vectors::VectorRef; +use datatypes::vectors::{Helper, VectorRef}; use futures::Stream; use snafu::{OptionExt, ResultExt, ensure}; use store_api::storage::ScanRequest; @@ -197,7 +201,7 @@ impl FileToScanRegionStream { .all(|scan_column_schema| { file_record_batch .column_by_name(&scan_column_schema.name) - .map(|rb| rb.data_type() == scan_column_schema.data_type) + .map(|rb| rb.data_type() == &scan_column_schema.data_type.as_arrow_type()) .unwrap_or_default() }) } @@ -231,9 +235,10 @@ impl FileToScanRegionStream { } fn cast_column_type( - source_column: &VectorRef, + source_column: &ArrayRef, target_data_type: &ConcreteDataType, ) -> RecordBatchResult { + let source_column = Helper::try_into_vector(source_column).context(DataTypesSnafu)?; if &source_column.data_type() == target_data_type { Ok(source_column.clone()) } else { diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index 2cfad8671e..bcddcbb891 100644 --- a/src/flow/src/adapter/node_context.rs +++ 
b/src/flow/src/adapter/node_context.rs @@ -199,7 +199,7 @@ impl SourceSender { /// send record batch pub async fn send_record_batch(&self, batch: RecordBatch) -> Result { let row_cnt = batch.num_rows(); - let batch = Batch::from(batch); + let batch = Batch::try_from(batch)?; self.send_buf_row_cnt.fetch_add(row_cnt, Ordering::SeqCst); diff --git a/src/flow/src/expr.rs b/src/flow/src/expr.rs index c17db3bf7e..5c0359e55f 100644 --- a/src/flow/src/expr.rs +++ b/src/flow/src/expr.rs @@ -25,6 +25,7 @@ mod signature; pub(crate) mod utils; use arrow::compute::FilterBuilder; +use common_recordbatch::RecordBatch; use datatypes::prelude::{ConcreteDataType, DataType}; use datatypes::value::Value; use datatypes::vectors::{BooleanVector, Helper, VectorRef}; @@ -38,6 +39,8 @@ pub(crate) use relation::{Accum, Accumulator, AggregateExpr, AggregateFunc}; pub(crate) use scalar::{ScalarExpr, TypedExpr}; use snafu::{ResultExt, ensure}; +use crate::Error; +use crate::error::DatatypesSnafu; use crate::expr::error::{ArrowSnafu, DataTypeSnafu}; use crate::repr::Diff; @@ -55,13 +58,19 @@ pub struct Batch { diffs: Option, } -impl From for Batch { - fn from(value: common_recordbatch::RecordBatch) -> Self { - Self { +impl TryFrom for Batch { + type Error = Error; + + fn try_from(value: RecordBatch) -> Result { + let columns = value.columns(); + let batch = Helper::try_into_vectors(columns).context(DatatypesSnafu { + extra: "failed to convert Arrow array to vector when building Flow batch", + })?; + Ok(Self { row_count: value.num_rows(), - batch: value.columns, + batch, diffs: None, - } + }) } } diff --git a/src/frontend/src/instance/jaeger.rs b/src/frontend/src/instance/jaeger.rs index 4658040d50..bb1d4cf5a8 100644 --- a/src/frontend/src/instance/jaeger.rs +++ b/src/frontend/src/instance/jaeger.rs @@ -34,7 +34,6 @@ use datafusion::execution::SessionStateBuilder; use datafusion::execution::context::SessionContext; use datafusion_expr::select_expr::SelectExpr; use datafusion_expr::{Expr, SortExpr, col, lit, lit_timestamp_nano, wildcard}; -use datatypes::value::ValueRef; use query::QueryEngineRef; use serde_json::Value as JsonValue; use servers::error::{ @@ -595,13 +594,10 @@ async fn trace_ids_from_output(output: Output) -> ServerResult> { { let mut trace_ids = vec![]; for recordbatch in recordbatches { - for col in recordbatch.columns().iter() { - for row_idx in 0..recordbatch.num_rows() { - if let ValueRef::String(value) = col.get_ref(row_idx) { - trace_ids.push(value.to_string()); - } - } - } + recordbatch + .iter_column_as_string(0) + .flatten() + .for_each(|x| trace_ids.push(x)); } return Ok(trace_ids); diff --git a/src/frontend/src/instance/promql.rs b/src/frontend/src/instance/promql.rs index 0d754167c7..4236474cf7 100644 --- a/src/frontend/src/instance/promql.rs +++ b/src/frontend/src/instance/promql.rs @@ -20,7 +20,6 @@ use common_catalog::consts::INFORMATION_SCHEMA_NAME; use common_catalog::format_full_table_name; use common_recordbatch::util; use common_telemetry::tracing; -use datatypes::prelude::Value; use promql_parser::label::{MatchOp, Matcher, Matchers}; use query::promql; use query::promql::planner::PromPlanner; @@ -90,15 +89,10 @@ impl Instance { for batch in batches { // Only one column the results, ensured by `prometheus::metric_name_matchers_to_plan`. 
- let names = batch.column(0); - - for i in 0..names.len() { - let Value::String(name) = names.get(i) else { - unreachable!(); - }; - - results.push(name.into_string()); - } + batch + .iter_column_as_string(0) + .flatten() + .for_each(|x| results.push(x)) } Ok(results) @@ -173,11 +167,10 @@ impl Instance { let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum()); for batch in batches { // Only one column in results, ensured by `prometheus::label_values_matchers_to_plan`. - let names = batch.column(0); - - for i in 0..names.len() { - results.push(names.get(i).to_string()); - } + batch + .iter_column_as_string(0) + .flatten() + .for_each(|x| results.push(x)) } Ok(results) diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 677cf58899..c34b44e4a7 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -317,45 +317,20 @@ pub fn decode_batch_stream( /// Decode a record batch to a list of key and value. fn decode_record_batch_to_key_and_value(batch: RecordBatch) -> Vec<(String, String)> { - let key_col = batch.column(0); - let val_col = batch.column(1); - - (0..batch.num_rows()) - .flat_map(move |row_index| { - let key = key_col - .get_ref(row_index) - .try_into_string() - .unwrap() - .map(|s| s.to_string()); - - key.map(|k| { - ( - k, - val_col - .get_ref(row_index) - .try_into_string() - .unwrap() - .map(|s| s.to_string()) - .unwrap_or_default(), - ) - }) + let keys = batch.iter_column_as_string(0); + let values = batch.iter_column_as_string(1); + keys.zip(values) + .filter_map(|(k, v)| match (k, v) { + (Some(k), Some(v)) => Some((k, v)), + (Some(k), None) => Some((k, "".to_string())), + (None, _) => None, }) - .collect() + .collect::>() } /// Decode a record batch to a list of key. fn decode_record_batch_to_key(batch: RecordBatch) -> Vec { - let key_col = batch.column(0); - - (0..batch.num_rows()) - .flat_map(move |row_index| { - key_col - .get_ref(row_index) - .try_into_string() - .unwrap() - .map(|s| s.to_string()) - }) - .collect() + batch.iter_column_as_string(0).flatten().collect::>() } // simulate to `KvBackend` @@ -590,6 +565,8 @@ impl MetadataRegion { /// Retrieves the value associated with the given key in the specified region. /// Returns `Ok(None)` if the key is not found. 
pub async fn get(&self, region_id: RegionId, key: &str) -> Result> { + use datatypes::arrow::array::{Array, AsArray}; + let filter_expr = datafusion::prelude::col(METADATA_SCHEMA_KEY_COLUMN_NAME) .eq(datafusion::prelude::lit(key)); @@ -611,12 +588,9 @@ impl MetadataRegion { return Ok(None); }; - let val = first_batch - .column(0) - .get_ref(0) - .try_into_string() - .unwrap() - .map(|s| s.to_string()); + let column = first_batch.column(0); + let column = column.as_string::(); + let val = column.is_valid(0).then(|| column.value(0).to_string()); Ok(val) } diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index 0e91c542a0..09fc4e2935 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -19,8 +19,8 @@ use std::time::Duration; use api::v1::{ColumnSchema, Rows}; use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; -use datatypes::prelude::ScalarVector; -use datatypes::vectors::TimestampMillisecondVector; +use datatypes::arrow::array::AsArray; +use datatypes::arrow::datatypes::TimestampMillisecondType; use store_api::region_engine::{RegionEngine, RegionRole}; use store_api::region_request::AlterKind::SetRegionOptions; use store_api::region_request::{ @@ -125,10 +125,8 @@ async fn collect_stream_ts(stream: SendableRecordBatchStream) -> Vec { let ts_col = batch .column_by_name("ts") .unwrap() - .as_any() - .downcast_ref::() - .unwrap(); - res.extend(ts_col.iter_data().map(|t| t.unwrap().0.value())); + .as_primitive::(); + res.extend((0..ts_col.len()).map(|i| ts_col.value(i))); } res } diff --git a/src/mito2/src/read/stream.rs b/src/mito2/src/read/stream.rs index b51f9f5fcc..e621c77e36 100644 --- a/src/mito2/src/read/stream.rs +++ b/src/mito2/src/read/stream.rs @@ -109,7 +109,10 @@ impl ConvertBatchStream { compute::concat_batches(output_schema.arrow_schema(), &self.buffer) .context(ArrowComputeSnafu)?; - RecordBatch::try_from_df_record_batch(output_schema, record_batch) + Ok(RecordBatch::from_df_record_batch( + output_schema, + record_batch, + )) } ScanBatch::RecordBatch(df_record_batch) => { // Safety: Only flat format returns this batch. 
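Both merge_record_batches and the mito2 conversion stream in the hunks above now delegate row merging to arrow's concat_batches and wrap the result with the infallible from_df_record_batch, instead of growing mutable vectors column by column. A minimal sketch of the concat step against plain arrow-rs, with a made-up single-column schema and values for illustration:

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::compute::concat_batches;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let batch = |values: Vec<i64>| {
        RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int64Array::from(values)) as ArrayRef],
        )
    };
    let batches = vec![batch(vec![1, 2])?, batch(vec![3])?];

    // Concatenate at the Arrow layer; the wrapping RecordBatch type can then be
    // constructed without a fallible per-column vector conversion.
    let merged = concat_batches(&schema, batches.iter())?;
    assert_eq!(merged.num_rows(), 3);
    Ok(())
}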
diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs index ad9a8c4ac5..d3e63e634e 100644 --- a/src/pipeline/src/manager/table.rs +++ b/src/pipeline/src/manager/table.rs @@ -19,6 +19,8 @@ use api::v1::{ ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType, }; +use arrow::array::{Array, AsArray}; +use arrow::datatypes::TimestampNanosecondType; use common_query::OutputData; use common_recordbatch::util as record_util; use common_telemetry::{debug, info}; @@ -27,9 +29,7 @@ use datafusion::datasource::DefaultTableSource; use datafusion::logical_expr::col; use datafusion_common::TableReference; use datafusion_expr::{DmlStatement, LogicalPlan}; -use datatypes::prelude::ScalarVector; use datatypes::timestamp::TimestampNanosecond; -use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector}; use itertools::Itertools; use operator::insert::InserterRef; use operator::statement::StatementExecutorRef; @@ -527,8 +527,7 @@ impl PipelineTable { for r in records { let pipeline_content_column = r.column(0); let pipeline_content = pipeline_content_column - .as_any() - .downcast_ref::() + .as_string_opt::() .with_context(|| CastTypeSnafu { msg: format!( "can't downcast {:?} array into string vector", @@ -537,20 +536,19 @@ impl PipelineTable { })?; let pipeline_schema_column = r.column(1); - let pipeline_schema = pipeline_schema_column - .as_any() - .downcast_ref::() - .with_context(|| CastTypeSnafu { - msg: format!( - "can't downcast {:?} array into string vector", - pipeline_schema_column.data_type() - ), - })?; + let pipeline_schema = + pipeline_schema_column + .as_string_opt::() + .with_context(|| CastTypeSnafu { + msg: format!( + "expecting pipeline schema column of type string, actual: {}", + pipeline_schema_column.data_type() + ), + })?; let pipeline_created_at_column = r.column(2); let pipeline_created_at = pipeline_created_at_column - .as_any() - .downcast_ref::() + .as_primitive_opt::() .with_context(|| CastTypeSnafu { msg: format!( "can't downcast {:?} array into scalar vector", @@ -572,9 +570,9 @@ impl PipelineTable { let len = pipeline_content.len(); for i in 0..len { re.push(( - pipeline_content.get_data(i).unwrap().to_string(), - pipeline_schema.get_data(i).unwrap().to_string(), - pipeline_created_at.get_data(i).unwrap(), + pipeline_content.value(i).to_string(), + pipeline_schema.value(i).to_string(), + TimestampNanosecond::new(pipeline_created_at.value(i)), )); } } diff --git a/src/promql/src/extension_plan/histogram_fold.rs b/src/promql/src/extension_plan/histogram_fold.rs index e80d4a7676..369754899f 100644 --- a/src/promql/src/extension_plan/histogram_fold.rs +++ b/src/promql/src/extension_plan/histogram_fold.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use std::task::Poll; use std::time::Instant; -use common_recordbatch::RecordBatch as GtRecordBatch; use common_telemetry::warn; use datafusion::arrow::array::AsArray; use datafusion::arrow::compute::{self, SortOptions, concat_batches}; @@ -41,9 +40,8 @@ use datafusion::physical_plan::{ }; use datafusion::prelude::{Column, Expr}; use datatypes::prelude::{ConcreteDataType, DataType as GtDataType}; -use datatypes::schema::Schema as GtSchema; use datatypes::value::{OrderedF64, ValueRef}; -use datatypes::vectors::MutableVector; +use datatypes::vectors::{Helper, MutableVector}; use futures::{Stream, StreamExt, ready}; /// `HistogramFold` will fold the conventional (non-native) histogram ([1]) for later @@ -560,36 +558,29 @@ impl 
HistogramFoldStream { let mut remaining_rows = self.input_buffered_rows; let mut cursor = 0; - let gt_schema = GtSchema::try_from(self.input.schema()).unwrap(); - let batch = GtRecordBatch::try_from_df_record_batch(Arc::new(gt_schema), batch).unwrap(); + // TODO(LFC): Try to get rid of the Arrow array to vector conversion here. + let vectors = Helper::try_into_vectors(batch.columns()) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; while remaining_rows >= bucket_num { // "sample" normal columns for normal_index in &self.normal_indices { - let val = batch.column(*normal_index).get(cursor); + let val = vectors[*normal_index].get(cursor); self.output_buffer[*normal_index].push_value_ref(&val.as_value_ref()); } // "fold" `le` and field columns let le_array = batch.column(self.le_column_index); + let le_array = le_array.as_string::(); let field_array = batch.column(self.field_column_index); + let field_array = field_array.as_primitive::(); let mut bucket = vec![]; let mut counters = vec![]; for bias in 0..bucket_num { - let le_str_val = le_array.get(cursor + bias); - let le_str_val_ref = le_str_val.as_value_ref(); - let le_str = le_str_val_ref - .try_into_string() - .unwrap() - .expect("le column should not be nullable"); + let le_str = le_array.value(cursor + bias); let le = le_str.parse::().unwrap(); bucket.push(le); - let counter = field_array - .get(cursor + bias) - .as_value_ref() - .try_into_f64() - .unwrap() - .expect("field column should not be nullable"); + let counter = field_array.value(cursor + bias); counters.push(counter); } // ignore invalid data @@ -600,7 +591,7 @@ impl HistogramFoldStream { self.output_buffered_rows += 1; } - let remaining_input_batch = batch.into_df_record_batch().slice(cursor, remaining_rows); + let remaining_input_batch = batch.slice(cursor, remaining_rows); self.input_buffered_rows = remaining_input_batch.num_rows(); self.input_buffer.push(remaining_input_batch); diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index ab27666c01..f22a209de7 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -682,13 +682,14 @@ impl QueryExecutor for DatafusionQueryEngine { mod tests { use std::sync::Arc; + use arrow::array::{ArrayRef, UInt64Array}; use catalog::RegisterTableRequest; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID}; use common_recordbatch::util; use datafusion::prelude::{col, lit}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; - use datatypes::vectors::{Helper, UInt32Vector, UInt64Vector, VectorRef}; + use datatypes::vectors::{Helper, UInt32Vector, VectorRef}; use session::context::{QueryContext, QueryContextBuilder}; use table::table::numbers::{NUMBERS_TABLE_NAME, NumbersTable}; @@ -770,10 +771,8 @@ mod tests { assert_eq!(1, batch.num_columns()); assert_eq!(batch.column(0).len(), 1); - assert_eq!( - *batch.column(0), - Arc::new(UInt64Vector::from_slice([4950])) as VectorRef - ); + let expected = Arc::new(UInt64Array::from_iter_values([4950])) as ArrayRef; + assert_eq!(batch.column(0), &expected); } _ => unreachable!(), } diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs index 693b1aa068..f0bd9df1bc 100644 --- a/src/query/src/sql.rs +++ b/src/query/src/sql.rs @@ -1440,8 +1440,7 @@ mod test { .. 
}) => { let record = record.take().first().cloned().unwrap(); - let data = record.column(0); - Ok(data.get(0).to_string()) + Ok(record.iter_column_as_string(0).next().unwrap().unwrap()) } Ok(_) => unreachable!(), Err(e) => Err(e), diff --git a/src/query/src/tests/function.rs b/src/query/src/tests/function.rs index b383daf521..9f6ce0137e 100644 --- a/src/query/src/tests/function.rs +++ b/src/query/src/tests/function.rs @@ -18,7 +18,7 @@ use common_function::scalars::vector::impl_conv::veclit_to_binlit; use common_recordbatch::RecordBatch; use datatypes::prelude::*; use datatypes::schema::{ColumnSchema, Schema}; -use datatypes::vectors::BinaryVector; +use datatypes::vectors::{BinaryVector, Helper}; use rand::Rng; use table::test_util::MemTable; @@ -64,5 +64,6 @@ pub fn get_value_from_batches(column_name: &str, batches: Vec) -> V assert_eq!(batch.column(0).len(), 1); let v = batch.column(0); assert_eq!(1, v.len()); + let v = Helper::try_into_vector(v).unwrap(); v.get(0) } diff --git a/src/query/src/tests/query_engine_test.rs b/src/query/src/tests/query_engine_test.rs index 797d2cf26a..a96abc36d7 100644 --- a/src/query/src/tests/query_engine_test.rs +++ b/src/query/src/tests/query_engine_test.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use arrow::array::{ArrayRef, UInt32Array}; use catalog::RegisterTableRequest; use catalog::memory::MemoryCatalogManager; use common_base::Plugins; @@ -97,11 +98,10 @@ async fn test_datafusion_query_engine() -> Result<()> { let batch = &numbers[0]; assert_eq!(1, batch.num_columns()); assert_eq!(batch.column(0).len(), limit); - let expected: Vec = (0u32..limit as u32).collect(); - assert_eq!( - *batch.column(0), - Arc::new(UInt32Vector::from_slice(expected)) as VectorRef - ); + let expected = Arc::new(UInt32Array::from_iter_values( + (0u32..limit as u32).collect::>(), + )) as ArrayRef; + assert_eq!(batch.column(0), &expected); Ok(()) } diff --git a/src/query/src/tests/vec_avg_test.rs b/src/query/src/tests/vec_avg_test.rs index 46bb3528a9..672cbeaa27 100644 --- a/src/query/src/tests/vec_avg_test.rs +++ b/src/query/src/tests/vec_avg_test.rs @@ -34,7 +34,7 @@ async fn test_vec_avg_aggregator() -> Result<(), common_query::error::Error> { let sql = "SELECT vector FROM vectors"; let vectors = exec_selection(engine, sql).await; - let column = vectors[0].column(0).to_arrow_array(); + let column = vectors[0].column(0); let len = column.len(); for i in 0..column.len() { let v = ScalarValue::try_from_array(&column, i)?; diff --git a/src/query/src/tests/vec_product_test.rs b/src/query/src/tests/vec_product_test.rs index 53eb0d3272..26c275a5cc 100644 --- a/src/query/src/tests/vec_product_test.rs +++ b/src/query/src/tests/vec_product_test.rs @@ -32,7 +32,7 @@ async fn test_vec_product_aggregator() -> Result<(), common_query::error::Error> let sql = "SELECT vector FROM vectors"; let vectors = exec_selection(engine, sql).await; - let column = vectors[0].column(0).to_arrow_array(); + let column = vectors[0].column(0); for i in 0..column.len() { let v = ScalarValue::try_from_array(&column, i)?; let vector = as_veclit(&v)?; diff --git a/src/query/src/tests/vec_sum_test.rs b/src/query/src/tests/vec_sum_test.rs index 2c488c3c53..389bb0724d 100644 --- a/src/query/src/tests/vec_sum_test.rs +++ b/src/query/src/tests/vec_sum_test.rs @@ -34,7 +34,7 @@ async fn test_vec_sum_aggregator() -> Result<(), common_query::error::Error> { let sql = "SELECT vector FROM vectors"; let vectors = exec_selection(engine, sql).await; - let column = vectors[0].column(0).to_arrow_array(); + let column = 
vectors[0].column(0); for i in 0..column.len() { let v = ScalarValue::try_from_array(&column, i)?; let vector = as_veclit(&v)?; diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 281ac108d0..d460c905f3 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -32,9 +32,7 @@ use common_telemetry::{debug, error, info}; use common_time::Timestamp; use common_time::timestamp::TimeUnit; use datatypes::data_type::DataType; -use datatypes::prelude::ConcreteDataType; use datatypes::schema::SchemaRef; -use datatypes::types::jsonb_to_serde_json; use event::{LogState, LogValidatorRef}; use futures::FutureExt; use http::{HeaderValue, Method}; @@ -55,8 +53,7 @@ use self::result::table_result::TableResponse; use crate::configurator::ConfiguratorRef; use crate::elasticsearch; use crate::error::{ - AddressBindSnafu, AlreadyStartedSnafu, ConvertSqlValueSnafu, Error, InternalIoSnafu, - InvalidHeaderValueSnafu, Result, ToJsonSnafu, + AddressBindSnafu, AlreadyStartedSnafu, Error, InternalIoSnafu, InvalidHeaderValueSnafu, Result, }; use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2}; use crate::http::otlp::OtlpState; @@ -109,6 +106,7 @@ pub mod result; mod timeout; pub mod utils; +use result::HttpOutputWriter; pub(crate) use timeout::DynamicTimeoutLayer; mod hints; @@ -298,30 +296,10 @@ impl HttpRecordsOutput { } else { let num_rows = recordbatches.iter().map(|r| r.num_rows()).sum::(); let mut rows = Vec::with_capacity(num_rows); - let schemas = schema.column_schemas(); - let num_cols = schema.column_schemas().len(); - rows.resize_with(num_rows, || Vec::with_capacity(num_cols)); - let mut finished_row_cursor = 0; for recordbatch in recordbatches { - for (col_idx, col) in recordbatch.columns().iter().enumerate() { - // safety here: schemas length is equal to the number of columns in the recordbatch - let schema = &schemas[col_idx]; - for row_idx in 0..recordbatch.num_rows() { - let value = col.get(row_idx); - // TODO(sunng87): is this duplicated with `map_json_type_to_string` in recordbatch? - let value = if let ConcreteDataType::Json(_json_type) = &schema.data_type - && let datatypes::value::Value::Binary(bytes) = value - { - jsonb_to_serde_json(bytes.as_ref()).context(ConvertSqlValueSnafu)? - } else { - serde_json::Value::try_from(col.get(row_idx)).context(ToJsonSnafu)? 
- }; - - rows[row_idx + finished_row_cursor].push(value); - } - } - finished_row_cursor += recordbatch.num_rows(); + let mut writer = HttpOutputWriter::new(schema.num_columns(), None); + writer.write(recordbatch, &mut rows)?; } Ok(HttpRecordsOutput { diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index f9d1e1c21b..26a91d51fa 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -19,16 +19,13 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use std::hash::{Hash, Hasher}; use std::sync::Arc; -use arrow::array::AsArray; +use arrow::array::{Array, AsArray}; use arrow::datatypes::{ - Date32Type, Date64Type, Decimal128Type, DurationMicrosecondType, DurationMillisecondType, - DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int8Type, Int16Type, + Date32Type, Date64Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, - Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType, - TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, - TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type, + UInt8Type, UInt16Type, UInt32Type, UInt64Type, }; -use arrow_schema::{DataType, IntervalUnit, TimeUnit}; +use arrow_schema::{DataType, IntervalUnit}; use axum::extract::{Path, Query, State}; use axum::{Extension, Form}; use catalog::CatalogManagerRef; @@ -39,18 +36,13 @@ use common_error::status_code::StatusCode; use common_query::{Output, OutputData}; use common_recordbatch::{RecordBatch, RecordBatches}; use common_telemetry::{debug, tracing}; -use common_time::time::Time; use common_time::util::{current_time_rfc3339, yesterday_rfc3339}; -use common_time::{ - Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp, -}; +use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth}; use common_version::OwnedBuildInfo; use datafusion_common::ScalarValue; use datatypes::prelude::ConcreteDataType; -use datatypes::scalars::ScalarVector; use datatypes::schema::{ColumnSchema, SchemaRef}; use datatypes::types::jsonb_to_string; -use datatypes::vectors::Float64Vector; use futures::StreamExt; use futures::future::join_all; use itertools::Itertools; @@ -950,47 +942,12 @@ impl RowWriter { let v = Date::new((array.value(i) / 86_400_000) as i32); self.insert(column, v); } - DataType::Timestamp(time_unit, _) => { - let v = match time_unit { - TimeUnit::Second => { - let array = array.as_primitive::(); - array.value(i) - } - TimeUnit::Millisecond => { - let array = array.as_primitive::(); - array.value(i) - } - TimeUnit::Microsecond => { - let array = array.as_primitive::(); - array.value(i) - } - TimeUnit::Nanosecond => { - let array = array.as_primitive::(); - array.value(i) - } - }; - let v = Timestamp::new(v, time_unit.into()); + DataType::Timestamp(_, _) => { + let v = datatypes::arrow_array::timestamp_array_value(array, i); self.insert(column, v.to_iso8601_string()); } - DataType::Time32(time_unit) | DataType::Time64(time_unit) => { - let v = match time_unit { - TimeUnit::Second => { - let array = array.as_primitive::(); - Time::new_second(array.value(i) as i64) - } - TimeUnit::Millisecond => { - let array = array.as_primitive::(); - Time::new_millisecond(array.value(i) as i64) - } - TimeUnit::Microsecond => { - let array = array.as_primitive::(); - Time::new_microsecond(array.value(i)) - } - 
TimeUnit::Nanosecond => { - let array = array.as_primitive::(); - Time::new_nanosecond(array.value(i)) - } - }; + DataType::Time32(_) | DataType::Time64(_) => { + let v = datatypes::arrow_array::time_array_value(array, i); self.insert(column, v.to_iso8601_string()); } DataType::Interval(interval_unit) => match interval_unit { @@ -1010,26 +967,8 @@ impl RowWriter { self.insert(column, v.to_iso8601_string()); } }, - DataType::Duration(time_unit) => { - let v = match time_unit { - TimeUnit::Second => { - let array = array.as_primitive::(); - array.value(i) - } - TimeUnit::Millisecond => { - let array = array.as_primitive::(); - array.value(i) - } - TimeUnit::Microsecond => { - let array = array.as_primitive::(); - array.value(i) - } - TimeUnit::Nanosecond => { - let array = array.as_primitive::(); - array.value(i) - } - }; - let d = Duration::new(v, time_unit.into()); + DataType::Duration(_) => { + let d = datatypes::arrow_array::duration_array_value(array, i); self.insert(column, d); } DataType::List(_) => { @@ -1134,20 +1073,14 @@ fn record_batches_to_labels_name( let field_columns = field_column_indices .iter() .map(|i| { - batch - .column(*i) - .as_any() - .downcast_ref::() - .unwrap() + let column = batch.column(*i); + column.as_primitive::() }) .collect::>(); for row_index in 0..batch.num_rows() { // if all field columns are null, skip this row - if field_columns - .iter() - .all(|c| c.get_data(row_index).is_none()) - { + if field_columns.iter().all(|c| c.is_null(row_index)) { continue; } diff --git a/src/servers/src/http/result.rs b/src/servers/src/http/result.rs index dbad6dc3bc..f4466ead8d 100644 --- a/src/servers/src/http/result.rs +++ b/src/servers/src/http/result.rs @@ -12,6 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use arrow::array::AsArray; +use arrow::datatypes::{ + Date32Type, Date64Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type, + Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, + UInt8Type, UInt16Type, UInt32Type, UInt64Type, +}; +use arrow_schema::{DataType, IntervalUnit}; +use common_decimal::Decimal128; +use common_recordbatch::RecordBatch; +use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth}; +use datafusion_common::ScalarValue; +use datatypes::data_type::ConcreteDataType; +use datatypes::value::Value; +use snafu::ResultExt; + +use crate::error::{ + ConvertScalarValueSnafu, DataFusionSnafu, NotSupportedSnafu, Result, ToJsonSnafu, + UnexpectedResultSnafu, +}; + pub(crate) mod arrow_result; pub(crate) mod csv_result; pub mod error_result; @@ -22,3 +42,240 @@ pub(crate) mod json_result; pub(crate) mod null_result; pub(crate) mod prometheus_resp; pub(crate) mod table_result; + +pub(super) struct HttpOutputWriter { + columns: usize, + value_transformer: Option Value>>, + current: Option>, +} + +impl HttpOutputWriter { + pub(super) fn new( + columns: usize, + value_transformer: Option Value>>, + ) -> Self { + Self { + columns, + value_transformer, + current: None, + } + } + + fn write_bytes(&mut self, bytes: &[u8], datatype: &ConcreteDataType) -> Result<()> { + if datatype.is_json() { + let value = datatypes::types::jsonb_to_serde_json(bytes).map_err(|e| { + UnexpectedResultSnafu { + reason: format!("corrupted jsonb data: {bytes:?}, error: {e}"), + } + .build() + })?; + self.push(value); + Ok(()) + } else { + self.write_value(bytes) + } + } + + fn write_value(&mut self, value: impl Into) -> Result<()> { + let value = value.into(); + + let value = if let Some(f) = &self.value_transformer { + f(value) + } else { + value + }; + + let value = serde_json::Value::try_from(value).context(ToJsonSnafu)?; + self.push(value); + Ok(()) + } + + fn push(&mut self, value: serde_json::Value) { + let current = self + .current + .get_or_insert_with(|| Vec::with_capacity(self.columns)); + current.push(value); + } + + fn finish(&mut self) -> Vec { + self.current.take().unwrap_or_default() + } + + pub(super) fn write( + &mut self, + record_batch: RecordBatch, + rows: &mut Vec>, + ) -> Result<()> { + let schema = record_batch.schema.clone(); + let record_batch = record_batch.into_df_record_batch(); + for i in 0..record_batch.num_rows() { + for (schema, array) in schema + .column_schemas() + .iter() + .zip(record_batch.columns().iter()) + { + if array.is_null(i) { + self.write_value(Value::Null)?; + continue; + } + + match array.data_type() { + DataType::Null => { + self.write_value(Value::Null)?; + } + DataType::Boolean => { + let array = array.as_boolean(); + let v = array.value(i); + self.write_value(v)?; + } + DataType::UInt8 => { + let array = array.as_primitive::(); + let v = array.value(i); + self.write_value(v)?; + } + DataType::UInt16 => { + let array = array.as_primitive::(); + let v = array.value(i); + self.write_value(v)?; + } + DataType::UInt32 => { + let array = array.as_primitive::(); + let v = array.value(i); + self.write_value(v)?; + } + DataType::UInt64 => { + let array = array.as_primitive::(); + let v = array.value(i); + self.write_value(v)?; + } + DataType::Int8 => { + let array = array.as_primitive::(); + let v = array.value(i); + self.write_value(v)?; + } + DataType::Int16 => { + let array = array.as_primitive::(); + let v = array.value(i); + self.write_value(v)?; + } + DataType::Int32 => { + let array = 
array.as_primitive::(); + let v = array.value(i); + self.write_value(v)?; + } + DataType::Int64 => { + let array = array.as_primitive::(); + let v = array.value(i); + self.write_value(v)?; + } + DataType::Float32 => { + let array = array.as_primitive::(); + let v = array.value(i); + self.write_value(v)?; + } + DataType::Float64 => { + let array = array.as_primitive::(); + let v = array.value(i); + self.write_value(v)?; + } + DataType::Utf8 => { + let array = array.as_string::(); + let v = array.value(i); + self.write_value(v)?; + } + DataType::LargeUtf8 => { + let array = array.as_string::(); + let v = array.value(i); + self.write_value(v)?; + } + DataType::Utf8View => { + let array = array.as_string_view(); + let v = array.value(i); + self.write_value(v)?; + } + DataType::Binary => { + let array = array.as_binary::(); + let v = array.value(i); + self.write_bytes(v, &schema.data_type)?; + } + DataType::LargeBinary => { + let array = array.as_binary::(); + let v = array.value(i); + self.write_bytes(v, &schema.data_type)?; + } + DataType::BinaryView => { + let array = array.as_binary_view(); + let v = array.value(i); + self.write_bytes(v, &schema.data_type)?; + } + DataType::Date32 => { + let array = array.as_primitive::(); + let v = Date::new(array.value(i)); + self.write_value(v)?; + } + DataType::Date64 => { + let array = array.as_primitive::(); + // `Date64` values are milliseconds representation of `Date32` values, + // according to its specification. So we convert the `Date64` value here to + // the `Date32` value to process them unified. + let v = Date::new((array.value(i) / 86_400_000) as i32); + self.write_value(v)?; + } + DataType::Timestamp(_, _) => { + let ts = datatypes::arrow_array::timestamp_array_value(array, i); + self.write_value(ts)?; + } + DataType::Time32(_) | DataType::Time64(_) => { + let v = datatypes::arrow_array::time_array_value(array, i); + self.write_value(v)?; + } + DataType::Interval(interval_unit) => match interval_unit { + IntervalUnit::YearMonth => { + let array = array.as_primitive::(); + let v: IntervalYearMonth = array.value(i).into(); + self.write_value(v)?; + } + IntervalUnit::DayTime => { + let array = array.as_primitive::(); + let v: IntervalDayTime = array.value(i).into(); + self.write_value(v)?; + } + IntervalUnit::MonthDayNano => { + let array = array.as_primitive::(); + let v: IntervalMonthDayNano = array.value(i).into(); + self.write_value(v)?; + } + }, + DataType::Duration(_) => { + let d = datatypes::arrow_array::duration_array_value(array, i); + self.write_value(d)?; + } + DataType::List(_) => { + let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?; + let v: Value = v.try_into().context(ConvertScalarValueSnafu)?; + self.write_value(v)?; + } + DataType::Struct(_) => { + let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?; + let v: Value = v.try_into().context(ConvertScalarValueSnafu)?; + self.write_value(v)?; + } + DataType::Decimal128(precision, scale) => { + let array = array.as_primitive::(); + let v = Decimal128::new(array.value(i), *precision, *scale); + self.write_value(v)?; + } + _ => { + return NotSupportedSnafu { + feat: format!("convert {} to http output value", array.data_type()), + } + .fail(); + } + } + } + + rows.push(self.finish()) + } + Ok(()) + } +} diff --git a/src/servers/src/http/result/influxdb_result_v1.rs b/src/servers/src/http/result/influxdb_result_v1.rs index e5f11d8aba..cc374aba70 100644 --- a/src/servers/src/http/result/influxdb_result_v1.rs +++ 
b/src/servers/src/http/result/influxdb_result_v1.rs @@ -12,35 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -use arrow::array::AsArray; -use arrow::datatypes::{ - Date32Type, Date64Type, Decimal128Type, DurationMicrosecondType, DurationMillisecondType, - DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int8Type, Int16Type, - Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, - Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType, - TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, - TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type, -}; -use arrow_schema::{DataType, IntervalUnit, TimeUnit}; use axum::Json; use axum::http::HeaderValue; use axum::response::{IntoResponse, Response}; -use common_decimal::Decimal128; use common_query::{Output, OutputData}; use common_recordbatch::{RecordBatch, util}; -use common_time::time::Time; -use common_time::{ - Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp, -}; -use datafusion_common::ScalarValue; use serde::{Deserialize, Serialize}; use serde_json::Value; -use snafu::ResultExt; -use crate::error::{ - ConvertScalarValueSnafu, DataFusionSnafu, Error, NotSupportedSnafu, Result, ToJsonSnafu, -}; +use crate::error::{Error, Result}; use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; +use crate::http::result::HttpOutputWriter; use crate::http::result::error_result::ErrorResponse; use crate::http::{Epoch, HttpResponse, ResponseFormat}; @@ -84,8 +66,8 @@ impl TryFrom<(Option, Vec)> for InfluxdbRecordsOutput { } else { // Safety: ensured by previous empty check let first = &recordbatches[0]; - let columns = first - .schema + let schema = first.schema.clone(); + let columns = schema .column_schemas() .iter() .map(|cs| cs.name.clone()) @@ -94,8 +76,23 @@ impl TryFrom<(Option, Vec)> for InfluxdbRecordsOutput { let mut rows = Vec::with_capacity(recordbatches.iter().map(|r| r.num_rows()).sum::()); + let value_transformer = + move |value: datatypes::value::Value| -> datatypes::value::Value { + match (value, epoch) { + (datatypes::value::Value::Timestamp(ts), Some(epoch)) => { + if let Some(converted) = epoch.convert_timestamp(ts) { + datatypes::value::Value::Timestamp(converted) + } else { + datatypes::value::Value::Timestamp(ts) + } + } + (value, _) => value, + } + }; + for recordbatch in recordbatches { - let mut writer = RowWriter::new(epoch, recordbatch.num_columns()); + let mut writer = + HttpOutputWriter::new(schema.num_columns(), Some(Box::new(value_transformer))); writer.write(recordbatch, &mut rows)?; } @@ -104,266 +101,6 @@ impl TryFrom<(Option, Vec)> for InfluxdbRecordsOutput { } } -struct RowWriter { - epoch: Option, - columns: usize, - current: Option>, -} - -impl RowWriter { - fn new(epoch: Option, columns: usize) -> Self { - Self { - epoch, - columns, - current: None, - } - } - - fn push(&mut self, value: impl Into) -> Result<()> { - let value = value.into(); - - let current = self - .current - .get_or_insert_with(|| Vec::with_capacity(self.columns)); - let value = Value::try_from(value).context(ToJsonSnafu)?; - current.push(value); - Ok(()) - } - - fn finish(&mut self) -> Vec { - self.current.take().unwrap_or_default() - } - - fn write(&mut self, record_batch: RecordBatch, rows: &mut Vec>) -> Result<()> { - let record_batch = record_batch.into_df_record_batch(); - for i in 
-        for i in 0..record_batch.num_rows() {
-            for array in record_batch.columns().iter() {
-                if array.is_null(i) {
-                    self.push(datatypes::value::Value::Null)?;
-                    continue;
-                }
-
-                match array.data_type() {
-                    DataType::Null => {
-                        self.push(datatypes::value::Value::Null)?;
-                    }
-                    DataType::Boolean => {
-                        let array = array.as_boolean();
-                        let v = array.value(i);
-                        self.push(v)?;
-                    }
-                    DataType::UInt8 => {
-                        let array = array.as_primitive::<UInt8Type>();
-                        let v = array.value(i);
-                        self.push(v)?;
-                    }
-                    DataType::UInt16 => {
-                        let array = array.as_primitive::<UInt16Type>();
-                        let v = array.value(i);
-                        self.push(v)?;
-                    }
-                    DataType::UInt32 => {
-                        let array = array.as_primitive::<UInt32Type>();
-                        let v = array.value(i);
-                        self.push(v)?;
-                    }
-                    DataType::UInt64 => {
-                        let array = array.as_primitive::<UInt64Type>();
-                        let v = array.value(i);
-                        self.push(v)?;
-                    }
-                    DataType::Int8 => {
-                        let array = array.as_primitive::<Int8Type>();
-                        let v = array.value(i);
-                        self.push(v)?;
-                    }
-                    DataType::Int16 => {
-                        let array = array.as_primitive::<Int16Type>();
-                        let v = array.value(i);
-                        self.push(v)?;
-                    }
-                    DataType::Int32 => {
-                        let array = array.as_primitive::<Int32Type>();
-                        let v = array.value(i);
-                        self.push(v)?;
-                    }
-                    DataType::Int64 => {
-                        let array = array.as_primitive::<Int64Type>();
-                        let v = array.value(i);
-                        self.push(v)?;
-                    }
-                    DataType::Float32 => {
-                        let array = array.as_primitive::<Float32Type>();
-                        let v = array.value(i);
-                        self.push(v)?;
-                    }
-                    DataType::Float64 => {
-                        let array = array.as_primitive::<Float64Type>();
-                        let v = array.value(i);
-                        self.push(v)?;
-                    }
-                    DataType::Utf8 => {
-                        let array = array.as_string::<i32>();
-                        let v = array.value(i);
-                        self.push(v)?;
-                    }
-                    DataType::LargeUtf8 => {
-                        let array = array.as_string::<i64>();
-                        let v = array.value(i);
-                        self.push(v)?;
-                    }
-                    DataType::Utf8View => {
-                        let array = array.as_string_view();
-                        let v = array.value(i);
-                        self.push(v)?;
-                    }
-                    DataType::Binary => {
-                        let array = array.as_binary::<i32>();
-                        let v = array.value(i);
-                        self.push(v)?;
-                    }
-                    DataType::LargeBinary => {
-                        let array = array.as_binary::<i64>();
-                        let v = array.value(i);
-                        self.push(v)?;
-                    }
-                    DataType::BinaryView => {
-                        let array = array.as_binary_view();
-                        let v = array.value(i);
-                        self.push(v)?;
-                    }
-                    DataType::Date32 => {
-                        let array = array.as_primitive::<Date32Type>();
-                        let v = Date::new(array.value(i));
-                        self.push(v)?;
-                    }
-                    DataType::Date64 => {
-                        let array = array.as_primitive::<Date64Type>();
-                        // `Date64` values are milliseconds representation of `Date32` values,
-                        // according to its specification. So we convert the `Date64` value here to
-                        // the `Date32` value to process them unified.
-                        let v = Date::new((array.value(i) / 86_400_000) as i32);
-                        self.push(v)?;
-                    }
-                    DataType::Timestamp(time_unit, _) => {
-                        let v = match time_unit {
-                            TimeUnit::Second => {
-                                let array = array.as_primitive::<TimestampSecondType>();
-                                array.value(i)
-                            }
-                            TimeUnit::Millisecond => {
-                                let array = array.as_primitive::<TimestampMillisecondType>();
-                                array.value(i)
-                            }
-                            TimeUnit::Microsecond => {
-                                let array = array.as_primitive::<TimestampMicrosecondType>();
-                                array.value(i)
-                            }
-                            TimeUnit::Nanosecond => {
-                                let array = array.as_primitive::<TimestampNanosecondType>();
-                                array.value(i)
-                            }
-                        };
-                        let mut ts = Timestamp::new(v, time_unit.into());
-                        if let Some(epoch) = self.epoch
-                            && let Some(converted) = epoch.convert_timestamp(ts)
-                        {
-                            ts = converted;
-                        }
-                        self.push(ts)?;
-                    }
-                    DataType::Time32(time_unit) | DataType::Time64(time_unit) => {
-                        let v = match time_unit {
-                            TimeUnit::Second => {
-                                let array = array.as_primitive::<Time32SecondType>();
-                                Time::new_second(array.value(i) as i64)
-                            }
-                            TimeUnit::Millisecond => {
-                                let array = array.as_primitive::<Time32MillisecondType>();
-                                Time::new_millisecond(array.value(i) as i64)
-                            }
-                            TimeUnit::Microsecond => {
-                                let array = array.as_primitive::<Time64MicrosecondType>();
-                                Time::new_microsecond(array.value(i))
-                            }
-                            TimeUnit::Nanosecond => {
-                                let array = array.as_primitive::<Time64NanosecondType>();
-                                Time::new_nanosecond(array.value(i))
-                            }
-                        };
-                        self.push(v)?;
-                    }
-                    DataType::Interval(interval_unit) => match interval_unit {
-                        IntervalUnit::YearMonth => {
-                            let array = array.as_primitive::<IntervalYearMonthType>();
-                            let v: IntervalYearMonth = array.value(i).into();
-                            self.push(v)?;
-                        }
-                        IntervalUnit::DayTime => {
-                            let array = array.as_primitive::<IntervalDayTimeType>();
-                            let v: IntervalDayTime = array.value(i).into();
-                            self.push(v)?;
-                        }
-                        IntervalUnit::MonthDayNano => {
-                            let array = array.as_primitive::<IntervalMonthDayNanoType>();
-                            let v: IntervalMonthDayNano = array.value(i).into();
-                            self.push(v)?;
-                        }
-                    },
-                    DataType::Duration(time_unit) => {
-                        let v = match time_unit {
-                            TimeUnit::Second => {
-                                let array = array.as_primitive::<DurationSecondType>();
-                                array.value(i)
-                            }
-                            TimeUnit::Millisecond => {
-                                let array = array.as_primitive::<DurationMillisecondType>();
-                                array.value(i)
-                            }
-                            TimeUnit::Microsecond => {
-                                let array = array.as_primitive::<DurationMicrosecondType>();
-                                array.value(i)
-                            }
-                            TimeUnit::Nanosecond => {
-                                let array = array.as_primitive::<DurationNanosecondType>();
-                                array.value(i)
-                            }
-                        };
-                        let d = Duration::new(v, time_unit.into());
-                        self.push(d)?;
-                    }
-                    DataType::List(_) => {
-                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
-                        let v: datatypes::value::Value =
-                            v.try_into().context(ConvertScalarValueSnafu)?;
-                        self.push(v)?;
-                    }
-                    DataType::Struct(_) => {
-                        let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
-                        let v: datatypes::value::Value =
-                            v.try_into().context(ConvertScalarValueSnafu)?;
-                        self.push(v)?;
-                    }
-                    DataType::Decimal128(precision, scale) => {
-                        let array = array.as_primitive::<Decimal128Type>();
-                        let v = Decimal128::new(array.value(i), *precision, *scale);
-                        self.push(v)?;
-                    }
-                    _ => {
-                        return NotSupportedSnafu {
-                            feat: format!("convert {} to influxdb value", array.data_type()),
-                        }
-                        .fail();
-                    }
-                }
-            }
-
-            rows.push(self.finish())
-        }
-        Ok(())
-    }
-}
-
 #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
 pub struct InfluxdbOutput {
     pub statement_id: u32,
diff --git a/src/servers/src/http/result/prometheus_resp.rs b/src/servers/src/http/result/prometheus_resp.rs
index 4bf386bbfd..9ecbe671b4 100644
--- a/src/servers/src/http/result/prometheus_resp.rs
+++ b/src/servers/src/http/result/prometheus_resp.rs
@@ -16,6 +16,9 @@
 use std::cmp::Ordering;
 use std::collections::{BTreeMap, HashMap};
 
+use arrow::array::{Array, AsArray};
+use arrow::datatypes::{Float64Type, TimestampMillisecondType};
+use arrow_schema::DataType;
 use axum::Json;
 use axum::http::HeaderValue;
 use axum::response::{IntoResponse, Response};
@@ -24,8 +27,6 @@
 use common_error::status_code::StatusCode;
 use common_query::{Output, OutputData};
 use common_recordbatch::RecordBatches;
 use datatypes::prelude::ConcreteDataType;
-use datatypes::scalars::ScalarVector;
-use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
 use indexmap::IndexMap;
 use promql_parser::label::METRIC_NAME;
 use promql_parser::parser::value::ValueType;
@@ -34,7 +35,7 @@
 use serde_json::Value;
 use snafu::{OptionExt, ResultExt};
 
 use crate::error::{
-    CollectRecordbatchSnafu, Result, UnexpectedResultSnafu, status_code_to_http_status,
+    ArrowSnafu, CollectRecordbatchSnafu, Result, UnexpectedResultSnafu, status_code_to_http_status,
 };
 use crate::http::header::{GREPTIME_DB_HEADER_METRICS, collect_plan_metrics};
 use crate::http::prometheus::{
@@ -247,13 +248,7 @@ impl PrometheusJsonResponse {
             // prepare things...
             let tag_columns = tag_column_indices
                 .iter()
-                .map(|i| {
-                    batch
-                        .column(*i)
-                        .as_any()
-                        .downcast_ref::<StringVector>()
-                        .unwrap()
-                })
+                .map(|i| batch.column(*i).as_string::<i32>())
                 .collect::<Vec<_>>();
             let tag_names = tag_column_indices
                 .iter()
                 .collect::<Vec<_>>();
             let timestamp_column = batch
                 .column(timestamp_column_index)
-                .as_any()
-                .downcast_ref::<TimestampMillisecondVector>()
-                .unwrap();
-            let casted_field_column = batch
-                .column(first_field_column_index)
-                .cast(&ConcreteDataType::float64_datatype())
-                .unwrap();
-            let field_column = casted_field_column
-                .as_any()
-                .downcast_ref::<Float64Vector>()
-                .unwrap();
+                .as_primitive::<TimestampMillisecondType>();
+
+            let array =
+                arrow::compute::cast(batch.column(first_field_column_index), &DataType::Float64)
+                    .context(ArrowSnafu)?;
+            let field_column = array.as_primitive::<Float64Type>();
 
             // assemble rows
             for row_index in 0..batch.num_rows() {
                 // retrieve value
-                if let Some(v) = field_column.get_data(row_index) {
+                if field_column.is_valid(row_index) {
+                    let v = field_column.value(row_index);
                     // ignore all NaN values to reduce the amount of data to be sent.
                     if v.is_nan() {
                         continue;
@@ -289,14 +280,13 @@
                     }
 
                     for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) {
                         // TODO(ruihang): add test for NULL tag
-                        if let Some(tag_value) = tag_column.get_data(row_index) {
-                            tags.push((tag_name, tag_value));
+                        if tag_column.is_valid(row_index) {
+                            tags.push((tag_name, tag_column.value(row_index)));
                         }
                     }
 
                     // retrieve timestamp
-                    let timestamp_millis: i64 =
-                        timestamp_column.get_data(row_index).unwrap().into();
+                    let timestamp_millis = timestamp_column.value(row_index);
                     let timestamp = timestamp_millis as f64 / 1000.0;
 
                     buffer
diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs
index 1af8f1db88..3013aee4e5 100644
--- a/src/servers/src/lib.rs
+++ b/src/servers/src/lib.rs
@@ -16,6 +16,7 @@
 #![feature(try_blocks)]
 #![feature(exclusive_wrapper)]
 #![feature(if_let_guard)]
+#![feature(box_patterns)]
 
 use datafusion_expr::LogicalPlan;
 use datatypes::schema::Schema;
diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs
index 8475f0ac65..fd80d751c8 100644
--- a/src/servers/src/mysql/writer.rs
+++ b/src/servers/src/mysql/writer.rs
@@ -16,22 +16,18 @@
 use std::time::Duration;
 
 use arrow::array::{Array, AsArray};
 use arrow::datatypes::{
-    Date32Type, Decimal128Type, DurationMicrosecondType, DurationMillisecondType,
-    DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int8Type, Int16Type,
-    Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
-    Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
-    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
-    TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
+    Date32Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type,
+    Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, UInt8Type,
+    UInt16Type, UInt32Type, UInt64Type,
 };
-use arrow_schema::{DataType, IntervalUnit, TimeUnit};
+use arrow_schema::{DataType, IntervalUnit};
 use common_decimal::Decimal128;
 use common_error::ext::ErrorExt;
 use common_error::status_code::StatusCode;
 use common_query::{Output, OutputData};
 use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
 use common_telemetry::{debug, error};
-use common_time::time::Time;
-use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp};
+use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
 use datafusion_common::ScalarValue;
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::SchemaRef;
@@ -312,26 +308,8 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
                     let v = Date::new(array.value(i));
                     row_writer.write_col(v.to_chrono_date())?;
                 }
-                DataType::Timestamp(time_unit, _) => {
-                    let v = match time_unit {
-                        TimeUnit::Second => {
-                            let array = column.as_primitive::<TimestampSecondType>();
-                            array.value(i)
-                        }
-                        TimeUnit::Millisecond => {
-                            let array = column.as_primitive::<TimestampMillisecondType>();
-                            array.value(i)
-                        }
-                        TimeUnit::Microsecond => {
-                            let array = column.as_primitive::<TimestampMicrosecondType>();
-                            array.value(i)
-                        }
-                        TimeUnit::Nanosecond => {
-                            let array = column.as_primitive::<TimestampNanosecondType>();
-                            array.value(i)
-                        }
-                    };
-                    let v = Timestamp::new(v, time_unit.into());
+                DataType::Timestamp(_, _) => {
+                    let v = datatypes::arrow_array::timestamp_array_value(column, i);
                     let v = v.to_chrono_datetime_with_timezone(Some(&query_context.timezone()));
                     row_writer.write_col(v)?;
                 }
@@ -352,28 +330,11 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
                         row_writer.write_col(v.to_iso8601_string())?;
                     }
                 },
-                DataType::Duration(time_unit) => match time_unit {
-                    TimeUnit::Second => {
-                        let array = column.as_primitive::<DurationSecondType>();
-                        let v = array.value(i);
-                        row_writer.write_col(Duration::from_secs(v as u64))?;
-                    }
-                    TimeUnit::Millisecond => {
-                        let array = column.as_primitive::<DurationMillisecondType>();
-                        let v = array.value(i);
-                        row_writer.write_col(Duration::from_millis(v as u64))?;
-                    }
-                    TimeUnit::Microsecond => {
-                        let array = column.as_primitive::<DurationMicrosecondType>();
-                        let v = array.value(i);
-                        row_writer.write_col(Duration::from_micros(v as u64))?;
-                    }
-                    TimeUnit::Nanosecond => {
-                        let array = column.as_primitive::<DurationNanosecondType>();
-                        let v = array.value(i);
-                        row_writer.write_col(Duration::from_nanos(v as u64))?;
-                    }
-                },
+                DataType::Duration(_) => {
+                    let v: Duration =
+                        datatypes::arrow_array::duration_array_value(column, i).into();
+                    row_writer.write_col(v)?;
+                }
                 DataType::List(_) => {
                     let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
                     row_writer.write_col(v.to_string())?;
                 }
@@ -382,37 +343,8 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
                     let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
                     row_writer.write_col(v.to_string())?;
                 }
-                DataType::Time32(time_unit) => {
-                    let time = match time_unit {
-                        TimeUnit::Second => {
-                            let array = column.as_primitive::<Time32SecondType>();
-                            Time::new_second(array.value(i) as i64)
-                        }
-                        TimeUnit::Millisecond => {
-                            let array = column.as_primitive::<Time32MillisecondType>();
-                            Time::new_millisecond(array.value(i) as i64)
-                        }
-                        _ => unreachable!(
-                            "`DataType::Time32` has only second and millisecond time units"
-                        ),
-                    };
-                    let v = time.to_timezone_aware_string(Some(&query_context.timezone()));
-                    row_writer.write_col(v)?;
-                }
-                DataType::Time64(time_unit) => {
-                    let time = match time_unit {
-                        TimeUnit::Microsecond => {
-                            let array = column.as_primitive::<Time64MicrosecondType>();
-                            Time::new_microsecond(array.value(i))
-                        }
-                        TimeUnit::Nanosecond => {
-                            let array = column.as_primitive::<Time64NanosecondType>();
-                            Time::new_nanosecond(array.value(i))
-                        }
-                        _ => unreachable!(
-                            "`DataType::Time64` has only microsecond and nanosecond time units"
-                        ),
-                    };
+                DataType::Time32(_) | DataType::Time64(_) => {
+                    let time = datatypes::arrow_array::time_array_value(column, i);
                     let v = time.to_timezone_aware_string(Some(&query_context.timezone()));
                     row_writer.write_col(v)?;
                 }
diff --git a/src/servers/src/postgres/types.rs b/src/servers/src/postgres/types.rs
index 88f5add4bc..b516410328 100644
--- a/src/servers/src/postgres/types.rs
+++ b/src/servers/src/postgres/types.rs
@@ -22,8 +22,7 @@
 use std::sync::Arc;
 
 use arrow::array::{Array, ArrayRef, AsArray};
 use arrow::datatypes::{
-    Date32Type, Date64Type, Decimal128Type, DurationMicrosecondType, DurationMillisecondType,
-    DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int8Type, Int16Type,
+    Date32Type, Date64Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type,
     Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
     Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
     TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
@@ -34,9 +33,7 @@
 use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};
 use common_decimal::Decimal128;
 use common_recordbatch::RecordBatch;
 use common_time::time::Time;
-use common_time::{
-    Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp,
-};
+use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp};
 use datafusion_common::ScalarValue;
 use datafusion_expr::LogicalPlan;
 use datatypes::arrow::datatypes::DataType as ArrowDataType;
@@ -567,26 +564,8 @@ impl RecordBatchRowIterator {
                         });
                     encoder.encode_field(&date)?;
                 }
-                DataType::Timestamp(time_unit, _) => {
-                    let v = match time_unit {
-                        TimeUnit::Second => {
-                            let array = column.as_primitive::<TimestampSecondType>();
-                            array.value(i)
-                        }
-                        TimeUnit::Millisecond => {
-                            let array = column.as_primitive::<TimestampMillisecondType>();
-                            array.value(i)
-                        }
-                        TimeUnit::Microsecond => {
-                            let array = column.as_primitive::<TimestampMicrosecondType>();
-                            array.value(i)
-                        }
-                        TimeUnit::Nanosecond => {
-                            let array = column.as_primitive::<TimestampNanosecondType>();
-                            array.value(i)
-                        }
-                    };
-                    let v = Timestamp::new(v, time_unit.into());
+                DataType::Timestamp(_, _) => {
+                    let v = datatypes::arrow_array::timestamp_array_value(column, i);
                     let datetime = v
                         .to_chrono_datetime_with_timezone(Some(&self.query_ctx.timezone()))
                         .map(|v| {
@@ -613,26 +592,8 @@ impl RecordBatchRowIterator {
                         encoder.encode_field(&PgInterval::from(v))?;
                     }
                 },
-                DataType::Duration(time_unit) => {
-                    let v = match time_unit {
-                        TimeUnit::Second => {
-                            let array = column.as_primitive::<DurationSecondType>();
-                            array.value(i)
-                        }
-                        TimeUnit::Millisecond => {
-                            let array = column.as_primitive::<DurationMillisecondType>();
-                            array.value(i)
-                        }
-                        TimeUnit::Microsecond => {
-                            let array = column.as_primitive::<DurationMicrosecondType>();
-                            array.value(i)
-                        }
-                        TimeUnit::Nanosecond => {
-                            let array = column.as_primitive::<DurationNanosecondType>();
-                            array.value(i)
-                        }
-                    };
-                    let d = Duration::new(v, time_unit.into());
+                DataType::Duration(_) => {
+                    let d = datatypes::arrow_array::duration_array_value(column, i);
                     match PgInterval::try_from(d) {
                         Ok(i) => encoder.encode_field(&i)?,
                         Err(e) => {
@@ -650,25 +611,8 @@ impl RecordBatchRowIterator {
                 DataType::Struct(_) => {
                     encode_struct(&self.query_ctx, Default::default(), encoder)?;
                 }
-                DataType::Time32(time_unit) | DataType::Time64(time_unit) => {
-                    let v = match time_unit {
-                        TimeUnit::Second => {
-                            let array = column.as_primitive::<Time32SecondType>();
-                            Time::new_second(array.value(i) as i64)
-                        }
-                        TimeUnit::Millisecond => {
-                            let array = column.as_primitive::<Time32MillisecondType>();
-                            Time::new_millisecond(array.value(i) as i64)
-                        }
-                        TimeUnit::Microsecond => {
-                            let array = column.as_primitive::<Time64MicrosecondType>();
-                            Time::new_microsecond(array.value(i))
-                        }
-                        TimeUnit::Nanosecond => {
-                            let array = column.as_primitive::<Time64NanosecondType>();
-                            Time::new_nanosecond(array.value(i))
-                        }
-                    };
+                DataType::Time32(_) | DataType::Time64(_) => {
+                    let v = datatypes::arrow_array::time_array_value(column, i);
                     encoder.encode_field(&v.to_chrono_time())?;
                 }
                 DataType::Decimal128(precision, scale) => {
diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs
index 81268d8663..24e8512ad0 100644
--- a/src/servers/src/prom_store.rs
+++ b/src/servers/src/prom_store.rs
@@ -21,18 +21,18 @@
 use std::hash::{Hash, Hasher};
 
 use api::prom_store::remote::label_matcher::Type as MatcherType;
 use api::prom_store::remote::{Label, Query, ReadRequest, Sample, TimeSeries, WriteRequest};
 use api::v1::RowInsertRequests;
+use arrow::array::{Array, AsArray};
+use arrow::datatypes::{Float64Type, TimestampMillisecondType};
 use common_grpc::precision::Precision;
 use common_query::prelude::{greptime_timestamp, greptime_value};
 use common_recordbatch::{RecordBatch, RecordBatches};
 use common_telemetry::tracing;
-use common_time::timestamp::TimeUnit;
 use datafusion::prelude::{Expr, col, lit, regexp_match};
 use datafusion_common::ScalarValue;
 use datafusion_expr::LogicalPlan;
-use datatypes::prelude::{ConcreteDataType, Value};
 use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
 use query::dataframe::DataFrame;
-use snafu::{OptionExt, ResultExt, ensure};
+use snafu::{OptionExt, ResultExt};
 use snap::raw::{Decoder, Encoder};
 
 use crate::error::{self, Result};
@@ -233,6 +233,24 @@ fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec
+                >(),
+            )
+        })
+        .collect::<Vec<_>>();
+
     for row in 0..row_count {
         let mut labels = Vec::with_capacity(recordbatch.num_columns() - 1);
         labels.push(new_label(
@@ -240,20 +258,10 @@ fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec Result
+        .as_primitive_opt::<TimestampMillisecondType>()
+        .with_context(|| error::InvalidPromRemoteReadQueryResultSnafu {
             msg: format!(
                 "Expect timestamp column of datatype Timestamp(Millisecond), actual {:?}",
                 ts_column.data_type()
-            )
-        }
-    );
+            ),
+        })?;
 
     let field_column = recordbatch.column_by_name(greptime_value()).context(
         error::InvalidPromRemoteReadQueryResultSnafu {
             msg: "missing greptime_value column in query result",
         },
    )?;
-    ensure!(
-        field_column.data_type() == ConcreteDataType::float64_datatype(),
-        error::InvalidPromRemoteReadQueryResultSnafu {
+    let field_column = field_column
+        .as_primitive_opt::<Float64Type>()
+        .with_context(|| error::InvalidPromRemoteReadQueryResultSnafu {
             msg: format!(
                 "Expect value column of datatype Float64, actual {:?}",
                 field_column.data_type()
-            )
-        }
-    );
+            ),
+        })?;
 
     // First, collect each row's timeseries id
     let timeseries_ids = collect_timeseries_ids(table, &recordbatch);
@@ -322,14 +328,8 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result
-            let value = match field_column.get(row) {
-                Value::Float64(value) => value.into(),
-                _ => unreachable!("checked by the \"ensure\" above"),
-            };
-            let timestamp = match ts_column.get(row) {
-                Value::Timestamp(t) if t.unit() == TimeUnit::Millisecond => t.value(),
-                _ => unreachable!("checked by the \"ensure\" above"),
-            };
+            let value = field_column.value(row);
+            let timestamp = ts_column.value(row);
             let sample = Sample { value, timestamp };
 
             timeseries.samples.push(sample);
@@ -579,6 +579,7 @@ mod tests {
     use api::prom_store::remote::LabelMatcher;
     use api::v1::{ColumnDataType, Row, SemanticType};
     use datafusion::prelude::SessionContext;
+    use datatypes::data_type::ConcreteDataType;
     use datatypes::schema::{ColumnSchema, Schema};
     use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
     use table::table::adapter::DfTableProviderAdapter;
diff --git a/src/table/src/table/numbers.rs b/src/table/src/table/numbers.rs
index 9b1ed125c0..eb67dcfaef 100644
--- a/src/table/src/table/numbers.rs
+++ b/src/table/src/table/numbers.rs
@@ -166,9 +166,9 @@ impl Stream for NumbersStream {
             batch = batch.project(projection).unwrap();
         }
-        Poll::Ready(Some(RecordBatch::try_from_df_record_batch(
+        Poll::Ready(Some(Ok(RecordBatch::from_df_record_batch(
             self.projected_schema.clone(),
             batch,
-        )))
+        ))))
     }
 }
diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs
index c67f5431e8..68b12ed154 100644
--- a/src/table/src/table/scan.rs
+++ b/src/table/src/table/scan.rs
@@ -444,14 +444,10 @@ impl Stream for StreamWithMetricWrapper {
         }
         match result {
             Ok(record_batch) => {
-                let batch_mem_size = record_batch
-                    .columns()
-                    .iter()
-                    .map(|vec_ref| vec_ref.memory_size())
-                    .sum::<usize>();
                 // we don't record elapsed time here
                 // since it's calling storage api involving I/O ops
-                this.metric.record_mem_usage(batch_mem_size);
+                this.metric
+                    .record_mem_usage(record_batch.buffer_memory_size());
                 this.metric.record_output(record_batch.num_rows());
                 Poll::Ready(Some(Ok(record_batch.into_df_record_batch())))
             }
diff --git a/src/table/src/test_util/memtable.rs b/src/table/src/test_util/memtable.rs
index 224c9e100f..812b5edaf2 100644
--- a/src/table/src/test_util/memtable.rs
+++ b/src/table/src/test_util/memtable.rs
@@ -29,7 +29,7 @@
 use snafu::prelude::*;
 use store_api::data_source::DataSource;
 use store_api::storage::{RegionNumber, ScanRequest};
 
-use crate::error::{SchemaConversionSnafu, TableProjectionSnafu, TablesRecordBatchSnafu};
+use crate::error::{SchemaConversionSnafu, TableProjectionSnafu};
 use crate::metadata::{
     FilterPushDownType, TableId, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion,
 };
@@ -146,17 +146,14 @@ impl DataSource for MemtableDataSource {
         };
         let df_recordbatch = df_recordbatch.slice(0, limit);
-        let recordbatch = RecordBatch::try_from_df_record_batch(
+        let recordbatch = RecordBatch::from_df_record_batch(
             Arc::new(
                 Schema::try_from(df_recordbatch.schema())
                     .context(SchemaConversionSnafu)
                     .map_err(BoxedError::new)?,
             ),
             df_recordbatch,
-        )
-        .map_err(BoxedError::new)
-        .context(TablesRecordBatchSnafu)
-        .map_err(BoxedError::new)?;
+        );
 
         Ok(Box::pin(MemtableStream {
             schema: recordbatch.schema.clone(),
diff --git a/tests-integration/src/tests/instance_kafka_wal_test.rs b/tests-integration/src/tests/instance_kafka_wal_test.rs
index 053521c86f..ed74525aef 100644
--- a/tests-integration/src/tests/instance_kafka_wal_test.rs
+++ b/tests-integration/src/tests/instance_kafka_wal_test.rs
@@ -18,9 +18,8 @@
 use std::sync::atomic::{AtomicU64, Ordering};
 
 use client::DEFAULT_CATALOG_NAME;
 use common_query::{Output, OutputData};
-use datatypes::arrow::array::AsArray;
+use datatypes::arrow::array::{ArrayRef, AsArray, TimestampMillisecondArray};
 use datatypes::arrow::datatypes::TimestampMillisecondType;
-use datatypes::vectors::{TimestampMillisecondVector, VectorRef};
 use frontend::instance::Instance;
 use itertools::Itertools;
 use rand::Rng;
@@ -85,12 +84,10 @@ async fn test_create_database_and_insert_query(
             OutputData::Stream(s) => {
                 let batches = common_recordbatch::util::collect(s).await.unwrap();
                 assert_eq!(1, batches[0].num_columns());
-                assert_eq!(
-                    Arc::new(TimestampMillisecondVector::from_vec(vec![
-                        1655276557000_i64
-                    ])) as VectorRef,
-                    *batches[0].column(0)
-                );
+                let expected = Arc::new(TimestampMillisecondArray::from_iter_values(vec![
+                    1655276557000_i64,
+                ])) as ArrayRef;
+                assert_eq!(batches[0].column(0), &expected);
             }
             _ => unreachable!(),
         }
@@ -226,7 +223,7 @@ async fn ensure_data_exists(tables: &[Table], instance: &Arc<Instance>) {
     let queried = record_batches
         .into_iter()
        .flat_map(|rb| {
-            let array = rb.column(0).to_arrow_array();
+            let array = rb.column(0);
             let array = array.as_primitive::<TimestampMillisecondType>();
             array.iter().flatten().map(|x| x as u64).collect::<Vec<_>>()
         })
diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs
index a29e468bf6..30c10c38fd 100644
--- a/tests-integration/src/tests/instance_test.rs
+++ b/tests-integration/src/tests/instance_test.rs
@@ -22,7 +22,9 @@
 use common_query::Output;
 use common_recordbatch::util;
 use common_test_util::recordbatch::check_output_stream;
 use common_test_util::temp_dir;
-use datatypes::vectors::{StringVector, TimestampMillisecondVector, UInt64Vector, VectorRef};
+use datatypes::arrow::array::{
+    ArrayRef, AsArray, StringArray, TimestampMillisecondArray, UInt64Array,
+};
 use frontend::error::{Error, Result};
 use frontend::instance::Instance;
 use operator::error::Error as OperatorError;
@@ -77,12 +79,10 @@ async fn test_create_database_and_insert_query(instance: Arc<Instance>)
             OutputData::Stream(s) => {
                 let batches = util::collect(s).await.unwrap();
                 assert_eq!(1, batches[0].num_columns());
-                assert_eq!(
-                    Arc::new(TimestampMillisecondVector::from_vec(vec![
-                        1655276557000_i64
-                    ])) as VectorRef,
-                    *batches[0].column(0)
-                );
+                let expected = Arc::new(TimestampMillisecondArray::from_iter_values(vec![
+                    1655276557000_i64,
+                ])) as ArrayRef;
+                assert_eq!(batches[0].column(0), &expected);
             }
             _ => unreachable!(),
         }
@@ -210,7 +210,8 @@ async fn test_show_create_external_table(instance: Arc<Instance>) {
     // We can't directly test `show create table` by check_output_stream because the location name length depends on the current filesystem.
     let record_batches = record_batches.iter().collect::<Vec<_>>();
     let column = record_batches[0].column_by_name("Create Table").unwrap();
-    let actual = column.get(0);
+    let column = column.as_string::<i32>();
+    let actual = column.value(0);
     let expect = format!(
         r#"CREATE EXTERNAL TABLE IF NOT EXISTS "various_type_csv" (
   "c_int" BIGINT NULL,
@@ -312,14 +313,11 @@ async fn assert_query_result(instance: &Arc<Instance>, sql: &str, ts: i64, host:
             let batches = util::collect(s).await.unwrap();
             // let columns = batches[0].df_recordbatch.columns();
             assert_eq!(2, batches[0].num_columns());
-            assert_eq!(
-                Arc::new(StringVector::from(vec![host])) as VectorRef,
-                *batches[0].column(0)
-            );
-            assert_eq!(
-                Arc::new(TimestampMillisecondVector::from_vec(vec![ts])) as VectorRef,
-                *batches[0].column(1)
-            );
+            let expected = vec![
+                Arc::new(StringArray::from_iter_values(vec![host])) as ArrayRef,
+                Arc::new(TimestampMillisecondArray::from_iter_values(vec![ts])) as ArrayRef,
+            ];
+            assert_eq!(batches[0].columns(), &expected);
         }
         _ => unreachable!(),
     }
@@ -446,10 +444,8 @@ async fn test_execute_query(instance: Arc<Instance>) {
             assert_eq!(1, numbers[0].num_columns());
             assert_eq!(numbers[0].column(0).len(), 1);
-            assert_eq!(
-                Arc::new(UInt64Vector::from_vec(vec![4950_u64])) as VectorRef,
-                *numbers[0].column(0),
-            );
+            let expected = Arc::new(UInt64Array::from_iter_values(vec![4950_u64])) as ArrayRef;
+            assert_eq!(numbers[0].column(0), &expected);
         }
         _ => unreachable!(),
     }
@@ -2175,7 +2171,8 @@ async fn test_custom_storage(instance: Arc<Instance>) {
     let record_batches = record_batches.iter().collect::<Vec<_>>();
     let column = record_batches[0].column_by_name("Create Table").unwrap();
-    let actual = column.get(0);
+    let column = column.as_string::<i32>();
+    let actual = column.value(0);
     let expect = if instance.is_distributed_mode() {
         format!(
diff --git a/tests-integration/tests/region_migration.rs b/tests-integration/tests/region_migration.rs
index 732f3b77f7..9440b46aeb 100644
--- a/tests-integration/tests/region_migration.rs
+++ b/tests-integration/tests/region_migration.rs
@@ -34,9 +34,8 @@
 use common_test_util::temp_dir::create_temp_dir;
 use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
 use common_wal::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
 use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
-use datatypes::prelude::ScalarVector;
-use datatypes::value::Value;
-use datatypes::vectors::{Helper, UInt64Vector};
+use datatypes::arrow::array::AsArray;
+use datatypes::arrow::datatypes::UInt64Type;
 use frontend::error::Result as FrontendResult;
 use frontend::instance::Instance;
 use futures::future::BoxFuture;
@@ -1189,12 +1188,12 @@ async fn find_region_distribution_by_sql(
     let mut distribution = RegionDistribution::new();
 
     for batch in recordbatches.take() {
-        let datanode_ids: &UInt64Vector =
-            unsafe { Helper::static_cast(batch.column_by_name("datanode_id").unwrap()) };
-        let region_ids: &UInt64Vector =
-            unsafe { Helper::static_cast(batch.column_by_name("region_id").unwrap()) };
+        let column = batch.column_by_name("datanode_id").unwrap();
+        let datanode_ids = column.as_primitive::<UInt64Type>();
+        let column = batch.column_by_name("region_id").unwrap();
+        let region_ids = column.as_primitive::<UInt64Type>();
-        for (datanode_id, region_id) in datanode_ids.iter_data().zip(region_ids.iter_data()) {
+        for (datanode_id, region_id) in datanode_ids.iter().zip(region_ids.iter()) {
             let (Some(datanode_id), Some(region_id)) = (datanode_id, region_id) else {
                 unreachable!();
             };
@@ -1231,11 +1230,10 @@ async fn trigger_migration_by_sql(
     info!("SQL result:\n {}", recordbatches.pretty_print().unwrap());
 
-    let Value::String(procedure_id) = recordbatches.take()[0].column(0).get(0) else {
-        unreachable!();
-    };
-
-    procedure_id.as_utf8().to_string()
+    let record_batch = &recordbatches.take()[0];
+    let column = record_batch.column(0);
+    let column = column.as_string::<i32>();
+    column.value(0).to_string()
 }
 
 /// Query procedure state by SQL.
@@ -1254,11 +1252,10 @@ async fn query_procedure_by_sql(instance: &Arc<Instance>, pid: &str) -> String {
     info!("SQL result:\n {}", recordbatches.pretty_print().unwrap());
 
-    let Value::String(state) = recordbatches.take()[0].column(0).get(0) else {
-        unreachable!();
-    };
-
-    state.as_utf8().to_string()
+    let record_batch = &recordbatches.take()[0];
+    let column = record_batch.column(0);
+    let column = column.as_string::<i32>();
+    column.value(0).to_string()
 }
 
 async fn insert_values(instance: &Arc<Instance>, ts: u64) -> Vec<FrontendResult<Output>> {
diff --git a/tests-integration/tests/sql.rs b/tests-integration/tests/sql.rs
index c28347076a..14c8796e52 100644
--- a/tests-integration/tests/sql.rs
+++ b/tests-integration/tests/sql.rs
@@ -307,13 +307,7 @@ pub async fn test_mysql_crud(store_type: StorageType) {
            }
        });
        assert_eq!(json, expected_j);
-        assert_eq!(
-            vector,
-            [1.0f32, 2.0, 3.0]
-                .iter()
-                .flat_map(|x| x.to_le_bytes())
-                .collect::<Vec<_>>()
-        );
+        assert_eq!(vector, "[1,2,3]".as_bytes());
     }
 
     let rows = sqlx::query("select i from demo where i=?")