refactor: remove Vectors from RecordBatch completely (#7184)

* refactor: remove `Vector`s from `RecordBatch` completely

Signed-off-by: luofucong <luofc@foxmail.com>

* resolve PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

* resolve PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
Authored by LFC on 2025-11-21 16:53:35 +08:00, committed by GitHub
parent c5173fccfc
commit 4a7c16586b
41 changed files with 762 additions and 932 deletions
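
The heart of the change, condensed into an illustrative sketch assembled from the hunks below (the bindings `schema` and `df_record_batch` and the error handling here are assumptions, not code from the commit):

// Before: conversion was fallible and eagerly rebuilt GreptimeDB Vectors.
// let batch = RecordBatch::try_from_df_record_batch(schema.clone(), df_record_batch)?;
// After: conversion is infallible and the columns stay as Arrow arrays.
let batch = RecordBatch::from_df_record_batch(schema.clone(), df_record_batch);
let column: &ArrayRef = batch.column(0); // now an Arrow ArrayRef, no longer a VectorRef
// Only where a Vector is still required do callers convert explicitly:
let vector = Helper::try_into_vector(column).unwrap(); // real call sites use .context(DataTypesSnafu)?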

View File

@@ -435,10 +435,10 @@ impl Database {
.context(ExternalSnafu)?;
match flight_message {
FlightMessage::RecordBatch(arrow_batch) => {
yield RecordBatch::try_from_df_record_batch(
yield Ok(RecordBatch::from_df_record_batch(
schema_cloned.clone(),
arrow_batch,
)
))
}
FlightMessage::Metrics(_) => {}
FlightMessage::AffectedRows(_) | FlightMessage::Schema(_) => {

View File

@@ -182,10 +182,8 @@ impl RegionRequester {
match flight_message {
FlightMessage::RecordBatch(record_batch) => {
let result_to_yield = RecordBatch::try_from_df_record_batch(
schema_cloned.clone(),
record_batch,
);
let result_to_yield =
RecordBatch::from_df_record_batch(schema_cloned.clone(), record_batch);
// get the next message from the stream. normally it should be a metrics message.
if let Some(next_flight_message_result) = flight_message_stream.next().await
@@ -219,7 +217,7 @@ impl RegionRequester {
stream_ended = true;
}
yield result_to_yield;
yield Ok(result_to_yield);
}
FlightMessage::Metrics(s) => {
// just a branch in case of some metrics message comes after other things.

View File

@@ -52,9 +52,6 @@ pub enum Error {
data_type: ArrowDatatype,
},
#[snafu(display("Failed to downcast vector: {}", err_msg))]
DowncastVector { err_msg: String },
#[snafu(display("Invalid input type: {}", err_msg))]
InvalidInputType {
#[snafu(implicit)]
@@ -209,8 +206,7 @@ pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::DowncastVector { .. }
| Error::InvalidInputState { .. }
Error::InvalidInputState { .. }
| Error::ToScalarValue { .. }
| Error::GetScalarVector { .. }
| Error::ArrowCompute { .. }

View File

@@ -314,10 +314,10 @@ impl Stream for RecordBatchStreamAdapter {
metric_collector.record_batch_metrics,
);
}
Poll::Ready(Some(RecordBatch::try_from_df_record_batch(
Poll::Ready(Some(Ok(RecordBatch::from_df_record_batch(
self.schema(),
df_record_batch,
)))
))))
}
Poll::Ready(None) => {
if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) =

View File

@@ -133,18 +133,6 @@ pub enum Error {
source: datatypes::error::Error,
},
#[snafu(display(
"Failed to downcast vector of type '{:?}' to type '{:?}'",
from_type,
to_type
))]
DowncastVector {
from_type: ConcreteDataType,
to_type: ConcreteDataType,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Error occurs when performing arrow computation"))]
ArrowCompute {
#[snafu(source)]
@@ -217,8 +205,6 @@ impl ErrorExt for Error {
| Error::PhysicalExpr { .. }
| Error::RecordBatchSliceIndexOverflow { .. } => StatusCode::Internal,
Error::DowncastVector { .. } => StatusCode::Unexpected,
Error::PollStream { .. } => StatusCode::EngineExecuteQuery,
Error::ArrowCompute { .. } => StatusCode::IllegalState,

View File

@@ -30,19 +30,20 @@ use adapter::RecordBatchMetrics;
use arc_swap::ArcSwapOption;
use common_base::readable_size::ReadableSize;
pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::arrow::array::{ArrayRef, AsArray, StringBuilder};
use datatypes::arrow::compute::SortOptions;
pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
use datatypes::arrow::util::pretty;
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::types::{JsonFormat, jsonb_to_string};
use datatypes::vectors::{BinaryVector, StringVectorBuilder};
use error::Result;
use futures::task::{Context, Poll};
use futures::{Stream, TryStreamExt};
pub use recordbatch::RecordBatch;
use snafu::{OptionExt, ResultExt, ensure};
use snafu::{ResultExt, ensure};
use crate::error::NewDfRecordBatchSnafu;
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
fn name(&self) -> &str {
@@ -92,20 +93,14 @@ pub fn map_json_type_to_string(
mapped_schema: &SchemaRef,
) -> Result<RecordBatch> {
let mut vectors = Vec::with_capacity(original_schema.column_schemas().len());
for (vector, schema) in batch.columns.iter().zip(original_schema.column_schemas()) {
for (vector, schema) in batch.columns().iter().zip(original_schema.column_schemas()) {
if let ConcreteDataType::Json(j) = &schema.data_type {
if matches!(&j.format, JsonFormat::Jsonb) {
let mut string_vector_builder = StringVectorBuilder::with_capacity(vector.len());
let binary_vector = vector
.as_any()
.downcast_ref::<BinaryVector>()
.with_context(|| error::DowncastVectorSnafu {
from_type: schema.data_type.clone(),
to_type: ConcreteDataType::binary_datatype(),
})?;
for value in binary_vector.iter_data() {
let mut string_vector_builder = StringBuilder::new();
let binary_vector = vector.as_binary::<i32>();
for value in binary_vector.iter() {
let Some(value) = value else {
string_vector_builder.push(None);
string_vector_builder.append_null();
continue;
};
let string_value =
@@ -113,11 +108,11 @@ pub fn map_json_type_to_string(
from_type: schema.data_type.clone(),
to_type: ConcreteDataType::string_datatype(),
})?;
string_vector_builder.push(Some(string_value.as_str()));
string_vector_builder.append_value(string_value);
}
let string_vector = string_vector_builder.finish();
vectors.push(Arc::new(string_vector) as VectorRef);
vectors.push(Arc::new(string_vector) as ArrayRef);
} else {
vectors.push(vector.clone());
}
@@ -126,7 +121,15 @@ pub fn map_json_type_to_string(
}
}
RecordBatch::new(mapped_schema.clone(), vectors)
let record_batch = datatypes::arrow::record_batch::RecordBatch::try_new(
mapped_schema.arrow_schema().clone(),
vectors,
)
.context(NewDfRecordBatchSnafu)?;
Ok(RecordBatch::from_df_record_batch(
mapped_schema.clone(),
record_batch,
))
}
/// Maps the json type to string in the schema.
@@ -755,11 +758,7 @@ impl Stream for MemoryTrackedStream {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.inner).poll_next(cx) {
Poll::Ready(Some(Ok(batch))) => {
let additional = batch
.columns()
.iter()
.map(|c| c.memory_size())
.sum::<usize>();
let additional = batch.buffer_memory_size();
if let Err(e) = self.permit.track(additional, self.total_tracked) {
return Poll::Ready(Some(Err(e)));

View File

@@ -20,7 +20,7 @@ use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion_common::arrow::array::ArrayRef;
use datafusion_common::arrow::compute;
use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef};
use datatypes::arrow::array::RecordBatchOptions;
use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions};
use datatypes::prelude::DataType;
use datatypes::schema::SchemaRef;
use datatypes::vectors::{Helper, VectorRef};
@@ -30,15 +30,14 @@ use snafu::{OptionExt, ResultExt, ensure};
use crate::DfRecordBatch;
use crate::error::{
self, ArrowComputeSnafu, CastVectorSnafu, ColumnNotExistsSnafu, DataTypesSnafu,
ProjectArrowRecordBatchSnafu, Result,
self, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu, ProjectArrowRecordBatchSnafu,
Result,
};
/// A two-dimensional batch of column-oriented data with a defined schema.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordBatch {
pub schema: SchemaRef,
pub columns: Vec<VectorRef>,
df_record_batch: DfRecordBatch,
}
@@ -65,7 +64,6 @@ impl RecordBatch {
Ok(RecordBatch {
schema,
columns,
df_record_batch,
})
}
@@ -91,14 +89,8 @@ impl RecordBatch {
/// Create an empty [`RecordBatch`] from `schema`.
pub fn new_empty(schema: SchemaRef) -> RecordBatch {
let df_record_batch = DfRecordBatch::new_empty(schema.arrow_schema().clone());
let columns = schema
.column_schemas()
.iter()
.map(|col| col.data_type.create_mutable_vector(0).to_vector())
.collect();
RecordBatch {
schema,
columns,
df_record_batch,
}
}
@@ -113,17 +105,12 @@ impl RecordBatch {
.context(error::NewDfRecordBatchSnafu)?;
Ok(RecordBatch {
schema,
columns: vec![],
df_record_batch,
})
}
pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
let schema = Arc::new(self.schema.try_project(indices).context(DataTypesSnafu)?);
let mut columns = Vec::with_capacity(indices.len());
for index in indices {
columns.push(self.columns[*index].clone());
}
let df_record_batch = self.df_record_batch.project(indices).with_context(|_| {
ProjectArrowRecordBatchSnafu {
schema: self.schema.clone(),
@@ -133,7 +120,6 @@ impl RecordBatch {
Ok(Self {
schema,
columns,
df_record_batch,
})
}
@@ -141,21 +127,11 @@ impl RecordBatch {
/// Create a new [`RecordBatch`] from `schema` and `df_record_batch`.
///
/// This method doesn't check the schema.
pub fn try_from_df_record_batch(
schema: SchemaRef,
df_record_batch: DfRecordBatch,
) -> Result<RecordBatch> {
let columns = df_record_batch
.columns()
.iter()
.map(|c| Helper::try_into_vector(c.clone()).context(error::DataTypesSnafu))
.collect::<Result<Vec<_>>>()?;
Ok(RecordBatch {
pub fn from_df_record_batch(schema: SchemaRef, df_record_batch: DfRecordBatch) -> RecordBatch {
RecordBatch {
schema,
columns,
df_record_batch,
})
}
}
#[inline]
@@ -169,23 +145,22 @@ impl RecordBatch {
}
#[inline]
pub fn columns(&self) -> &[VectorRef] {
&self.columns
pub fn columns(&self) -> &[ArrayRef] {
self.df_record_batch.columns()
}
#[inline]
pub fn column(&self, idx: usize) -> &VectorRef {
&self.columns[idx]
pub fn column(&self, idx: usize) -> &ArrayRef {
self.df_record_batch.column(idx)
}
pub fn column_by_name(&self, name: &str) -> Option<&VectorRef> {
let idx = self.schema.column_index_by_name(name)?;
Some(&self.columns[idx])
pub fn column_by_name(&self, name: &str) -> Option<&ArrayRef> {
self.df_record_batch.column_by_name(name)
}
#[inline]
pub fn num_columns(&self) -> usize {
self.columns.len()
self.df_record_batch.num_columns()
}
#[inline]
@@ -201,9 +176,14 @@ impl RecordBatch {
let mut vectors = HashMap::with_capacity(self.num_columns());
// column schemas in recordbatch must match its vectors, otherwise it's corrupted
for (vector_schema, vector) in self.schema.column_schemas().iter().zip(self.columns.iter())
for (field, array) in self
.df_record_batch
.schema()
.fields()
.iter()
.zip(self.df_record_batch.columns().iter())
{
let column_name = &vector_schema.name;
let column_name = field.name();
let column_schema =
table_schema
.column_schema_by_name(column_name)
@@ -211,15 +191,12 @@ impl RecordBatch {
table_name,
column_name,
})?;
let vector = if vector_schema.data_type != column_schema.data_type {
vector
.cast(&column_schema.data_type)
.with_context(|_| CastVectorSnafu {
from_type: vector.data_type(),
to_type: column_schema.data_type.clone(),
})?
let vector = if field.data_type() != &column_schema.data_type.as_arrow_type() {
let array = compute::cast(array, &column_schema.data_type.as_arrow_type())
.context(ArrowComputeSnafu)?;
Helper::try_into_vector(array).context(DataTypesSnafu)?
} else {
vector.clone()
Helper::try_into_vector(array).context(DataTypesSnafu)?
};
let _ = vectors.insert(column_name.clone(), vector);
@@ -244,8 +221,69 @@ impl RecordBatch {
visit_index: offset + len
}
);
let columns = self.columns.iter().map(|vector| vector.slice(offset, len));
RecordBatch::new(self.schema.clone(), columns)
let sliced = self.df_record_batch.slice(offset, len);
Ok(RecordBatch::from_df_record_batch(
self.schema.clone(),
sliced,
))
}
/// Returns the total number of bytes of memory pointed to by the arrays in this `RecordBatch`.
///
/// The buffers store bytes in the Arrow memory format, and include the data as well as the validity map.
/// Note that this does not always correspond to the exact memory usage of an array,
/// since multiple arrays can share the same buffers or slices thereof.
pub fn buffer_memory_size(&self) -> usize {
self.df_record_batch
.columns()
.iter()
.map(|array| array.get_buffer_memory_size())
.sum()
}
/// Iterate the values as strings in the column at index `i`.
///
/// Note that if the underlying array is not a valid GreptimeDB vector, an empty iterator is
/// returned.
///
/// # Panics
/// if index `i` is out of bound.
pub fn iter_column_as_string(&self, i: usize) -> Box<dyn Iterator<Item = Option<String>> + '_> {
macro_rules! iter {
($column: ident) => {
Box::new(
(0..$column.len())
.map(|i| $column.is_valid(i).then(|| $column.value(i).to_string())),
)
};
}
let column = self.df_record_batch.column(i);
match column.data_type() {
ArrowDataType::Utf8 => {
let column = column.as_string::<i32>();
let iter = iter!(column);
iter as _
}
ArrowDataType::LargeUtf8 => {
let column = column.as_string::<i64>();
iter!(column)
}
ArrowDataType::Utf8View => {
let column = column.as_string_view();
iter!(column)
}
_ => {
if let Ok(column) = Helper::try_into_vector(column) {
Box::new(
(0..column.len())
.map(move |i| (!column.is_null(i)).then(|| column.get(i).to_string())),
)
} else {
Box::new(std::iter::empty())
}
}
}
}
}
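
For orientation, the two helpers introduced above are consumed later in this commit (memory tracking, Prometheus and trace handlers) roughly as in this sketch; `batch` is an assumed `RecordBatch` binding:

let additional = batch.buffer_memory_size(); // Arrow buffer accounting, replaces summing Vector memory sizes
let first_column: Vec<String> = batch
    .iter_column_as_string(0) // yields Option<String> per row
    .flatten()                // drop nulls
    .collect();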
@@ -259,8 +297,9 @@ impl Serialize for RecordBatch {
let mut s = serializer.serialize_struct("record", 2)?;
s.serialize_field("schema", &**self.schema.arrow_schema())?;
let vec = self
.columns
let columns = self.df_record_batch.columns();
let columns = Helper::try_into_vectors(columns).map_err(Error::custom)?;
let vec = columns
.iter()
.map(|c| c.serialize_to_json())
.collect::<std::result::Result<Vec<_>, _>>()
@@ -278,27 +317,14 @@ pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Resul
return Ok(RecordBatch::new_empty(schema));
}
let n_rows = batches.iter().map(|b| b.num_rows()).sum();
let n_columns = schema.num_columns();
// Collect arrays from each batch
let mut merged_columns = Vec::with_capacity(n_columns);
for col_idx in 0..n_columns {
let mut acc = schema.column_schemas()[col_idx]
.data_type
.create_mutable_vector(n_rows);
for batch in batches {
let column = batch.column(col_idx);
acc.extend_slice_of(column.as_ref(), 0, column.len())
.context(error::DataTypesSnafu)?;
}
merged_columns.push(acc.to_vector());
}
let record_batch = compute::concat_batches(
schema.arrow_schema(),
batches.iter().map(|x| x.df_record_batch()),
)
.context(ArrowComputeSnafu)?;
// Create a new RecordBatch with merged columns
RecordBatch::new(schema, merged_columns)
Ok(RecordBatch::from_df_record_batch(schema, record_batch))
}
#[cfg(test)]
@@ -326,21 +352,21 @@ mod tests {
let c2 = Arc::new(UInt32Vector::from_slice([4, 5, 6]));
let columns: Vec<VectorRef> = vec![c1, c2];
let expected = vec![
Arc::new(UInt32Array::from_iter_values([1, 2, 3])) as ArrayRef,
Arc::new(UInt32Array::from_iter_values([4, 5, 6])),
];
let batch = RecordBatch::new(schema.clone(), columns.clone()).unwrap();
assert_eq!(3, batch.num_rows());
assert_eq!(&columns, batch.columns());
for (i, expect) in columns.iter().enumerate().take(batch.num_columns()) {
let column = batch.column(i);
assert_eq!(expect, column);
}
assert_eq!(expected, batch.df_record_batch().columns());
assert_eq!(schema, batch.schema);
assert_eq!(columns[0], *batch.column_by_name("c1").unwrap());
assert_eq!(columns[1], *batch.column_by_name("c2").unwrap());
assert_eq!(&expected[0], batch.column_by_name("c1").unwrap());
assert_eq!(&expected[1], batch.column_by_name("c2").unwrap());
assert!(batch.column_by_name("c3").is_none());
let converted =
RecordBatch::try_from_df_record_batch(schema, batch.df_record_batch().clone()).unwrap();
let converted = RecordBatch::from_df_record_batch(schema, batch.df_record_batch().clone());
assert_eq!(batch, converted);
assert_eq!(*batch.df_record_batch(), converted.into_df_record_batch());
}
@@ -385,12 +411,12 @@ mod tests {
let recordbatch = recordbatch.slice(1, 2).expect("recordbatch slice");
let expected = &UInt32Array::from_iter_values([2u32, 3]);
let array = recordbatch.column(0).to_arrow_array();
let array = recordbatch.column(0);
let actual = array.as_primitive::<UInt32Type>();
assert_eq!(expected, actual);
let expected = &StringArray::from(vec!["hello", "greptime"]);
let array = recordbatch.column(1).to_arrow_array();
let array = recordbatch.column(1);
let actual = array.as_string::<i32>();
assert_eq!(expected, actual);

View File

@@ -12,9 +12,117 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow::array::{ArrayRef, AsArray};
use arrow::datatypes::{
DataType, DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType,
DurationSecondType, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
};
use common_time::time::Time;
use common_time::{Duration, Timestamp};
pub type BinaryArray = arrow::array::BinaryArray;
pub type MutableBinaryArray = arrow::array::BinaryBuilder;
pub type StringArray = arrow::array::StringArray;
pub type MutableStringArray = arrow::array::StringBuilder;
pub type LargeStringArray = arrow::array::LargeStringArray;
pub type MutableLargeStringArray = arrow::array::LargeStringBuilder;
/// Get the [Timestamp] value at index `i` of the timestamp array.
///
/// Note: This method does not check for nulls and the value is arbitrary
/// if [`is_null`](arrow::array::Array::is_null) returns true for the index.
///
/// # Panics
/// 1. if index `i` is out of bounds;
/// 2. or the array is not timestamp type.
pub fn timestamp_array_value(array: &ArrayRef, i: usize) -> Timestamp {
let DataType::Timestamp(time_unit, _) = &array.data_type() else {
unreachable!()
};
let v = match time_unit {
TimeUnit::Second => {
let array = array.as_primitive::<TimestampSecondType>();
array.value(i)
}
TimeUnit::Millisecond => {
let array = array.as_primitive::<TimestampMillisecondType>();
array.value(i)
}
TimeUnit::Microsecond => {
let array = array.as_primitive::<TimestampMicrosecondType>();
array.value(i)
}
TimeUnit::Nanosecond => {
let array = array.as_primitive::<TimestampNanosecondType>();
array.value(i)
}
};
Timestamp::new(v, time_unit.into())
}
/// Get the [Time] value at index `i` of the time array.
///
/// Note: This method does not check for nulls and the value is arbitrary
/// if [`is_null`](arrow::array::Array::is_null) returns true for the index.
///
/// # Panics
/// 1. if index `i` is out of bounds;
/// 2. or the array is not `Time32` or `Time64` type.
pub fn time_array_value(array: &ArrayRef, i: usize) -> Time {
match array.data_type() {
DataType::Time32(time_unit) | DataType::Time64(time_unit) => match time_unit {
TimeUnit::Second => {
let array = array.as_primitive::<Time32SecondType>();
Time::new_second(array.value(i) as i64)
}
TimeUnit::Millisecond => {
let array = array.as_primitive::<Time32MillisecondType>();
Time::new_millisecond(array.value(i) as i64)
}
TimeUnit::Microsecond => {
let array = array.as_primitive::<Time64MicrosecondType>();
Time::new_microsecond(array.value(i))
}
TimeUnit::Nanosecond => {
let array = array.as_primitive::<Time64NanosecondType>();
Time::new_nanosecond(array.value(i))
}
},
_ => unreachable!(),
}
}
/// Get the [Duration] value at index `i` of the duration array.
///
/// Note: This method does not check for nulls and the value is arbitrary
/// if [`is_null`](arrow::array::Array::is_null) returns true for the index.
///
/// # Panics
/// 1. if index `i` is out of bounds;
/// 2. or the array is not duration type.
pub fn duration_array_value(array: &ArrayRef, i: usize) -> Duration {
let DataType::Duration(time_unit) = array.data_type() else {
unreachable!();
};
let v = match time_unit {
TimeUnit::Second => {
let array = array.as_primitive::<DurationSecondType>();
array.value(i)
}
TimeUnit::Millisecond => {
let array = array.as_primitive::<DurationMillisecondType>();
array.value(i)
}
TimeUnit::Microsecond => {
let array = array.as_primitive::<DurationMicrosecondType>();
array.value(i)
}
TimeUnit::Nanosecond => {
let array = array.as_primitive::<DurationNanosecondType>();
array.value(i)
}
};
Duration::new(v, time_unit.into())
}
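
These helpers are called by the HTTP writers further down in this commit; a minimal usage sketch, assuming `array: &ArrayRef` and a row index `i` are in scope and the arrays carry the matching Arrow data types:

let ts = datatypes::arrow_array::timestamp_array_value(array, i); // common_time::Timestamp
let t = datatypes::arrow_array::time_array_value(array, i);       // common_time::Time
let d = datatypes::arrow_array::duration_array_value(array, i);   // common_time::Duration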

View File

@@ -22,13 +22,17 @@ use std::task::{Context, Poll};
use common_datasource::object_store::build_backend;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::error::{CastVectorSnafu, ExternalSnafu, Result as RecordBatchResult};
use common_recordbatch::error::{
CastVectorSnafu, DataTypesSnafu, ExternalSnafu, Result as RecordBatchResult,
};
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use datafusion::logical_expr::utils as df_logical_expr_utils;
use datafusion_expr::expr::Expr;
use datatypes::arrow::array::ArrayRef;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::VectorRef;
use datatypes::vectors::{Helper, VectorRef};
use futures::Stream;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::ScanRequest;
@@ -197,7 +201,7 @@ impl FileToScanRegionStream {
.all(|scan_column_schema| {
file_record_batch
.column_by_name(&scan_column_schema.name)
.map(|rb| rb.data_type() == scan_column_schema.data_type)
.map(|rb| rb.data_type() == &scan_column_schema.data_type.as_arrow_type())
.unwrap_or_default()
})
}
@@ -231,9 +235,10 @@ impl FileToScanRegionStream {
}
fn cast_column_type(
source_column: &VectorRef,
source_column: &ArrayRef,
target_data_type: &ConcreteDataType,
) -> RecordBatchResult<VectorRef> {
let source_column = Helper::try_into_vector(source_column).context(DataTypesSnafu)?;
if &source_column.data_type() == target_data_type {
Ok(source_column.clone())
} else {

View File

@@ -199,7 +199,7 @@ impl SourceSender {
/// send record batch
pub async fn send_record_batch(&self, batch: RecordBatch) -> Result<usize, Error> {
let row_cnt = batch.num_rows();
let batch = Batch::from(batch);
let batch = Batch::try_from(batch)?;
self.send_buf_row_cnt.fetch_add(row_cnt, Ordering::SeqCst);

View File

@@ -25,6 +25,7 @@ mod signature;
pub(crate) mod utils;
use arrow::compute::FilterBuilder;
use common_recordbatch::RecordBatch;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::value::Value;
use datatypes::vectors::{BooleanVector, Helper, VectorRef};
@@ -38,6 +39,8 @@ pub(crate) use relation::{Accum, Accumulator, AggregateExpr, AggregateFunc};
pub(crate) use scalar::{ScalarExpr, TypedExpr};
use snafu::{ResultExt, ensure};
use crate::Error;
use crate::error::DatatypesSnafu;
use crate::expr::error::{ArrowSnafu, DataTypeSnafu};
use crate::repr::Diff;
@@ -55,13 +58,19 @@ pub struct Batch {
diffs: Option<VectorRef>,
}
impl From<common_recordbatch::RecordBatch> for Batch {
fn from(value: common_recordbatch::RecordBatch) -> Self {
Self {
impl TryFrom<RecordBatch> for Batch {
type Error = Error;
fn try_from(value: RecordBatch) -> Result<Self, Self::Error> {
let columns = value.columns();
let batch = Helper::try_into_vectors(columns).context(DatatypesSnafu {
extra: "failed to convert Arrow array to vector when building Flow batch",
})?;
Ok(Self {
row_count: value.num_rows(),
batch: value.columns,
batch,
diffs: None,
}
})
}
}

View File

@@ -34,7 +34,6 @@ use datafusion::execution::SessionStateBuilder;
use datafusion::execution::context::SessionContext;
use datafusion_expr::select_expr::SelectExpr;
use datafusion_expr::{Expr, SortExpr, col, lit, lit_timestamp_nano, wildcard};
use datatypes::value::ValueRef;
use query::QueryEngineRef;
use serde_json::Value as JsonValue;
use servers::error::{
@@ -595,13 +594,10 @@ async fn trace_ids_from_output(output: Output) -> ServerResult<Vec<String>> {
{
let mut trace_ids = vec![];
for recordbatch in recordbatches {
for col in recordbatch.columns().iter() {
for row_idx in 0..recordbatch.num_rows() {
if let ValueRef::String(value) = col.get_ref(row_idx) {
trace_ids.push(value.to_string());
}
}
}
recordbatch
.iter_column_as_string(0)
.flatten()
.for_each(|x| trace_ids.push(x));
}
return Ok(trace_ids);

View File

@@ -20,7 +20,6 @@ use common_catalog::consts::INFORMATION_SCHEMA_NAME;
use common_catalog::format_full_table_name;
use common_recordbatch::util;
use common_telemetry::tracing;
use datatypes::prelude::Value;
use promql_parser::label::{MatchOp, Matcher, Matchers};
use query::promql;
use query::promql::planner::PromPlanner;
@@ -90,15 +89,10 @@ impl Instance {
for batch in batches {
// Only one column the results, ensured by `prometheus::metric_name_matchers_to_plan`.
let names = batch.column(0);
for i in 0..names.len() {
let Value::String(name) = names.get(i) else {
unreachable!();
};
results.push(name.into_string());
}
batch
.iter_column_as_string(0)
.flatten()
.for_each(|x| results.push(x))
}
Ok(results)
@@ -173,11 +167,10 @@ impl Instance {
let mut results = Vec::with_capacity(batches.iter().map(|b| b.num_rows()).sum());
for batch in batches {
// Only one column in results, ensured by `prometheus::label_values_matchers_to_plan`.
let names = batch.column(0);
for i in 0..names.len() {
results.push(names.get(i).to_string());
}
batch
.iter_column_as_string(0)
.flatten()
.for_each(|x| results.push(x))
}
Ok(results)

View File

@@ -317,45 +317,20 @@ pub fn decode_batch_stream<T: Send + 'static>(
/// Decode a record batch to a list of key and value.
fn decode_record_batch_to_key_and_value(batch: RecordBatch) -> Vec<(String, String)> {
let key_col = batch.column(0);
let val_col = batch.column(1);
(0..batch.num_rows())
.flat_map(move |row_index| {
let key = key_col
.get_ref(row_index)
.try_into_string()
.unwrap()
.map(|s| s.to_string());
key.map(|k| {
(
k,
val_col
.get_ref(row_index)
.try_into_string()
.unwrap()
.map(|s| s.to_string())
.unwrap_or_default(),
)
let keys = batch.iter_column_as_string(0);
let values = batch.iter_column_as_string(1);
keys.zip(values)
.filter_map(|(k, v)| match (k, v) {
(Some(k), Some(v)) => Some((k, v)),
(Some(k), None) => Some((k, "".to_string())),
(None, _) => None,
})
})
.collect()
.collect::<Vec<_>>()
}
/// Decode a record batch to a list of key.
fn decode_record_batch_to_key(batch: RecordBatch) -> Vec<String> {
let key_col = batch.column(0);
(0..batch.num_rows())
.flat_map(move |row_index| {
key_col
.get_ref(row_index)
.try_into_string()
.unwrap()
.map(|s| s.to_string())
})
.collect()
batch.iter_column_as_string(0).flatten().collect::<Vec<_>>()
}
// simulate to `KvBackend`
@@ -590,6 +565,8 @@ impl MetadataRegion {
/// Retrieves the value associated with the given key in the specified region.
/// Returns `Ok(None)` if the key is not found.
pub async fn get(&self, region_id: RegionId, key: &str) -> Result<Option<String>> {
use datatypes::arrow::array::{Array, AsArray};
let filter_expr = datafusion::prelude::col(METADATA_SCHEMA_KEY_COLUMN_NAME)
.eq(datafusion::prelude::lit(key));
@@ -611,12 +588,9 @@ impl MetadataRegion {
return Ok(None);
};
let val = first_batch
.column(0)
.get_ref(0)
.try_into_string()
.unwrap()
.map(|s| s.to_string());
let column = first_batch.column(0);
let column = column.as_string::<i32>();
let val = column.is_valid(0).then(|| column.value(0).to_string());
Ok(val)
}

View File

@@ -19,8 +19,8 @@ use std::time::Duration;
use api::v1::{ColumnSchema, Rows};
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use datatypes::prelude::ScalarVector;
use datatypes::vectors::TimestampMillisecondVector;
use datatypes::arrow::array::AsArray;
use datatypes::arrow::datatypes::TimestampMillisecondType;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::AlterKind::SetRegionOptions;
use store_api::region_request::{
@@ -125,10 +125,8 @@ async fn collect_stream_ts(stream: SendableRecordBatchStream) -> Vec<i64> {
let ts_col = batch
.column_by_name("ts")
.unwrap()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap();
res.extend(ts_col.iter_data().map(|t| t.unwrap().0.value()));
.as_primitive::<TimestampMillisecondType>();
res.extend((0..ts_col.len()).map(|i| ts_col.value(i)));
}
res
}

View File

@@ -109,7 +109,10 @@ impl ConvertBatchStream {
compute::concat_batches(output_schema.arrow_schema(), &self.buffer)
.context(ArrowComputeSnafu)?;
RecordBatch::try_from_df_record_batch(output_schema, record_batch)
Ok(RecordBatch::from_df_record_batch(
output_schema,
record_batch,
))
}
ScanBatch::RecordBatch(df_record_batch) => {
// Safety: Only flat format returns this batch.

View File

@@ -19,6 +19,8 @@ use api::v1::{
ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest,
RowInsertRequests, Rows, SemanticType,
};
use arrow::array::{Array, AsArray};
use arrow::datatypes::TimestampNanosecondType;
use common_query::OutputData;
use common_recordbatch::util as record_util;
use common_telemetry::{debug, info};
@@ -27,9 +29,7 @@ use datafusion::datasource::DefaultTableSource;
use datafusion::logical_expr::col;
use datafusion_common::TableReference;
use datafusion_expr::{DmlStatement, LogicalPlan};
use datatypes::prelude::ScalarVector;
use datatypes::timestamp::TimestampNanosecond;
use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
use itertools::Itertools;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
@@ -527,8 +527,7 @@ impl PipelineTable {
for r in records {
let pipeline_content_column = r.column(0);
let pipeline_content = pipeline_content_column
.as_any()
.downcast_ref::<StringVector>()
.as_string_opt::<i32>()
.with_context(|| CastTypeSnafu {
msg: format!(
"can't downcast {:?} array into string vector",
@@ -537,20 +536,19 @@ impl PipelineTable {
})?;
let pipeline_schema_column = r.column(1);
let pipeline_schema = pipeline_schema_column
.as_any()
.downcast_ref::<StringVector>()
let pipeline_schema =
pipeline_schema_column
.as_string_opt::<i32>()
.with_context(|| CastTypeSnafu {
msg: format!(
"can't downcast {:?} array into string vector",
"expecting pipeline schema column of type string, actual: {}",
pipeline_schema_column.data_type()
),
})?;
let pipeline_created_at_column = r.column(2);
let pipeline_created_at = pipeline_created_at_column
.as_any()
.downcast_ref::<TimestampNanosecondVector>()
.as_primitive_opt::<TimestampNanosecondType>()
.with_context(|| CastTypeSnafu {
msg: format!(
"can't downcast {:?} array into scalar vector",
@@ -572,9 +570,9 @@ impl PipelineTable {
let len = pipeline_content.len();
for i in 0..len {
re.push((
pipeline_content.get_data(i).unwrap().to_string(),
pipeline_schema.get_data(i).unwrap().to_string(),
pipeline_created_at.get_data(i).unwrap(),
pipeline_content.value(i).to_string(),
pipeline_schema.value(i).to_string(),
TimestampNanosecond::new(pipeline_created_at.value(i)),
));
}
}

View File

@@ -18,7 +18,6 @@ use std::sync::Arc;
use std::task::Poll;
use std::time::Instant;
use common_recordbatch::RecordBatch as GtRecordBatch;
use common_telemetry::warn;
use datafusion::arrow::array::AsArray;
use datafusion::arrow::compute::{self, SortOptions, concat_batches};
@@ -41,9 +40,8 @@ use datafusion::physical_plan::{
};
use datafusion::prelude::{Column, Expr};
use datatypes::prelude::{ConcreteDataType, DataType as GtDataType};
use datatypes::schema::Schema as GtSchema;
use datatypes::value::{OrderedF64, ValueRef};
use datatypes::vectors::MutableVector;
use datatypes::vectors::{Helper, MutableVector};
use futures::{Stream, StreamExt, ready};
/// `HistogramFold` will fold the conventional (non-native) histogram ([1]) for later
@@ -560,36 +558,29 @@ impl HistogramFoldStream {
let mut remaining_rows = self.input_buffered_rows;
let mut cursor = 0;
let gt_schema = GtSchema::try_from(self.input.schema()).unwrap();
let batch = GtRecordBatch::try_from_df_record_batch(Arc::new(gt_schema), batch).unwrap();
// TODO(LFC): Try to get rid of the Arrow array to vector conversion here.
let vectors = Helper::try_into_vectors(batch.columns())
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
while remaining_rows >= bucket_num {
// "sample" normal columns
for normal_index in &self.normal_indices {
let val = batch.column(*normal_index).get(cursor);
let val = vectors[*normal_index].get(cursor);
self.output_buffer[*normal_index].push_value_ref(&val.as_value_ref());
}
// "fold" `le` and field columns
let le_array = batch.column(self.le_column_index);
let le_array = le_array.as_string::<i32>();
let field_array = batch.column(self.field_column_index);
let field_array = field_array.as_primitive::<Float64Type>();
let mut bucket = vec![];
let mut counters = vec![];
for bias in 0..bucket_num {
let le_str_val = le_array.get(cursor + bias);
let le_str_val_ref = le_str_val.as_value_ref();
let le_str = le_str_val_ref
.try_into_string()
.unwrap()
.expect("le column should not be nullable");
let le_str = le_array.value(cursor + bias);
let le = le_str.parse::<f64>().unwrap();
bucket.push(le);
let counter = field_array
.get(cursor + bias)
.as_value_ref()
.try_into_f64()
.unwrap()
.expect("field column should not be nullable");
let counter = field_array.value(cursor + bias);
counters.push(counter);
}
// ignore invalid data
@@ -600,7 +591,7 @@ impl HistogramFoldStream {
self.output_buffered_rows += 1;
}
let remaining_input_batch = batch.into_df_record_batch().slice(cursor, remaining_rows);
let remaining_input_batch = batch.slice(cursor, remaining_rows);
self.input_buffered_rows = remaining_input_batch.num_rows();
self.input_buffer.push(remaining_input_batch);

View File

@@ -682,13 +682,14 @@ impl QueryExecutor for DatafusionQueryEngine {
mod tests {
use std::sync::Arc;
use arrow::array::{ArrayRef, UInt64Array};
use catalog::RegisterTableRequest;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID};
use common_recordbatch::util;
use datafusion::prelude::{col, lit};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::vectors::{Helper, UInt32Vector, UInt64Vector, VectorRef};
use datatypes::vectors::{Helper, UInt32Vector, VectorRef};
use session::context::{QueryContext, QueryContextBuilder};
use table::table::numbers::{NUMBERS_TABLE_NAME, NumbersTable};
@@ -770,10 +771,8 @@ mod tests {
assert_eq!(1, batch.num_columns());
assert_eq!(batch.column(0).len(), 1);
assert_eq!(
*batch.column(0),
Arc::new(UInt64Vector::from_slice([4950])) as VectorRef
);
let expected = Arc::new(UInt64Array::from_iter_values([4950])) as ArrayRef;
assert_eq!(batch.column(0), &expected);
}
_ => unreachable!(),
}

View File

@@ -1440,8 +1440,7 @@ mod test {
..
}) => {
let record = record.take().first().cloned().unwrap();
let data = record.column(0);
Ok(data.get(0).to_string())
Ok(record.iter_column_as_string(0).next().unwrap().unwrap())
}
Ok(_) => unreachable!(),
Err(e) => Err(e),

View File

@@ -18,7 +18,7 @@ use common_function::scalars::vector::impl_conv::veclit_to_binlit;
use common_recordbatch::RecordBatch;
use datatypes::prelude::*;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::BinaryVector;
use datatypes::vectors::{BinaryVector, Helper};
use rand::Rng;
use table::test_util::MemTable;
@@ -64,5 +64,6 @@ pub fn get_value_from_batches(column_name: &str, batches: Vec<RecordBatch>) -> V
assert_eq!(batch.column(0).len(), 1);
let v = batch.column(0);
assert_eq!(1, v.len());
let v = Helper::try_into_vector(v).unwrap();
v.get(0)
}

View File

@@ -14,6 +14,7 @@
use std::sync::Arc;
use arrow::array::{ArrayRef, UInt32Array};
use catalog::RegisterTableRequest;
use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
@@ -97,11 +98,10 @@ async fn test_datafusion_query_engine() -> Result<()> {
let batch = &numbers[0];
assert_eq!(1, batch.num_columns());
assert_eq!(batch.column(0).len(), limit);
let expected: Vec<u32> = (0u32..limit as u32).collect();
assert_eq!(
*batch.column(0),
Arc::new(UInt32Vector::from_slice(expected)) as VectorRef
);
let expected = Arc::new(UInt32Array::from_iter_values(
(0u32..limit as u32).collect::<Vec<_>>(),
)) as ArrayRef;
assert_eq!(batch.column(0), &expected);
Ok(())
}

View File

@@ -34,7 +34,7 @@ async fn test_vec_avg_aggregator() -> Result<(), common_query::error::Error> {
let sql = "SELECT vector FROM vectors";
let vectors = exec_selection(engine, sql).await;
let column = vectors[0].column(0).to_arrow_array();
let column = vectors[0].column(0);
let len = column.len();
for i in 0..column.len() {
let v = ScalarValue::try_from_array(&column, i)?;

View File

@@ -32,7 +32,7 @@ async fn test_vec_product_aggregator() -> Result<(), common_query::error::Error>
let sql = "SELECT vector FROM vectors";
let vectors = exec_selection(engine, sql).await;
let column = vectors[0].column(0).to_arrow_array();
let column = vectors[0].column(0);
for i in 0..column.len() {
let v = ScalarValue::try_from_array(&column, i)?;
let vector = as_veclit(&v)?;

View File

@@ -34,7 +34,7 @@ async fn test_vec_sum_aggregator() -> Result<(), common_query::error::Error> {
let sql = "SELECT vector FROM vectors";
let vectors = exec_selection(engine, sql).await;
let column = vectors[0].column(0).to_arrow_array();
let column = vectors[0].column(0);
for i in 0..column.len() {
let v = ScalarValue::try_from_array(&column, i)?;
let vector = as_veclit(&v)?;

View File

@@ -32,9 +32,7 @@ use common_telemetry::{debug, error, info};
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::SchemaRef;
use datatypes::types::jsonb_to_serde_json;
use event::{LogState, LogValidatorRef};
use futures::FutureExt;
use http::{HeaderValue, Method};
@@ -55,8 +53,7 @@ use self::result::table_result::TableResponse;
use crate::configurator::ConfiguratorRef;
use crate::elasticsearch;
use crate::error::{
AddressBindSnafu, AlreadyStartedSnafu, ConvertSqlValueSnafu, Error, InternalIoSnafu,
InvalidHeaderValueSnafu, Result, ToJsonSnafu,
AddressBindSnafu, AlreadyStartedSnafu, Error, InternalIoSnafu, InvalidHeaderValueSnafu, Result,
};
use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
use crate::http::otlp::OtlpState;
@@ -109,6 +106,7 @@ pub mod result;
mod timeout;
pub mod utils;
use result::HttpOutputWriter;
pub(crate) use timeout::DynamicTimeoutLayer;
mod hints;
@@ -298,30 +296,10 @@ impl HttpRecordsOutput {
} else {
let num_rows = recordbatches.iter().map(|r| r.num_rows()).sum::<usize>();
let mut rows = Vec::with_capacity(num_rows);
let schemas = schema.column_schemas();
let num_cols = schema.column_schemas().len();
rows.resize_with(num_rows, || Vec::with_capacity(num_cols));
let mut finished_row_cursor = 0;
for recordbatch in recordbatches {
for (col_idx, col) in recordbatch.columns().iter().enumerate() {
// safety here: schemas length is equal to the number of columns in the recordbatch
let schema = &schemas[col_idx];
for row_idx in 0..recordbatch.num_rows() {
let value = col.get(row_idx);
// TODO(sunng87): is this duplicated with `map_json_type_to_string` in recordbatch?
let value = if let ConcreteDataType::Json(_json_type) = &schema.data_type
&& let datatypes::value::Value::Binary(bytes) = value
{
jsonb_to_serde_json(bytes.as_ref()).context(ConvertSqlValueSnafu)?
} else {
serde_json::Value::try_from(col.get(row_idx)).context(ToJsonSnafu)?
};
rows[row_idx + finished_row_cursor].push(value);
}
}
finished_row_cursor += recordbatch.num_rows();
let mut writer = HttpOutputWriter::new(schema.num_columns(), None);
writer.write(recordbatch, &mut rows)?;
}
Ok(HttpRecordsOutput {

View File

@@ -19,16 +19,13 @@ use std::collections::{BTreeMap, HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use arrow::array::AsArray;
use arrow::array::{Array, AsArray};
use arrow::datatypes::{
Date32Type, Date64Type, Decimal128Type, DurationMicrosecondType, DurationMillisecondType,
DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int8Type, Int16Type,
Date32Type, Date64Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type,
Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
UInt8Type, UInt16Type, UInt32Type, UInt64Type,
};
use arrow_schema::{DataType, IntervalUnit, TimeUnit};
use arrow_schema::{DataType, IntervalUnit};
use axum::extract::{Path, Query, State};
use axum::{Extension, Form};
use catalog::CatalogManagerRef;
@@ -39,18 +36,13 @@ use common_error::status_code::StatusCode;
use common_query::{Output, OutputData};
use common_recordbatch::{RecordBatch, RecordBatches};
use common_telemetry::{debug, tracing};
use common_time::time::Time;
use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
use common_time::{
Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp,
};
use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
use common_version::OwnedBuildInfo;
use datafusion_common::ScalarValue;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::schema::{ColumnSchema, SchemaRef};
use datatypes::types::jsonb_to_string;
use datatypes::vectors::Float64Vector;
use futures::StreamExt;
use futures::future::join_all;
use itertools::Itertools;
@@ -950,47 +942,12 @@ impl RowWriter {
let v = Date::new((array.value(i) / 86_400_000) as i32);
self.insert(column, v);
}
DataType::Timestamp(time_unit, _) => {
let v = match time_unit {
TimeUnit::Second => {
let array = array.as_primitive::<TimestampSecondType>();
array.value(i)
}
TimeUnit::Millisecond => {
let array = array.as_primitive::<TimestampMillisecondType>();
array.value(i)
}
TimeUnit::Microsecond => {
let array = array.as_primitive::<TimestampMicrosecondType>();
array.value(i)
}
TimeUnit::Nanosecond => {
let array = array.as_primitive::<TimestampNanosecondType>();
array.value(i)
}
};
let v = Timestamp::new(v, time_unit.into());
DataType::Timestamp(_, _) => {
let v = datatypes::arrow_array::timestamp_array_value(array, i);
self.insert(column, v.to_iso8601_string());
}
DataType::Time32(time_unit) | DataType::Time64(time_unit) => {
let v = match time_unit {
TimeUnit::Second => {
let array = array.as_primitive::<Time32SecondType>();
Time::new_second(array.value(i) as i64)
}
TimeUnit::Millisecond => {
let array = array.as_primitive::<Time32MillisecondType>();
Time::new_millisecond(array.value(i) as i64)
}
TimeUnit::Microsecond => {
let array = array.as_primitive::<Time64MicrosecondType>();
Time::new_microsecond(array.value(i))
}
TimeUnit::Nanosecond => {
let array = array.as_primitive::<Time64NanosecondType>();
Time::new_nanosecond(array.value(i))
}
};
DataType::Time32(_) | DataType::Time64(_) => {
let v = datatypes::arrow_array::time_array_value(array, i);
self.insert(column, v.to_iso8601_string());
}
DataType::Interval(interval_unit) => match interval_unit {
@@ -1010,26 +967,8 @@ impl RowWriter {
self.insert(column, v.to_iso8601_string());
}
},
DataType::Duration(time_unit) => {
let v = match time_unit {
TimeUnit::Second => {
let array = array.as_primitive::<DurationSecondType>();
array.value(i)
}
TimeUnit::Millisecond => {
let array = array.as_primitive::<DurationMillisecondType>();
array.value(i)
}
TimeUnit::Microsecond => {
let array = array.as_primitive::<DurationMicrosecondType>();
array.value(i)
}
TimeUnit::Nanosecond => {
let array = array.as_primitive::<DurationNanosecondType>();
array.value(i)
}
};
let d = Duration::new(v, time_unit.into());
DataType::Duration(_) => {
let d = datatypes::arrow_array::duration_array_value(array, i);
self.insert(column, d);
}
DataType::List(_) => {
@@ -1134,20 +1073,14 @@ fn record_batches_to_labels_name(
let field_columns = field_column_indices
.iter()
.map(|i| {
batch
.column(*i)
.as_any()
.downcast_ref::<Float64Vector>()
.unwrap()
let column = batch.column(*i);
column.as_primitive::<Float64Type>()
})
.collect::<Vec<_>>();
for row_index in 0..batch.num_rows() {
// if all field columns are null, skip this row
if field_columns
.iter()
.all(|c| c.get_data(row_index).is_none())
{
if field_columns.iter().all(|c| c.is_null(row_index)) {
continue;
}

View File

@@ -12,6 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow::array::AsArray;
use arrow::datatypes::{
Date32Type, Date64Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type,
Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
UInt8Type, UInt16Type, UInt32Type, UInt64Type,
};
use arrow_schema::{DataType, IntervalUnit};
use common_decimal::Decimal128;
use common_recordbatch::RecordBatch;
use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
use datafusion_common::ScalarValue;
use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use snafu::ResultExt;
use crate::error::{
ConvertScalarValueSnafu, DataFusionSnafu, NotSupportedSnafu, Result, ToJsonSnafu,
UnexpectedResultSnafu,
};
pub(crate) mod arrow_result;
pub(crate) mod csv_result;
pub mod error_result;
@@ -22,3 +42,240 @@ pub(crate) mod json_result;
pub(crate) mod null_result;
pub(crate) mod prometheus_resp;
pub(crate) mod table_result;
pub(super) struct HttpOutputWriter {
columns: usize,
value_transformer: Option<Box<dyn Fn(Value) -> Value>>,
current: Option<Vec<serde_json::Value>>,
}
impl HttpOutputWriter {
pub(super) fn new(
columns: usize,
value_transformer: Option<Box<dyn Fn(Value) -> Value>>,
) -> Self {
Self {
columns,
value_transformer,
current: None,
}
}
fn write_bytes(&mut self, bytes: &[u8], datatype: &ConcreteDataType) -> Result<()> {
if datatype.is_json() {
let value = datatypes::types::jsonb_to_serde_json(bytes).map_err(|e| {
UnexpectedResultSnafu {
reason: format!("corrupted jsonb data: {bytes:?}, error: {e}"),
}
.build()
})?;
self.push(value);
Ok(())
} else {
self.write_value(bytes)
}
}
fn write_value(&mut self, value: impl Into<Value>) -> Result<()> {
let value = value.into();
let value = if let Some(f) = &self.value_transformer {
f(value)
} else {
value
};
let value = serde_json::Value::try_from(value).context(ToJsonSnafu)?;
self.push(value);
Ok(())
}
fn push(&mut self, value: serde_json::Value) {
let current = self
.current
.get_or_insert_with(|| Vec::with_capacity(self.columns));
current.push(value);
}
fn finish(&mut self) -> Vec<serde_json::Value> {
self.current.take().unwrap_or_default()
}
pub(super) fn write(
&mut self,
record_batch: RecordBatch,
rows: &mut Vec<Vec<serde_json::Value>>,
) -> Result<()> {
let schema = record_batch.schema.clone();
let record_batch = record_batch.into_df_record_batch();
for i in 0..record_batch.num_rows() {
for (schema, array) in schema
.column_schemas()
.iter()
.zip(record_batch.columns().iter())
{
if array.is_null(i) {
self.write_value(Value::Null)?;
continue;
}
match array.data_type() {
DataType::Null => {
self.write_value(Value::Null)?;
}
DataType::Boolean => {
let array = array.as_boolean();
let v = array.value(i);
self.write_value(v)?;
}
DataType::UInt8 => {
let array = array.as_primitive::<UInt8Type>();
let v = array.value(i);
self.write_value(v)?;
}
DataType::UInt16 => {
let array = array.as_primitive::<UInt16Type>();
let v = array.value(i);
self.write_value(v)?;
}
DataType::UInt32 => {
let array = array.as_primitive::<UInt32Type>();
let v = array.value(i);
self.write_value(v)?;
}
DataType::UInt64 => {
let array = array.as_primitive::<UInt64Type>();
let v = array.value(i);
self.write_value(v)?;
}
DataType::Int8 => {
let array = array.as_primitive::<Int8Type>();
let v = array.value(i);
self.write_value(v)?;
}
DataType::Int16 => {
let array = array.as_primitive::<Int16Type>();
let v = array.value(i);
self.write_value(v)?;
}
DataType::Int32 => {
let array = array.as_primitive::<Int32Type>();
let v = array.value(i);
self.write_value(v)?;
}
DataType::Int64 => {
let array = array.as_primitive::<Int64Type>();
let v = array.value(i);
self.write_value(v)?;
}
DataType::Float32 => {
let array = array.as_primitive::<Float32Type>();
let v = array.value(i);
self.write_value(v)?;
}
DataType::Float64 => {
let array = array.as_primitive::<Float64Type>();
let v = array.value(i);
self.write_value(v)?;
}
DataType::Utf8 => {
let array = array.as_string::<i32>();
let v = array.value(i);
self.write_value(v)?;
}
DataType::LargeUtf8 => {
let array = array.as_string::<i64>();
let v = array.value(i);
self.write_value(v)?;
}
DataType::Utf8View => {
let array = array.as_string_view();
let v = array.value(i);
self.write_value(v)?;
}
DataType::Binary => {
let array = array.as_binary::<i32>();
let v = array.value(i);
self.write_bytes(v, &schema.data_type)?;
}
DataType::LargeBinary => {
let array = array.as_binary::<i64>();
let v = array.value(i);
self.write_bytes(v, &schema.data_type)?;
}
DataType::BinaryView => {
let array = array.as_binary_view();
let v = array.value(i);
self.write_bytes(v, &schema.data_type)?;
}
DataType::Date32 => {
let array = array.as_primitive::<Date32Type>();
let v = Date::new(array.value(i));
self.write_value(v)?;
}
DataType::Date64 => {
let array = array.as_primitive::<Date64Type>();
// `Date64` values are milliseconds representation of `Date32` values,
// according to its specification. So we convert the `Date64` value here to
// the `Date32` value to process them unified.
let v = Date::new((array.value(i) / 86_400_000) as i32);
self.write_value(v)?;
}
DataType::Timestamp(_, _) => {
let ts = datatypes::arrow_array::timestamp_array_value(array, i);
self.write_value(ts)?;
}
DataType::Time32(_) | DataType::Time64(_) => {
let v = datatypes::arrow_array::time_array_value(array, i);
self.write_value(v)?;
}
DataType::Interval(interval_unit) => match interval_unit {
IntervalUnit::YearMonth => {
let array = array.as_primitive::<IntervalYearMonthType>();
let v: IntervalYearMonth = array.value(i).into();
self.write_value(v)?;
}
IntervalUnit::DayTime => {
let array = array.as_primitive::<IntervalDayTimeType>();
let v: IntervalDayTime = array.value(i).into();
self.write_value(v)?;
}
IntervalUnit::MonthDayNano => {
let array = array.as_primitive::<IntervalMonthDayNanoType>();
let v: IntervalMonthDayNano = array.value(i).into();
self.write_value(v)?;
}
},
DataType::Duration(_) => {
let d = datatypes::arrow_array::duration_array_value(array, i);
self.write_value(d)?;
}
DataType::List(_) => {
let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
let v: Value = v.try_into().context(ConvertScalarValueSnafu)?;
self.write_value(v)?;
}
DataType::Struct(_) => {
let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
let v: Value = v.try_into().context(ConvertScalarValueSnafu)?;
self.write_value(v)?;
}
DataType::Decimal128(precision, scale) => {
let array = array.as_primitive::<Decimal128Type>();
let v = Decimal128::new(array.value(i), *precision, *scale);
self.write_value(v)?;
}
_ => {
return NotSupportedSnafu {
feat: format!("convert {} to http output value", array.data_type()),
}
.fail();
}
}
}
rows.push(self.finish())
}
Ok(())
}
}
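
A minimal sketch of the two call sites added in this commit, mirroring the HTTP records hunk above and the InfluxDB hunk below; `schema`, `recordbatch`, `rows`, and `value_transformer` are bindings from those hunks, shown here only for orientation:

// Plain HTTP records output: no value transformation.
let mut writer = HttpOutputWriter::new(schema.num_columns(), None);
writer.write(recordbatch, &mut rows)?;

// InfluxDB output: a closure applies the epoch conversion to timestamp values.
let mut writer =
    HttpOutputWriter::new(schema.num_columns(), Some(Box::new(value_transformer)));
writer.write(recordbatch, &mut rows)?;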

View File

@@ -12,35 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow::array::AsArray;
use arrow::datatypes::{
Date32Type, Date64Type, Decimal128Type, DurationMicrosecondType, DurationMillisecondType,
DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int8Type, Int16Type,
Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
};
use arrow_schema::{DataType, IntervalUnit, TimeUnit};
use axum::Json;
use axum::http::HeaderValue;
use axum::response::{IntoResponse, Response};
use common_decimal::Decimal128;
use common_query::{Output, OutputData};
use common_recordbatch::{RecordBatch, util};
use common_time::time::Time;
use common_time::{
Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp,
};
use datafusion_common::ScalarValue;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use snafu::ResultExt;
use crate::error::{
ConvertScalarValueSnafu, DataFusionSnafu, Error, NotSupportedSnafu, Result, ToJsonSnafu,
};
use crate::error::{Error, Result};
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
use crate::http::result::HttpOutputWriter;
use crate::http::result::error_result::ErrorResponse;
use crate::http::{Epoch, HttpResponse, ResponseFormat};
@@ -84,8 +66,8 @@ impl TryFrom<(Option<Epoch>, Vec<RecordBatch>)> for InfluxdbRecordsOutput {
} else {
// Safety: ensured by previous empty check
let first = &recordbatches[0];
let columns = first
.schema
let schema = first.schema.clone();
let columns = schema
.column_schemas()
.iter()
.map(|cs| cs.name.clone())
@@ -94,8 +76,23 @@ impl TryFrom<(Option<Epoch>, Vec<RecordBatch>)> for InfluxdbRecordsOutput {
let mut rows =
Vec::with_capacity(recordbatches.iter().map(|r| r.num_rows()).sum::<usize>());
let value_transformer =
move |value: datatypes::value::Value| -> datatypes::value::Value {
match (value, epoch) {
(datatypes::value::Value::Timestamp(ts), Some(epoch)) => {
if let Some(converted) = epoch.convert_timestamp(ts) {
datatypes::value::Value::Timestamp(converted)
} else {
datatypes::value::Value::Timestamp(ts)
}
}
(value, _) => value,
}
};
for recordbatch in recordbatches {
let mut writer = RowWriter::new(epoch, recordbatch.num_columns());
let mut writer =
HttpOutputWriter::new(schema.num_columns(), Some(Box::new(value_transformer)));
writer.write(recordbatch, &mut rows)?;
}
@@ -104,266 +101,6 @@ impl TryFrom<(Option<Epoch>, Vec<RecordBatch>)> for InfluxdbRecordsOutput {
}
}
struct RowWriter {
epoch: Option<Epoch>,
columns: usize,
current: Option<Vec<Value>>,
}
impl RowWriter {
fn new(epoch: Option<Epoch>, columns: usize) -> Self {
Self {
epoch,
columns,
current: None,
}
}
fn push(&mut self, value: impl Into<datatypes::value::Value>) -> Result<()> {
let value = value.into();
let current = self
.current
.get_or_insert_with(|| Vec::with_capacity(self.columns));
let value = Value::try_from(value).context(ToJsonSnafu)?;
current.push(value);
Ok(())
}
fn finish(&mut self) -> Vec<Value> {
self.current.take().unwrap_or_default()
}
fn write(&mut self, record_batch: RecordBatch, rows: &mut Vec<Vec<Value>>) -> Result<()> {
let record_batch = record_batch.into_df_record_batch();
for i in 0..record_batch.num_rows() {
for array in record_batch.columns().iter() {
if array.is_null(i) {
self.push(datatypes::value::Value::Null)?;
continue;
}
match array.data_type() {
DataType::Null => {
self.push(datatypes::value::Value::Null)?;
}
DataType::Boolean => {
let array = array.as_boolean();
let v = array.value(i);
self.push(v)?;
}
DataType::UInt8 => {
let array = array.as_primitive::<UInt8Type>();
let v = array.value(i);
self.push(v)?;
}
DataType::UInt16 => {
let array = array.as_primitive::<UInt16Type>();
let v = array.value(i);
self.push(v)?;
}
DataType::UInt32 => {
let array = array.as_primitive::<UInt32Type>();
let v = array.value(i);
self.push(v)?;
}
DataType::UInt64 => {
let array = array.as_primitive::<UInt64Type>();
let v = array.value(i);
self.push(v)?;
}
DataType::Int8 => {
let array = array.as_primitive::<Int8Type>();
let v = array.value(i);
self.push(v)?;
}
DataType::Int16 => {
let array = array.as_primitive::<Int16Type>();
let v = array.value(i);
self.push(v)?;
}
DataType::Int32 => {
let array = array.as_primitive::<Int32Type>();
let v = array.value(i);
self.push(v)?;
}
DataType::Int64 => {
let array = array.as_primitive::<Int64Type>();
let v = array.value(i);
self.push(v)?;
}
DataType::Float32 => {
let array = array.as_primitive::<Float32Type>();
let v = array.value(i);
self.push(v)?;
}
DataType::Float64 => {
let array = array.as_primitive::<Float64Type>();
let v = array.value(i);
self.push(v)?;
}
DataType::Utf8 => {
let array = array.as_string::<i32>();
let v = array.value(i);
self.push(v)?;
}
DataType::LargeUtf8 => {
let array = array.as_string::<i64>();
let v = array.value(i);
self.push(v)?;
}
DataType::Utf8View => {
let array = array.as_string_view();
let v = array.value(i);
self.push(v)?;
}
DataType::Binary => {
let array = array.as_binary::<i32>();
let v = array.value(i);
self.push(v)?;
}
DataType::LargeBinary => {
let array = array.as_binary::<i64>();
let v = array.value(i);
self.push(v)?;
}
DataType::BinaryView => {
let array = array.as_binary_view();
let v = array.value(i);
self.push(v)?;
}
DataType::Date32 => {
let array = array.as_primitive::<Date32Type>();
let v = Date::new(array.value(i));
self.push(v)?;
}
DataType::Date64 => {
let array = array.as_primitive::<Date64Type>();
// `Date64` values are a millisecond representation of `Date32` values,
// according to the Arrow specification. Convert the `Date64` value to a
// `Date32` value here so both are processed uniformly.
let v = Date::new((array.value(i) / 86_400_000) as i32);
self.push(v)?;
}
DataType::Timestamp(time_unit, _) => {
let v = match time_unit {
TimeUnit::Second => {
let array = array.as_primitive::<TimestampSecondType>();
array.value(i)
}
TimeUnit::Millisecond => {
let array = array.as_primitive::<TimestampMillisecondType>();
array.value(i)
}
TimeUnit::Microsecond => {
let array = array.as_primitive::<TimestampMicrosecondType>();
array.value(i)
}
TimeUnit::Nanosecond => {
let array = array.as_primitive::<TimestampNanosecondType>();
array.value(i)
}
};
let mut ts = Timestamp::new(v, time_unit.into());
if let Some(epoch) = self.epoch
&& let Some(converted) = epoch.convert_timestamp(ts)
{
ts = converted;
}
self.push(ts)?;
}
DataType::Time32(time_unit) | DataType::Time64(time_unit) => {
let v = match time_unit {
TimeUnit::Second => {
let array = array.as_primitive::<Time32SecondType>();
Time::new_second(array.value(i) as i64)
}
TimeUnit::Millisecond => {
let array = array.as_primitive::<Time32MillisecondType>();
Time::new_millisecond(array.value(i) as i64)
}
TimeUnit::Microsecond => {
let array = array.as_primitive::<Time64MicrosecondType>();
Time::new_microsecond(array.value(i))
}
TimeUnit::Nanosecond => {
let array = array.as_primitive::<Time64NanosecondType>();
Time::new_nanosecond(array.value(i))
}
};
self.push(v)?;
}
DataType::Interval(interval_unit) => match interval_unit {
IntervalUnit::YearMonth => {
let array = array.as_primitive::<IntervalYearMonthType>();
let v: IntervalYearMonth = array.value(i).into();
self.push(v)?;
}
IntervalUnit::DayTime => {
let array = array.as_primitive::<IntervalDayTimeType>();
let v: IntervalDayTime = array.value(i).into();
self.push(v)?;
}
IntervalUnit::MonthDayNano => {
let array = array.as_primitive::<IntervalMonthDayNanoType>();
let v: IntervalMonthDayNano = array.value(i).into();
self.push(v)?;
}
},
DataType::Duration(time_unit) => {
let v = match time_unit {
TimeUnit::Second => {
let array = array.as_primitive::<DurationSecondType>();
array.value(i)
}
TimeUnit::Millisecond => {
let array = array.as_primitive::<DurationMillisecondType>();
array.value(i)
}
TimeUnit::Microsecond => {
let array = array.as_primitive::<DurationMicrosecondType>();
array.value(i)
}
TimeUnit::Nanosecond => {
let array = array.as_primitive::<DurationNanosecondType>();
array.value(i)
}
};
let d = Duration::new(v, time_unit.into());
self.push(d)?;
}
DataType::List(_) => {
let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
let v: datatypes::value::Value =
v.try_into().context(ConvertScalarValueSnafu)?;
self.push(v)?;
}
DataType::Struct(_) => {
let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
let v: datatypes::value::Value =
v.try_into().context(ConvertScalarValueSnafu)?;
self.push(v)?;
}
DataType::Decimal128(precision, scale) => {
let array = array.as_primitive::<Decimal128Type>();
let v = Decimal128::new(array.value(i), *precision, *scale);
self.push(v)?;
}
_ => {
return NotSupportedSnafu {
feat: format!("convert {} to influxdb value", array.data_type()),
}
.fail();
}
}
}
rows.push(self.finish())
}
Ok(())
}
}
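The removed `RowWriter` above walks a DataFusion `RecordBatch` cell by cell, using `AsArray` downcasts instead of the old `Vector` types. A minimal sketch of that per-cell pattern, using only plain arrow-rs (the function name is illustrative, not part of this PR):

use arrow::array::{Array, AsArray};
use arrow::datatypes::{DataType, Float64Type};
use arrow::record_batch::RecordBatch;

// Illustrative only: read one Float64 column of an arrow RecordBatch cell by
// cell, handling nulls explicitly, the same way `RowWriter::write` does above.
fn column_to_optional_f64(batch: &RecordBatch, col: usize) -> Vec<Option<f64>> {
    let array = batch.column(col);
    match array.data_type() {
        DataType::Float64 => {
            let array = array.as_primitive::<Float64Type>();
            (0..array.len())
                .map(|i| array.is_valid(i).then(|| array.value(i)))
                .collect()
        }
        _ => vec![None; array.len()],
    }
}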
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
pub struct InfluxdbOutput {
pub statement_id: u32,

View File

@@ -16,6 +16,9 @@
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};
use arrow::array::{Array, AsArray};
use arrow::datatypes::{Float64Type, TimestampMillisecondType};
use arrow_schema::DataType;
use axum::Json;
use axum::http::HeaderValue;
use axum::response::{IntoResponse, Response};
@@ -24,8 +27,6 @@ use common_error::status_code::StatusCode;
use common_query::{Output, OutputData};
use common_recordbatch::RecordBatches;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
use indexmap::IndexMap;
use promql_parser::label::METRIC_NAME;
use promql_parser::parser::value::ValueType;
@@ -34,7 +35,7 @@ use serde_json::Value;
use snafu::{OptionExt, ResultExt};
use crate::error::{
CollectRecordbatchSnafu, Result, UnexpectedResultSnafu, status_code_to_http_status,
ArrowSnafu, CollectRecordbatchSnafu, Result, UnexpectedResultSnafu, status_code_to_http_status,
};
use crate::http::header::{GREPTIME_DB_HEADER_METRICS, collect_plan_metrics};
use crate::http::prometheus::{
@@ -247,13 +248,7 @@ impl PrometheusJsonResponse {
// prepare things...
let tag_columns = tag_column_indices
.iter()
.map(|i| {
batch
.column(*i)
.as_any()
.downcast_ref::<StringVector>()
.unwrap()
})
.map(|i| batch.column(*i).as_string::<i32>())
.collect::<Vec<_>>();
let tag_names = tag_column_indices
.iter()
@@ -261,22 +256,18 @@ impl PrometheusJsonResponse {
.collect::<Vec<_>>();
let timestamp_column = batch
.column(timestamp_column_index)
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap();
let casted_field_column = batch
.column(first_field_column_index)
.cast(&ConcreteDataType::float64_datatype())
.unwrap();
let field_column = casted_field_column
.as_any()
.downcast_ref::<Float64Vector>()
.unwrap();
.as_primitive::<TimestampMillisecondType>();
let array =
arrow::compute::cast(batch.column(first_field_column_index), &DataType::Float64)
.context(ArrowSnafu)?;
let field_column = array.as_primitive::<Float64Type>();
// assemble rows
for row_index in 0..batch.num_rows() {
// retrieve value
if let Some(v) = field_column.get_data(row_index) {
if field_column.is_valid(row_index) {
let v = field_column.value(row_index);
// ignore all NaN values to reduce the amount of data to be sent.
if v.is_nan() {
continue;
@@ -289,14 +280,13 @@ impl PrometheusJsonResponse {
}
for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) {
// TODO(ruihang): add test for NULL tag
if let Some(tag_value) = tag_column.get_data(row_index) {
tags.push((tag_name, tag_value));
if tag_column.is_valid(row_index) {
tags.push((tag_name, tag_column.value(row_index)));
}
}
// retrieve timestamp
let timestamp_millis: i64 =
timestamp_column.get_data(row_index).unwrap().into();
let timestamp_millis = timestamp_column.value(row_index);
let timestamp = timestamp_millis as f64 / 1000.0;
buffer
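The Prometheus response assembly above now casts the field column with `arrow::compute::cast` and reads it through `AsArray`, instead of downcasting `Float64Vector`/`StringVector`. A small sketch of that cast-then-read pattern, assuming plain arrow-rs (the helper name is made up for illustration):

use arrow::array::{Array, ArrayRef, AsArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Float64Type};
use arrow::error::ArrowError;

// Hypothetical helper: cast a column to Float64 and collect the valid values,
// skipping nulls, as the row assembly above does.
fn collect_f64(column: &ArrayRef) -> Result<Vec<f64>, ArrowError> {
    let casted = cast(column, &DataType::Float64)?;
    let floats = casted.as_primitive::<Float64Type>();
    Ok((0..floats.len())
        .filter(|&i| floats.is_valid(i))
        .map(|i| floats.value(i))
        .collect())
}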

View File

@@ -16,6 +16,7 @@
#![feature(try_blocks)]
#![feature(exclusive_wrapper)]
#![feature(if_let_guard)]
#![feature(box_patterns)]
use datafusion_expr::LogicalPlan;
use datatypes::schema::Schema;

View File

@@ -16,22 +16,18 @@ use std::time::Duration;
use arrow::array::{Array, AsArray};
use arrow::datatypes::{
Date32Type, Decimal128Type, DurationMicrosecondType, DurationMillisecondType,
DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int8Type, Int16Type,
Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
Date32Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type,
Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, UInt8Type,
UInt16Type, UInt32Type, UInt64Type,
};
use arrow_schema::{DataType, IntervalUnit, TimeUnit};
use arrow_schema::{DataType, IntervalUnit};
use common_decimal::Decimal128;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::{Output, OutputData};
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::{debug, error};
use common_time::time::Time;
use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp};
use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
use datafusion_common::ScalarValue;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::SchemaRef;
@@ -312,26 +308,8 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
let v = Date::new(array.value(i));
row_writer.write_col(v.to_chrono_date())?;
}
DataType::Timestamp(time_unit, _) => {
let v = match time_unit {
TimeUnit::Second => {
let array = column.as_primitive::<TimestampSecondType>();
array.value(i)
}
TimeUnit::Millisecond => {
let array = column.as_primitive::<TimestampMillisecondType>();
array.value(i)
}
TimeUnit::Microsecond => {
let array = column.as_primitive::<TimestampMicrosecondType>();
array.value(i)
}
TimeUnit::Nanosecond => {
let array = column.as_primitive::<TimestampNanosecondType>();
array.value(i)
}
};
let v = Timestamp::new(v, time_unit.into());
DataType::Timestamp(_, _) => {
let v = datatypes::arrow_array::timestamp_array_value(column, i);
let v = v.to_chrono_datetime_with_timezone(Some(&query_context.timezone()));
row_writer.write_col(v)?;
}
@@ -352,28 +330,11 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
row_writer.write_col(v.to_iso8601_string())?;
}
},
DataType::Duration(time_unit) => match time_unit {
TimeUnit::Second => {
let array = column.as_primitive::<DurationSecondType>();
let v = array.value(i);
row_writer.write_col(Duration::from_secs(v as u64))?;
DataType::Duration(_) => {
let v: Duration =
datatypes::arrow_array::duration_array_value(column, i).into();
row_writer.write_col(v)?;
}
TimeUnit::Millisecond => {
let array = column.as_primitive::<DurationMillisecondType>();
let v = array.value(i);
row_writer.write_col(Duration::from_millis(v as u64))?;
}
TimeUnit::Microsecond => {
let array = column.as_primitive::<DurationMicrosecondType>();
let v = array.value(i);
row_writer.write_col(Duration::from_micros(v as u64))?;
}
TimeUnit::Nanosecond => {
let array = column.as_primitive::<DurationNanosecondType>();
let v = array.value(i);
row_writer.write_col(Duration::from_nanos(v as u64))?;
}
},
DataType::List(_) => {
let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
row_writer.write_col(v.to_string())?;
@@ -382,37 +343,8 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
row_writer.write_col(v.to_string())?;
}
DataType::Time32(time_unit) => {
let time = match time_unit {
TimeUnit::Second => {
let array = column.as_primitive::<Time32SecondType>();
Time::new_second(array.value(i) as i64)
}
TimeUnit::Millisecond => {
let array = column.as_primitive::<Time32MillisecondType>();
Time::new_millisecond(array.value(i) as i64)
}
_ => unreachable!(
"`DataType::Time32` has only second and millisecond time units"
),
};
let v = time.to_timezone_aware_string(Some(&query_context.timezone()));
row_writer.write_col(v)?;
}
DataType::Time64(time_unit) => {
let time = match time_unit {
TimeUnit::Microsecond => {
let array = column.as_primitive::<Time64MicrosecondType>();
Time::new_microsecond(array.value(i))
}
TimeUnit::Nanosecond => {
let array = column.as_primitive::<Time64NanosecondType>();
Time::new_nanosecond(array.value(i))
}
_ => unreachable!(
"`DataType::Time64` has only microsecond and nanosecond time units"
),
};
DataType::Time32(_) | DataType::Time64(_) => {
let time = datatypes::arrow_array::time_array_value(column, i);
let v = time.to_timezone_aware_string(Some(&query_context.timezone()));
row_writer.write_col(v)?;
}
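The MySQL writer now delegates per-time-unit dispatch to helpers in `datatypes::arrow_array`. Their bodies are not shown in this diff; the sketch below is only an assumed shape for a `timestamp_array_value`-style helper, inferred from its call sites:

use arrow::array::{Array, ArrayRef, AsArray};
use arrow::datatypes::{
    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
    TimestampSecondType,
};
use arrow_schema::{DataType, TimeUnit};
use common_time::Timestamp;

// Assumed shape only; the real helper lives in `datatypes::arrow_array`.
fn timestamp_value(array: &ArrayRef, i: usize) -> Timestamp {
    match array.data_type() {
        DataType::Timestamp(TimeUnit::Second, _) => {
            Timestamp::new_second(array.as_primitive::<TimestampSecondType>().value(i))
        }
        DataType::Timestamp(TimeUnit::Millisecond, _) => {
            Timestamp::new_millisecond(array.as_primitive::<TimestampMillisecondType>().value(i))
        }
        DataType::Timestamp(TimeUnit::Microsecond, _) => {
            Timestamp::new_microsecond(array.as_primitive::<TimestampMicrosecondType>().value(i))
        }
        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
            Timestamp::new_nanosecond(array.as_primitive::<TimestampNanosecondType>().value(i))
        }
        other => unreachable!("not a timestamp array: {other}"),
    }
}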

View File

@@ -22,8 +22,7 @@ use std::sync::Arc;
use arrow::array::{Array, ArrayRef, AsArray};
use arrow::datatypes::{
Date32Type, Date64Type, Decimal128Type, DurationMicrosecondType, DurationMillisecondType,
DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int8Type, Int16Type,
Date32Type, Date64Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type,
Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
@@ -34,9 +33,7 @@ use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};
use common_decimal::Decimal128;
use common_recordbatch::RecordBatch;
use common_time::time::Time;
use common_time::{
Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp,
};
use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::LogicalPlan;
use datatypes::arrow::datatypes::DataType as ArrowDataType;
@@ -567,26 +564,8 @@ impl RecordBatchRowIterator {
});
encoder.encode_field(&date)?;
}
DataType::Timestamp(time_unit, _) => {
let v = match time_unit {
TimeUnit::Second => {
let array = column.as_primitive::<TimestampSecondType>();
array.value(i)
}
TimeUnit::Millisecond => {
let array = column.as_primitive::<TimestampMillisecondType>();
array.value(i)
}
TimeUnit::Microsecond => {
let array = column.as_primitive::<TimestampMicrosecondType>();
array.value(i)
}
TimeUnit::Nanosecond => {
let array = column.as_primitive::<TimestampNanosecondType>();
array.value(i)
}
};
let v = Timestamp::new(v, time_unit.into());
DataType::Timestamp(_, _) => {
let v = datatypes::arrow_array::timestamp_array_value(column, i);
let datetime = v
.to_chrono_datetime_with_timezone(Some(&self.query_ctx.timezone()))
.map(|v| {
@@ -613,26 +592,8 @@ impl RecordBatchRowIterator {
encoder.encode_field(&PgInterval::from(v))?;
}
},
DataType::Duration(time_unit) => {
let v = match time_unit {
TimeUnit::Second => {
let array = column.as_primitive::<DurationSecondType>();
array.value(i)
}
TimeUnit::Millisecond => {
let array = column.as_primitive::<DurationMillisecondType>();
array.value(i)
}
TimeUnit::Microsecond => {
let array = column.as_primitive::<DurationMicrosecondType>();
array.value(i)
}
TimeUnit::Nanosecond => {
let array = column.as_primitive::<DurationNanosecondType>();
array.value(i)
}
};
let d = Duration::new(v, time_unit.into());
DataType::Duration(_) => {
let d = datatypes::arrow_array::duration_array_value(column, i);
match PgInterval::try_from(d) {
Ok(i) => encoder.encode_field(&i)?,
Err(e) => {
@@ -650,25 +611,8 @@ impl RecordBatchRowIterator {
DataType::Struct(_) => {
encode_struct(&self.query_ctx, Default::default(), encoder)?;
}
DataType::Time32(time_unit) | DataType::Time64(time_unit) => {
let v = match time_unit {
TimeUnit::Second => {
let array = column.as_primitive::<Time32SecondType>();
Time::new_second(array.value(i) as i64)
}
TimeUnit::Millisecond => {
let array = column.as_primitive::<Time32MillisecondType>();
Time::new_millisecond(array.value(i) as i64)
}
TimeUnit::Microsecond => {
let array = column.as_primitive::<Time64MicrosecondType>();
Time::new_microsecond(array.value(i))
}
TimeUnit::Nanosecond => {
let array = column.as_primitive::<Time64NanosecondType>();
Time::new_nanosecond(array.value(i))
}
};
DataType::Time32(_) | DataType::Time64(_) => {
let v = datatypes::arrow_array::time_array_value(column, i);
encoder.encode_field(&v.to_chrono_time())?;
}
DataType::Decimal128(precision, scale) => {
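Several writers in this diff (the removed influxdb `RowWriter`, the MySQL result writer) fall back to DataFusion's `ScalarValue::try_from_array` for nested types such as `List` and `Struct`. A minimal sketch of that fallback, rendering a single cell as text (the function name is illustrative):

use arrow::array::ArrayRef;
use datafusion_common::{Result, ScalarValue};

// Illustrative fallback for nested types: pull one cell out as a DataFusion
// ScalarValue and render it with its Display impl.
fn cell_to_string(column: &ArrayRef, i: usize) -> Result<String> {
    let v = ScalarValue::try_from_array(column, i)?;
    Ok(v.to_string())
}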

View File

@@ -21,18 +21,18 @@ use std::hash::{Hash, Hasher};
use api::prom_store::remote::label_matcher::Type as MatcherType;
use api::prom_store::remote::{Label, Query, ReadRequest, Sample, TimeSeries, WriteRequest};
use api::v1::RowInsertRequests;
use arrow::array::{Array, AsArray};
use arrow::datatypes::{Float64Type, TimestampMillisecondType};
use common_grpc::precision::Precision;
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_recordbatch::{RecordBatch, RecordBatches};
use common_telemetry::tracing;
use common_time::timestamp::TimeUnit;
use datafusion::prelude::{Expr, col, lit, regexp_match};
use datafusion_common::ScalarValue;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::{ConcreteDataType, Value};
use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
use query::dataframe::DataFrame;
use snafu::{OptionExt, ResultExt, ensure};
use snafu::{OptionExt, ResultExt};
use snap::raw::{Decoder, Encoder};
use crate::error::{self, Result};
@@ -233,6 +233,24 @@ fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<Ti
let row_count = recordbatch.num_rows();
let mut timeseries_ids = Vec::with_capacity(row_count);
let column_names = recordbatch
.schema
.column_schemas()
.iter()
.map(|column_schema| &column_schema.name);
let columns = column_names
.enumerate()
.filter(|(_, column_name)| {
*column_name != greptime_timestamp() && *column_name != greptime_value()
})
.map(|(i, column_name)| {
(
column_name,
recordbatch.iter_column_as_string(i).collect::<Vec<_>>(),
)
})
.collect::<Vec<_>>();
for row in 0..row_count {
let mut labels = Vec::with_capacity(recordbatch.num_columns() - 1);
labels.push(new_label(
@@ -240,20 +258,10 @@ fn collect_timeseries_ids(table_name: &str, recordbatch: &RecordBatch) -> Vec<Ti
table_name.to_string(),
));
for (i, column_schema) in recordbatch.schema.column_schemas().iter().enumerate() {
if column_schema.name == greptime_value() || column_schema.name == greptime_timestamp()
{
continue;
for (column_name, column_values) in columns.iter() {
if let Some(value) = &column_values[row] {
labels.push(new_label((*column_name).clone(), value.clone()));
}
let column = &recordbatch.columns()[i];
// A label with an empty label value is considered equivalent to a label that does not exist.
if column.is_null(row) {
continue;
}
let value = column.get(row).to_string();
labels.push(new_label(column_schema.name.clone(), value));
}
timeseries_ids.push(TimeSeriesId { labels });
}
@@ -280,30 +288,28 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Ve
msg: "missing greptime_timestamp column in query result",
},
)?;
ensure!(
ts_column.data_type() == ConcreteDataType::timestamp_millisecond_datatype(),
error::InvalidPromRemoteReadQueryResultSnafu {
let ts_column = ts_column
.as_primitive_opt::<TimestampMillisecondType>()
.with_context(|| error::InvalidPromRemoteReadQueryResultSnafu {
msg: format!(
"Expect timestamp column of datatype Timestamp(Millisecond), actual {:?}",
ts_column.data_type()
)
}
);
),
})?;
let field_column = recordbatch.column_by_name(greptime_value()).context(
error::InvalidPromRemoteReadQueryResultSnafu {
msg: "missing greptime_value column in query result",
},
)?;
ensure!(
field_column.data_type() == ConcreteDataType::float64_datatype(),
error::InvalidPromRemoteReadQueryResultSnafu {
let field_column = field_column
.as_primitive_opt::<Float64Type>()
.with_context(|| error::InvalidPromRemoteReadQueryResultSnafu {
msg: format!(
"Expect value column of datatype Float64, actual {:?}",
field_column.data_type()
)
}
);
),
})?;
// First, collect each row's timeseries id
let timeseries_ids = collect_timeseries_ids(table, &recordbatch);
@@ -322,14 +328,8 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result<Ve
continue;
}
let value: f64 = match field_column.get(row) {
Value::Float64(value) => value.into(),
_ => unreachable!("checked by the \"ensure\" above"),
};
let timestamp = match ts_column.get(row) {
Value::Timestamp(t) if t.unit() == TimeUnit::Millisecond => t.value(),
_ => unreachable!("checked by the \"ensure\" above"),
};
let value = field_column.value(row);
let timestamp = ts_column.value(row);
let sample = Sample { value, timestamp };
timeseries.samples.push(sample);
@@ -579,6 +579,7 @@ mod tests {
use api::prom_store::remote::LabelMatcher;
use api::v1::{ColumnDataType, Row, SemanticType};
use datafusion::prelude::SessionContext;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
use table::table::adapter::DfTableProviderAdapter;
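In `recordbatch_to_timeseries`, the `ensure!` checks on `ConcreteDataType` are replaced by `as_primitive_opt`, which both validates the arrow type and yields a typed array. A small sketch of that pattern, pairing value and timestamp columns into samples (plain arrow-rs; the function name is illustrative):

use arrow::array::{Array, ArrayRef, AsArray};
use arrow::datatypes::{Float64Type, TimestampMillisecondType};

// Illustrative: `as_primitive_opt` returns None when the column has an
// unexpected arrow type, replacing the previous `ensure!` checks.
fn to_samples(values: &ArrayRef, timestamps: &ArrayRef) -> Option<Vec<(f64, i64)>> {
    let values = values.as_primitive_opt::<Float64Type>()?;
    let timestamps = timestamps.as_primitive_opt::<TimestampMillisecondType>()?;
    Some(
        (0..values.len())
            .filter(|&i| values.is_valid(i) && timestamps.is_valid(i))
            .map(|i| (values.value(i), timestamps.value(i)))
            .collect(),
    )
}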

View File

@@ -166,9 +166,9 @@ impl Stream for NumbersStream {
batch = batch.project(projection).unwrap();
}
Poll::Ready(Some(RecordBatch::try_from_df_record_batch(
Poll::Ready(Some(Ok(RecordBatch::from_df_record_batch(
self.projected_schema.clone(),
batch,
)))
))))
}
}

View File

@@ -444,14 +444,10 @@ impl Stream for StreamWithMetricWrapper {
}
match result {
Ok(record_batch) => {
let batch_mem_size = record_batch
.columns()
.iter()
.map(|vec_ref| vec_ref.memory_size())
.sum::<usize>();
// We don't record elapsed time here,
// since this call goes through the storage API and involves I/O ops.
this.metric.record_mem_usage(batch_mem_size);
this.metric
.record_mem_usage(record_batch.buffer_memory_size());
this.metric.record_output(record_batch.num_rows());
Poll::Ready(Some(Ok(record_batch.into_df_record_batch())))
}
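`buffer_memory_size` replaces the old sum over `Vector::memory_size`. Its body is not part of this hunk; the sketch below is only an assumption of what such a method plausibly sums, using arrow's per-array accounting:

use arrow::array::{Array, ArrayRef};

// Assumed shape: sum the arrow buffer sizes of all columns. The real method
// is `common_recordbatch::RecordBatch::buffer_memory_size`.
fn buffer_memory_size(columns: &[ArrayRef]) -> usize {
    columns.iter().map(|c| c.get_buffer_memory_size()).sum()
}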

View File

@@ -29,7 +29,7 @@ use snafu::prelude::*;
use store_api::data_source::DataSource;
use store_api::storage::{RegionNumber, ScanRequest};
use crate::error::{SchemaConversionSnafu, TableProjectionSnafu, TablesRecordBatchSnafu};
use crate::error::{SchemaConversionSnafu, TableProjectionSnafu};
use crate::metadata::{
FilterPushDownType, TableId, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion,
};
@@ -146,17 +146,14 @@ impl DataSource for MemtableDataSource {
};
let df_recordbatch = df_recordbatch.slice(0, limit);
let recordbatch = RecordBatch::try_from_df_record_batch(
let recordbatch = RecordBatch::from_df_record_batch(
Arc::new(
Schema::try_from(df_recordbatch.schema())
.context(SchemaConversionSnafu)
.map_err(BoxedError::new)?,
),
df_recordbatch,
)
.map_err(BoxedError::new)
.context(TablesRecordBatchSnafu)
.map_err(BoxedError::new)?;
);
Ok(Box::pin(MemtableStream {
schema: recordbatch.schema.clone(),

View File

@@ -18,9 +18,8 @@ use std::sync::atomic::{AtomicU64, Ordering};
use client::DEFAULT_CATALOG_NAME;
use common_query::{Output, OutputData};
use datatypes::arrow::array::AsArray;
use datatypes::arrow::array::{ArrayRef, AsArray, TimestampMillisecondArray};
use datatypes::arrow::datatypes::TimestampMillisecondType;
use datatypes::vectors::{TimestampMillisecondVector, VectorRef};
use frontend::instance::Instance;
use itertools::Itertools;
use rand::Rng;
@@ -85,12 +84,10 @@ async fn test_create_database_and_insert_query(
OutputData::Stream(s) => {
let batches = common_recordbatch::util::collect(s).await.unwrap();
assert_eq!(1, batches[0].num_columns());
assert_eq!(
Arc::new(TimestampMillisecondVector::from_vec(vec![
1655276557000_i64
])) as VectorRef,
*batches[0].column(0)
);
let expected = Arc::new(TimestampMillisecondArray::from_iter_values(vec![
1655276557000_i64,
])) as ArrayRef;
assert_eq!(batches[0].column(0), &expected);
}
_ => unreachable!(),
}
@@ -226,7 +223,7 @@ async fn ensure_data_exists(tables: &[Table], instance: &Arc<Instance>) {
let queried = record_batches
.into_iter()
.flat_map(|rb| {
let array = rb.column(0).to_arrow_array();
let array = rb.column(0);
let array = array.as_primitive::<TimestampMillisecondType>();
array.iter().flatten().map(|x| x as u64).collect::<Vec<_>>()
})
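The tests now build expected columns as plain arrow arrays and compare them as `ArrayRef`s, and read results back with `as_primitive`. A minimal sketch of both halves, reusing the values from the assertions above (function names are illustrative):

use std::sync::Arc;

use datatypes::arrow::array::{ArrayRef, AsArray, TimestampMillisecondArray};
use datatypes::arrow::datatypes::TimestampMillisecondType;

// Expected column as an ArrayRef, as in the assertions above.
fn expected_ts_column() -> ArrayRef {
    Arc::new(TimestampMillisecondArray::from_iter_values([
        1655276557000_i64,
    ])) as ArrayRef
}

// Read the non-null values back out of such a column.
fn read_ts_values(column: &ArrayRef) -> Vec<i64> {
    column
        .as_primitive::<TimestampMillisecondType>()
        .iter()
        .flatten()
        .collect()
}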

View File

@@ -22,7 +22,9 @@ use common_query::Output;
use common_recordbatch::util;
use common_test_util::recordbatch::check_output_stream;
use common_test_util::temp_dir;
use datatypes::vectors::{StringVector, TimestampMillisecondVector, UInt64Vector, VectorRef};
use datatypes::arrow::array::{
ArrayRef, AsArray, StringArray, TimestampMillisecondArray, UInt64Array,
};
use frontend::error::{Error, Result};
use frontend::instance::Instance;
use operator::error::Error as OperatorError;
@@ -77,12 +79,10 @@ async fn test_create_database_and_insert_query(instance: Arc<dyn MockInstance>)
OutputData::Stream(s) => {
let batches = util::collect(s).await.unwrap();
assert_eq!(1, batches[0].num_columns());
assert_eq!(
Arc::new(TimestampMillisecondVector::from_vec(vec![
1655276557000_i64
])) as VectorRef,
*batches[0].column(0)
);
let expected = Arc::new(TimestampMillisecondArray::from_iter_values(vec![
1655276557000_i64,
])) as ArrayRef;
assert_eq!(batches[0].column(0), &expected);
}
_ => unreachable!(),
}
@@ -210,7 +210,8 @@ async fn test_show_create_external_table(instance: Arc<dyn MockInstance>) {
// We can't directly test `show create table` with `check_output_stream` because the location name length depends on the current filesystem.
let record_batches = record_batches.iter().collect::<Vec<_>>();
let column = record_batches[0].column_by_name("Create Table").unwrap();
let actual = column.get(0);
let column = column.as_string::<i32>();
let actual = column.value(0);
let expect = format!(
r#"CREATE EXTERNAL TABLE IF NOT EXISTS "various_type_csv" (
"c_int" BIGINT NULL,
@@ -312,14 +313,11 @@ async fn assert_query_result(instance: &Arc<Instance>, sql: &str, ts: i64, host:
let batches = util::collect(s).await.unwrap();
// let columns = batches[0].df_recordbatch.columns();
assert_eq!(2, batches[0].num_columns());
assert_eq!(
Arc::new(StringVector::from(vec![host])) as VectorRef,
*batches[0].column(0)
);
assert_eq!(
Arc::new(TimestampMillisecondVector::from_vec(vec![ts])) as VectorRef,
*batches[0].column(1)
);
let expected = vec![
Arc::new(StringArray::from_iter_values(vec![host])) as ArrayRef,
Arc::new(TimestampMillisecondArray::from_iter_values(vec![ts])) as ArrayRef,
];
assert_eq!(batches[0].columns(), &expected);
}
_ => unreachable!(),
}
@@ -446,10 +444,8 @@ async fn test_execute_query(instance: Arc<dyn MockInstance>) {
assert_eq!(1, numbers[0].num_columns());
assert_eq!(numbers[0].column(0).len(), 1);
assert_eq!(
Arc::new(UInt64Vector::from_vec(vec![4950_u64])) as VectorRef,
*numbers[0].column(0),
);
let expected = Arc::new(UInt64Array::from_iter_values(vec![4950_u64])) as ArrayRef;
assert_eq!(numbers[0].column(0), &expected);
}
_ => unreachable!(),
}
@@ -2175,7 +2171,8 @@ async fn test_custom_storage(instance: Arc<dyn MockInstance>) {
let record_batches = record_batches.iter().collect::<Vec<_>>();
let column = record_batches[0].column_by_name("Create Table").unwrap();
let actual = column.get(0);
let column = column.as_string::<i32>();
let actual = column.value(0);
let expect = if instance.is_distributed_mode() {
format!(

View File

@@ -34,9 +34,8 @@ use common_test_util::temp_dir::create_temp_dir;
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datatypes::prelude::ScalarVector;
use datatypes::value::Value;
use datatypes::vectors::{Helper, UInt64Vector};
use datatypes::arrow::array::AsArray;
use datatypes::arrow::datatypes::UInt64Type;
use frontend::error::Result as FrontendResult;
use frontend::instance::Instance;
use futures::future::BoxFuture;
@@ -1189,12 +1188,12 @@ async fn find_region_distribution_by_sql(
let mut distribution = RegionDistribution::new();
for batch in recordbatches.take() {
let datanode_ids: &UInt64Vector =
unsafe { Helper::static_cast(batch.column_by_name("datanode_id").unwrap()) };
let region_ids: &UInt64Vector =
unsafe { Helper::static_cast(batch.column_by_name("region_id").unwrap()) };
let column = batch.column_by_name("datanode_id").unwrap();
let datanode_ids = column.as_primitive::<UInt64Type>();
let column = batch.column_by_name("region_id").unwrap();
let region_ids = column.as_primitive::<UInt64Type>();
for (datanode_id, region_id) in datanode_ids.iter_data().zip(region_ids.iter_data()) {
for (datanode_id, region_id) in datanode_ids.iter().zip(region_ids.iter()) {
let (Some(datanode_id), Some(region_id)) = (datanode_id, region_id) else {
unreachable!();
};
@@ -1231,11 +1230,10 @@ async fn trigger_migration_by_sql(
info!("SQL result:\n {}", recordbatches.pretty_print().unwrap());
let Value::String(procedure_id) = recordbatches.take()[0].column(0).get(0) else {
unreachable!();
};
procedure_id.as_utf8().to_string()
let record_batch = &recordbatches.take()[0];
let column = record_batch.column(0);
let column = column.as_string::<i32>();
column.value(0).to_string()
}
/// Query procedure state by SQL.
@@ -1254,11 +1252,10 @@ async fn query_procedure_by_sql(instance: &Arc<Instance>, pid: &str) -> String {
info!("SQL result:\n {}", recordbatches.pretty_print().unwrap());
let Value::String(state) = recordbatches.take()[0].column(0).get(0) else {
unreachable!();
};
state.as_utf8().to_string()
let record_batch = &recordbatches.take()[0];
let column = record_batch.column(0);
let column = column.as_string::<i32>();
column.value(0).to_string()
}
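The migration tests now read string cells straight from arrow string arrays instead of matching on `Value::String`. A small illustrative helper for that pattern (the function name is not from this PR):

use common_recordbatch::RecordBatch;
use datatypes::arrow::array::{Array, AsArray};

// Illustrative: fetch the first cell of a string column by name, as the
// procedure-id and state lookups above do.
fn first_string_cell(batch: &RecordBatch, name: &str) -> Option<String> {
    let column = batch.column_by_name(name)?;
    let column = column.as_string::<i32>();
    (!column.is_empty()).then(|| column.value(0).to_string())
}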
async fn insert_values(instance: &Arc<Instance>, ts: u64) -> Vec<FrontendResult<Output>> {

View File

@@ -307,13 +307,7 @@ pub async fn test_mysql_crud(store_type: StorageType) {
}
});
assert_eq!(json, expected_j);
assert_eq!(
vector,
[1.0f32, 2.0, 3.0]
.iter()
.flat_map(|x| x.to_le_bytes())
.collect::<Vec<u8>>()
);
assert_eq!(vector, "[1,2,3]".as_bytes());
}
let rows = sqlx::query("select i from demo where i=?")