diff --git a/src/common/function/src/aggrs/vector/product.rs b/src/common/function/src/aggrs/vector/product.rs index d22ea96f45..d8f201966a 100644 --- a/src/common/function/src/aggrs/vector/product.rs +++ b/src/common/function/src/aggrs/vector/product.rs @@ -15,7 +15,7 @@ use std::borrow::Cow; use std::sync::Arc; -use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, StringArray}; +use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, LargeStringArray, StringArray}; use arrow_schema::{DataType, Field}; use datafusion::logical_expr::{Signature, TypeSignature, Volatility}; use datafusion_common::{Result, ScalarValue}; @@ -63,7 +63,7 @@ impl VectorProduct { } let t = args.schema.field(0).data_type(); - if !matches!(t, DataType::Utf8 | DataType::Binary) { + if !matches!(t, DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary) { return Err(datafusion_common::DataFusionError::Internal(format!( "unexpected input datatype {t} when creating `VEC_PRODUCT`" ))); @@ -91,6 +91,13 @@ impl VectorProduct { .map(|x| x.map(Cow::Owned)) .collect::>>()? } + DataType::LargeUtf8 => { + let arr: &LargeStringArray = values[0].as_string(); + arr.iter() + .filter_map(|x| x.map(|s| parse_veclit_from_strlit(s).map_err(Into::into))) + .map(|x: Result>| x.map(Cow::Owned)) + .collect::>>()? + } DataType::Binary => { let arr: &BinaryArray = values[0].as_binary(); arr.iter() diff --git a/src/common/function/src/aggrs/vector/sum.rs b/src/common/function/src/aggrs/vector/sum.rs index b6ff942791..3f875a8412 100644 --- a/src/common/function/src/aggrs/vector/sum.rs +++ b/src/common/function/src/aggrs/vector/sum.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, StringArray}; +use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, LargeStringArray, StringArray}; use arrow_schema::{DataType, Field}; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::{ @@ -63,7 +63,7 @@ impl VectorSum { } let t = args.schema.field(0).data_type(); - if !matches!(t, DataType::Utf8 | DataType::Binary) { + if !matches!(t, DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary) { return Err(datafusion_common::DataFusionError::Internal(format!( "unexpected input datatype {t} when creating `VEC_SUM`" ))); @@ -98,6 +98,21 @@ impl VectorSum { *self.inner(vec_column.len()) += vec_column; } } + DataType::LargeUtf8 => { + let arr: &LargeStringArray = values[0].as_string(); + for s in arr.iter() { + let Some(s) = s else { + if is_update { + self.has_null = true; + self.sum = None; + } + return Ok(()); + }; + let values = parse_veclit_from_strlit(s)?; + let vec_column = DVectorView::from_slice(&values, values.len()); + *self.inner(vec_column.len()) += vec_column; + } + } DataType::Binary => { let arr: &BinaryArray = values[0].as_binary(); for b in arr.iter() { diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs index 3f1e7c7718..c93961f643 100644 --- a/src/common/meta/src/key/table_info.rs +++ b/src/common/meta/src/key/table_info.rs @@ -287,8 +287,12 @@ mod tests { #[test] fn test_deserialization_compatibility() { - let s = r#"{"version":1,"table_info":{"ident":{"table_id":8714,"version":0},"name":"go_gc_duration_seconds","desc":"Created on insertion","catalog_name":"e87lehzy63d4cloud_docs_test","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"instance","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"job","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"quantile","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"greptime_timestamp","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}},{"name":"greptime_value","data_type":{"Float64":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}}],"timestamp_index":3,"version":0},"primary_key_indices":[0,1,2],"value_indices":[],"engine":"mito","next_column_id":5,"region_numbers":[],"engine_options":{},"options":{"write_buffer_size":null,"ttl":null,"extra_options":{}},"created_on":"1970-01-01T00:00:00Z"},"table_type":"Base"}}"#; - let v = TableInfoValue::try_from_raw_value(s.as_bytes()).unwrap(); + let old_fmt = r#"{"version":1,"table_info":{"ident":{"table_id":8714,"version":0},"name":"go_gc_duration_seconds","desc":"Created on insertion","catalog_name":"e87lehzy63d4cloud_docs_test","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"instance","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"job","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"quantile","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"greptime_timestamp","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}},{"name":"greptime_value","data_type":{"Float64":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}}],"timestamp_index":3,"version":0},"primary_key_indices":[0,1,2],"value_indices":[],"engine":"mito","next_column_id":5,"region_numbers":[],"engine_options":{},"options":{"write_buffer_size":null,"ttl":null,"extra_options":{}},"created_on":"1970-01-01T00:00:00Z"},"table_type":"Base"}}"#; + let new_fmt = r#"{"version":1,"table_info":{"ident":{"table_id":8714,"version":0},"name":"go_gc_duration_seconds","desc":"Created on insertion","catalog_name":"e87lehzy63d4cloud_docs_test","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"instance","data_type":{"String":{"size_type":"Utf8"}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"job","data_type":{"String":{"size_type":"Utf8"}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"quantile","data_type":{"String":{"size_type":"Utf8"}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"greptime_timestamp","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}},{"name":"greptime_value","data_type":{"Float64":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}}],"timestamp_index":3,"version":0},"primary_key_indices":[0,1,2],"value_indices":[],"engine":"mito","next_column_id":5,"region_numbers":[],"engine_options":{},"options":{"write_buffer_size":null,"ttl":null,"extra_options":{}},"created_on":"1970-01-01T00:00:00Z"},"table_type":"Base"}}"#; + + let v = TableInfoValue::try_from_raw_value(old_fmt.as_bytes()).unwrap(); + let new_v = TableInfoValue::try_from_raw_value(new_fmt.as_bytes()).unwrap(); + assert_eq!(v, new_v); assert_eq!(v.table_info.meta.created_on, v.table_info.meta.updated_on); assert!(v.table_info.meta.partition_key_indices.is_empty()); } diff --git a/src/datatypes/src/arrow_array.rs b/src/datatypes/src/arrow_array.rs index 40b7d46d1d..97aa299fad 100644 --- a/src/datatypes/src/arrow_array.rs +++ b/src/datatypes/src/arrow_array.rs @@ -16,3 +16,5 @@ pub type BinaryArray = arrow::array::BinaryArray; pub type MutableBinaryArray = arrow::array::BinaryBuilder; pub type StringArray = arrow::array::StringArray; pub type MutableStringArray = arrow::array::StringBuilder; +pub type LargeStringArray = arrow::array::LargeStringArray; +pub type MutableLargeStringArray = arrow::array::LargeStringBuilder; diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index 74bb181cb3..3f305828a2 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -454,9 +454,8 @@ impl TryFrom<&ArrowDataType> for ConcreteDataType { ArrowDataType::Binary | ArrowDataType::LargeBinary | ArrowDataType::BinaryView => { Self::binary_datatype() } - ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View => { - Self::string_datatype() - } + ArrowDataType::Utf8 | ArrowDataType::Utf8View => Self::string_datatype(), + ArrowDataType::LargeUtf8 => Self::large_string_datatype(), ArrowDataType::List(field) => Self::List(ListType::new( ConcreteDataType::from_arrow_type(field.data_type()), )), @@ -518,6 +517,10 @@ impl_new_concrete_type_functions!( ); impl ConcreteDataType { + pub fn large_string_datatype() -> Self { + ConcreteDataType::String(StringType::large_utf8()) + } + pub fn timestamp_second_datatype() -> Self { ConcreteDataType::Timestamp(TimestampType::Second(TimestampSecondType)) } @@ -777,6 +780,14 @@ mod tests { ConcreteDataType::from_arrow_type(&ArrowDataType::Utf8), ConcreteDataType::String(_) )); + // Test LargeUtf8 mapping to large String type + let large_string_type = ConcreteDataType::from_arrow_type(&ArrowDataType::LargeUtf8); + assert!(matches!(large_string_type, ConcreteDataType::String(_))); + if let ConcreteDataType::String(string_type) = &large_string_type { + assert!(string_type.is_large()); + } else { + panic!("Expected a String type"); + } assert_eq!( ConcreteDataType::from_arrow_type(&ArrowDataType::List(Arc::new(Field::new( "item", @@ -791,6 +802,38 @@ mod tests { )); } + #[test] + fn test_large_utf8_round_trip() { + // Test round-trip conversion for LargeUtf8 + let large_utf8_arrow = ArrowDataType::LargeUtf8; + let concrete_type = ConcreteDataType::from_arrow_type(&large_utf8_arrow); + let back_to_arrow = concrete_type.as_arrow_type(); + + assert!(matches!(concrete_type, ConcreteDataType::String(_))); + // Round-trip should preserve the LargeUtf8 type + assert_eq!(large_utf8_arrow, back_to_arrow); + + // Test that Utf8 and LargeUtf8 map to different string variants + let utf8_concrete = ConcreteDataType::from_arrow_type(&ArrowDataType::Utf8); + let large_utf8_concrete = ConcreteDataType::from_arrow_type(&ArrowDataType::LargeUtf8); + + assert!(matches!(utf8_concrete, ConcreteDataType::String(_))); + assert!(matches!(large_utf8_concrete, ConcreteDataType::String(_))); + + // They should have different size types + if let (ConcreteDataType::String(utf8_type), ConcreteDataType::String(large_type)) = + (&utf8_concrete, &large_utf8_concrete) + { + assert!(!utf8_type.is_large()); + assert!(large_type.is_large()); + } else { + panic!("Expected both to be String types"); + } + + // They should be different types + assert_ne!(utf8_concrete, large_utf8_concrete); + } + #[test] fn test_from_arrow_timestamp() { assert_eq!( diff --git a/src/datatypes/src/types.rs b/src/datatypes/src/types.rs index eeea086245..1c7df86249 100644 --- a/src/datatypes/src/types.rs +++ b/src/datatypes/src/types.rs @@ -53,7 +53,7 @@ pub use primitive_type::{ Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type, LogicalPrimitiveType, OrdPrimitive, UInt8Type, UInt16Type, UInt32Type, UInt64Type, WrapperType, }; -pub use string_type::StringType; +pub use string_type::{StringSizeType, StringType}; pub use struct_type::{StructField, StructType}; pub use time_type::{ TimeMicrosecondType, TimeMillisecondType, TimeNanosecondType, TimeSecondType, TimeType, diff --git a/src/datatypes/src/types/cast.rs b/src/datatypes/src/types/cast.rs index 2330814f81..d7a27a45e1 100644 --- a/src/datatypes/src/types/cast.rs +++ b/src/datatypes/src/types/cast.rs @@ -104,7 +104,7 @@ pub fn can_cast_type(src_value: &Value, dest_type: &ConcreteDataType) -> bool { (_, Boolean(_)) => src_type.is_numeric() || src_type.is_string(), (Boolean(_), _) => dest_type.is_numeric() || dest_type.is_string(), - // numeric types cast + // numeric and string types cast ( UInt8(_) | UInt16(_) | UInt32(_) | UInt64(_) | Int8(_) | Int16(_) | Int32(_) | Int64(_) | Float32(_) | Float64(_) | String(_), diff --git a/src/datatypes/src/types/string_type.rs b/src/datatypes/src/types/string_type.rs index 3b148275d3..61677ead4a 100644 --- a/src/datatypes/src/types/string_type.rs +++ b/src/datatypes/src/types/string_type.rs @@ -19,17 +19,97 @@ use common_base::bytes::StringBytes; use serde::{Deserialize, Serialize}; use crate::data_type::{DataType, DataTypeRef}; -use crate::prelude::ScalarVectorBuilder; use crate::type_id::LogicalTypeId; use crate::value::Value; use crate::vectors::{MutableVector, StringVectorBuilder}; -#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] -pub struct StringType; +/// String size variant to distinguish between UTF8 and LargeUTF8 +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, Default, +)] +pub enum StringSizeType { + /// Regular UTF8 strings (up to 2GB) + #[default] + Utf8, + /// Large UTF8 strings (up to 2^63 bytes) + LargeUtf8, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)] +pub struct StringType { + #[serde(default)] + size_type: StringSizeType, +} + +/// Custom deserialization to support both old and new formats. +impl<'de> serde::Deserialize<'de> for StringType { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + #[derive(serde::Deserialize)] + struct Helper { + #[serde(default)] + size_type: StringSizeType, + } + + let opt = Option::::deserialize(deserializer)?; + Ok(match opt { + Some(helper) => Self { + size_type: helper.size_type, + }, + None => Self::default(), + }) + } +} + +impl Default for StringType { + fn default() -> Self { + Self { + size_type: StringSizeType::Utf8, + } + } +} impl StringType { + /// Create a new StringType with default (Utf8) size + pub fn new() -> Self { + Self { + size_type: StringSizeType::Utf8, + } + } + + /// Create a new StringType with specified size + pub fn with_size(size_type: StringSizeType) -> Self { + Self { size_type } + } + + /// Create a StringType for regular UTF8 strings + pub fn utf8() -> Self { + Self::with_size(StringSizeType::Utf8) + } + + /// Create a StringType for large UTF8 strings + pub fn large_utf8() -> Self { + Self::with_size(StringSizeType::LargeUtf8) + } + + /// Get the size type + pub fn size_type(&self) -> StringSizeType { + self.size_type + } + + /// Check if this is a large UTF8 string type + pub fn is_large(&self) -> bool { + matches!(self.size_type, StringSizeType::LargeUtf8) + } + pub fn arc() -> DataTypeRef { - Arc::new(Self) + Arc::new(Self::new()) + } + + pub fn large_arc() -> DataTypeRef { + Arc::new(Self::large_utf8()) } } @@ -47,11 +127,19 @@ impl DataType for StringType { } fn as_arrow_type(&self) -> ArrowDataType { - ArrowDataType::Utf8 + match self.size_type { + StringSizeType::Utf8 => ArrowDataType::Utf8, + StringSizeType::LargeUtf8 => ArrowDataType::LargeUtf8, + } } fn create_mutable_vector(&self, capacity: usize) -> Box { - Box::new(StringVectorBuilder::with_capacity(capacity)) + match self.size_type { + StringSizeType::Utf8 => Box::new(StringVectorBuilder::with_string_capacity(capacity)), + StringSizeType::LargeUtf8 => { + Box::new(StringVectorBuilder::with_large_capacity(capacity)) + } + } } fn try_cast(&self, from: Value) -> Option { diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index ca34aa479b..9427baeedc 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -472,7 +472,13 @@ impl Value { Value::Int64(v) => ScalarValue::Int64(Some(*v)), Value::Float32(v) => ScalarValue::Float32(Some(v.0)), Value::Float64(v) => ScalarValue::Float64(Some(v.0)), - Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())), + Value::String(v) => { + let s = v.as_utf8().to_string(); + match output_type { + ConcreteDataType::String(t) if t.is_large() => ScalarValue::LargeUtf8(Some(s)), + _ => ScalarValue::Utf8(Some(s)), + } + } Value::Binary(v) => ScalarValue::Binary(Some(v.to_vec())), Value::Date(v) => ScalarValue::Date32(Some(v.val())), Value::Null => to_null_scalar_value(output_type)?, @@ -606,7 +612,13 @@ pub fn to_null_scalar_value(output_type: &ConcreteDataType) -> Result { ScalarValue::Binary(None) } - ConcreteDataType::String(_) => ScalarValue::Utf8(None), + ConcreteDataType::String(t) => { + if t.is_large() { + ScalarValue::LargeUtf8(None) + } else { + ScalarValue::Utf8(None) + } + } ConcreteDataType::Date(_) => ScalarValue::Date32(None), ConcreteDataType::Timestamp(t) => timestamp_to_scalar_value(t.unit(), None), ConcreteDataType::Interval(v) => match v { diff --git a/src/datatypes/src/vectors/helper.rs b/src/datatypes/src/vectors/helper.rs index 035ebf4ab6..a9eafce154 100644 --- a/src/datatypes/src/vectors/helper.rs +++ b/src/datatypes/src/vectors/helper.rs @@ -291,7 +291,8 @@ impl Helper { ArrowDataType::Float32 => Arc::new(Float32Vector::try_from_arrow_array(array)?), ArrowDataType::Float64 => Arc::new(Float64Vector::try_from_arrow_array(array)?), ArrowDataType::Utf8 => Arc::new(StringVector::try_from_arrow_array(array)?), - ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View => { + ArrowDataType::LargeUtf8 => Arc::new(StringVector::try_from_arrow_array(array)?), + ArrowDataType::Utf8View => { let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::Utf8) .context(crate::error::ArrowComputeSnafu)?; Arc::new(StringVector::try_from_arrow_array(array)?) @@ -742,17 +743,17 @@ mod tests { #[test] fn test_large_string_array_into_vector() { let input_vec = vec!["a", "b"]; - let assertion_array = StringArray::from(input_vec.clone()); + let assertion_array = LargeStringArray::from(input_vec.clone()); let large_string_array: ArrayRef = Arc::new(LargeStringArray::from(input_vec)); let vector = Helper::try_into_vector(large_string_array).unwrap(); assert_eq!(2, vector.len()); assert_eq!(0, vector.null_count()); - let output_arrow_array: StringArray = vector + let output_arrow_array: LargeStringArray = vector .to_arrow_array() .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap() .clone(); assert_eq!(&assertion_array, &output_arrow_array); diff --git a/src/datatypes/src/vectors/string.rs b/src/datatypes/src/vectors/string.rs index 7ea0cd53bb..2f6cddb328 100644 --- a/src/datatypes/src/vectors/string.rs +++ b/src/datatypes/src/vectors/string.rs @@ -18,7 +18,9 @@ use std::sync::Arc; use arrow::array::{Array, ArrayBuilder, ArrayIter, ArrayRef}; use snafu::ResultExt; -use crate::arrow_array::{MutableStringArray, StringArray}; +use crate::arrow_array::{ + LargeStringArray, MutableLargeStringArray, MutableStringArray, StringArray, +}; use crate::data_type::ConcreteDataType; use crate::error::{self, Result}; use crate::scalars::{ScalarVector, ScalarVectorBuilder}; @@ -26,69 +28,93 @@ use crate::serialize::Serializable; use crate::value::{Value, ValueRef}; use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef}; +/// Internal representation for string arrays +#[derive(Debug, PartialEq)] +enum StringArrayData { + String(StringArray), + LargeString(LargeStringArray), +} + /// Vector of strings. #[derive(Debug, PartialEq)] pub struct StringVector { - array: StringArray, + array: StringArrayData, } impl StringVector { pub(crate) fn as_arrow(&self) -> &dyn Array { - &self.array + match &self.array { + StringArrayData::String(array) => array, + StringArrayData::LargeString(array) => array, + } + } + + /// Create a StringVector from a regular StringArray + pub fn from_string_array(array: StringArray) -> Self { + Self { + array: StringArrayData::String(array), + } + } + + /// Create a StringVector from a LargeStringArray + pub fn from_large_string_array(array: LargeStringArray) -> Self { + Self { + array: StringArrayData::LargeString(array), + } + } + + pub fn from_slice>(slice: &[T]) -> Self { + Self::from_string_array(StringArray::from_iter( + slice.iter().map(|s| Some(s.as_ref())), + )) } } impl From for StringVector { fn from(array: StringArray) -> Self { - Self { array } + Self::from_string_array(array) + } +} + +impl From for StringVector { + fn from(array: LargeStringArray) -> Self { + Self::from_large_string_array(array) } } impl From>> for StringVector { fn from(data: Vec>) -> Self { - Self { - array: StringArray::from_iter(data), - } + Self::from_string_array(StringArray::from_iter(data)) } } impl From>> for StringVector { fn from(data: Vec>) -> Self { - Self { - array: StringArray::from_iter(data), - } + Self::from_string_array(StringArray::from_iter(data)) } } impl From<&[Option]> for StringVector { fn from(data: &[Option]) -> Self { - Self { - array: StringArray::from_iter(data), - } + Self::from_string_array(StringArray::from_iter(data)) } } impl From<&[Option<&str>]> for StringVector { fn from(data: &[Option<&str>]) -> Self { - Self { - array: StringArray::from_iter(data), - } + Self::from_string_array(StringArray::from_iter(data)) } } impl From> for StringVector { fn from(data: Vec) -> Self { - Self { - array: StringArray::from_iter(data.into_iter().map(Some)), - } + Self::from_string_array(StringArray::from_iter(data.into_iter().map(Some))) } } impl From> for StringVector { fn from(data: Vec<&str>) -> Self { - Self { - array: StringArray::from_iter(data.into_iter().map(Some)), - } + Self::from_string_array(StringArray::from_iter(data.into_iter().map(Some))) } } @@ -106,67 +132,177 @@ impl Vector for StringVector { } fn len(&self) -> usize { - self.array.len() + match &self.array { + StringArrayData::String(array) => array.len(), + StringArrayData::LargeString(array) => array.len(), + } } fn to_arrow_array(&self) -> ArrayRef { - Arc::new(self.array.clone()) + match &self.array { + StringArrayData::String(array) => Arc::new(array.clone()), + StringArrayData::LargeString(array) => Arc::new(array.clone()), + } } fn to_boxed_arrow_array(&self) -> Box { - Box::new(self.array.clone()) + match &self.array { + StringArrayData::String(array) => Box::new(array.clone()), + StringArrayData::LargeString(array) => Box::new(array.clone()), + } } fn validity(&self) -> Validity { - vectors::impl_validity_for_vector!(self.array) + match &self.array { + StringArrayData::String(array) => vectors::impl_validity_for_vector!(array), + StringArrayData::LargeString(array) => vectors::impl_validity_for_vector!(array), + } } fn memory_size(&self) -> usize { - self.array.get_buffer_memory_size() + match &self.array { + StringArrayData::String(array) => array.get_buffer_memory_size(), + StringArrayData::LargeString(array) => array.get_buffer_memory_size(), + } } fn null_count(&self) -> usize { - self.array.null_count() + match &self.array { + StringArrayData::String(array) => array.null_count(), + StringArrayData::LargeString(array) => array.null_count(), + } } fn is_null(&self, row: usize) -> bool { - self.array.is_null(row) + match &self.array { + StringArrayData::String(array) => array.is_null(row), + StringArrayData::LargeString(array) => array.is_null(row), + } } fn slice(&self, offset: usize, length: usize) -> VectorRef { - Arc::new(Self::from(self.array.slice(offset, length))) + match &self.array { + StringArrayData::String(array) => { + Arc::new(Self::from_string_array(array.slice(offset, length))) + } + StringArrayData::LargeString(array) => { + Arc::new(Self::from_large_string_array(array.slice(offset, length))) + } + } } fn get(&self, index: usize) -> Value { - vectors::impl_get_for_vector!(self.array, index) + match &self.array { + StringArrayData::String(array) => vectors::impl_get_for_vector!(array, index), + StringArrayData::LargeString(array) => vectors::impl_get_for_vector!(array, index), + } } fn get_ref(&self, index: usize) -> ValueRef<'_> { - vectors::impl_get_ref_for_vector!(self.array, index) + match &self.array { + StringArrayData::String(array) => vectors::impl_get_ref_for_vector!(array, index), + StringArrayData::LargeString(array) => vectors::impl_get_ref_for_vector!(array, index), + } + } +} + +pub enum StringIter<'a> { + String(ArrayIter<&'a StringArray>), + LargeString(ArrayIter<&'a LargeStringArray>), +} + +impl<'a> Iterator for StringIter<'a> { + type Item = Option<&'a str>; + + fn next(&mut self) -> Option { + match self { + StringIter::String(iter) => iter.next(), + StringIter::LargeString(iter) => iter.next(), + } } } impl ScalarVector for StringVector { type OwnedItem = String; type RefItem<'a> = &'a str; - type Iter<'a> = ArrayIter<&'a StringArray>; + type Iter<'a> = StringIter<'a>; type Builder = StringVectorBuilder; fn get_data(&self, idx: usize) -> Option> { - if self.array.is_valid(idx) { - Some(self.array.value(idx)) - } else { - None + match &self.array { + StringArrayData::String(array) => { + if array.is_valid(idx) { + Some(array.value(idx)) + } else { + None + } + } + StringArrayData::LargeString(array) => { + if array.is_valid(idx) { + Some(array.value(idx)) + } else { + None + } + } } } fn iter_data(&self) -> Self::Iter<'_> { - self.array.iter() + match &self.array { + StringArrayData::String(array) => StringIter::String(array.iter()), + StringArrayData::LargeString(array) => StringIter::LargeString(array.iter()), + } } } +/// Internal representation for mutable string arrays +enum MutableStringArrayData { + String(MutableStringArray), + LargeString(MutableLargeStringArray), +} + pub struct StringVectorBuilder { - pub mutable_array: MutableStringArray, + mutable_array: MutableStringArrayData, +} + +impl Default for StringVectorBuilder { + fn default() -> Self { + Self::new() + } +} + +impl StringVectorBuilder { + /// Create a builder for regular strings + pub fn new() -> Self { + Self { + mutable_array: MutableStringArrayData::String(MutableStringArray::new()), + } + } + + /// Create a builder for large strings + pub fn new_large() -> Self { + Self { + mutable_array: MutableStringArrayData::LargeString(MutableLargeStringArray::new()), + } + } + + /// Create a builder for regular strings with capacity + pub fn with_string_capacity(capacity: usize) -> Self { + Self { + mutable_array: MutableStringArrayData::String(MutableStringArray::with_capacity( + capacity, 0, + )), + } + } + + /// Create a builder for large strings with capacity + pub fn with_large_capacity(capacity: usize) -> Self { + Self { + mutable_array: MutableStringArrayData::LargeString( + MutableLargeStringArray::with_capacity(capacity, 0), + ), + } + } } impl MutableVector for StringVectorBuilder { @@ -175,7 +311,10 @@ impl MutableVector for StringVectorBuilder { } fn len(&self) -> usize { - self.mutable_array.len() + match &self.mutable_array { + MutableStringArrayData::String(array) => array.len(), + MutableStringArrayData::LargeString(array) => array.len(), + } } fn as_any(&self) -> &dyn Any { @@ -195,8 +334,14 @@ impl MutableVector for StringVectorBuilder { } fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()> { match value.try_into_string()? { - Some(v) => self.mutable_array.append_value(v), - None => self.mutable_array.append_null(), + Some(v) => match &mut self.mutable_array { + MutableStringArrayData::String(array) => array.append_value(v), + MutableStringArrayData::LargeString(array) => array.append_value(v), + }, + None => match &mut self.mutable_array { + MutableStringArrayData::String(array) => array.append_null(), + MutableStringArrayData::LargeString(array) => array.append_null(), + }, } Ok(()) } @@ -206,7 +351,10 @@ impl MutableVector for StringVectorBuilder { } fn push_null(&mut self) { - self.mutable_array.append_null() + match &mut self.mutable_array { + MutableStringArrayData::String(array) => array.append_null(), + MutableStringArrayData::LargeString(array) => array.append_null(), + } } } @@ -215,26 +363,44 @@ impl ScalarVectorBuilder for StringVectorBuilder { fn with_capacity(capacity: usize) -> Self { Self { - mutable_array: MutableStringArray::with_capacity(capacity, 0), + mutable_array: MutableStringArrayData::String(MutableStringArray::with_capacity( + capacity, 0, + )), } } fn push(&mut self, value: Option<::RefItem<'_>>) { match value { - Some(v) => self.mutable_array.append_value(v), - None => self.mutable_array.append_null(), + Some(v) => match &mut self.mutable_array { + MutableStringArrayData::String(array) => array.append_value(v), + MutableStringArrayData::LargeString(array) => array.append_value(v), + }, + None => match &mut self.mutable_array { + MutableStringArrayData::String(array) => array.append_null(), + MutableStringArrayData::LargeString(array) => array.append_null(), + }, } } fn finish(&mut self) -> Self::VectorType { - StringVector { - array: self.mutable_array.finish(), + match &mut self.mutable_array { + MutableStringArrayData::String(array) => { + StringVector::from_string_array(array.finish()) + } + MutableStringArrayData::LargeString(array) => { + StringVector::from_large_string_array(array.finish()) + } } } fn finish_cloned(&self) -> Self::VectorType { - StringVector { - array: self.mutable_array.finish_cloned(), + match &self.mutable_array { + MutableStringArrayData::String(array) => { + StringVector::from_string_array(array.finish_cloned()) + } + MutableStringArrayData::LargeString(array) => { + StringVector::from_large_string_array(array.finish_cloned()) + } } } } @@ -248,7 +414,26 @@ impl Serializable for StringVector { } } -vectors::impl_try_from_arrow_array_for_vector!(StringArray, StringVector); +impl StringVector { + pub fn try_from_arrow_array( + array: impl AsRef, + ) -> crate::error::Result { + let array = array.as_ref(); + + if let Some(string_array) = array.as_any().downcast_ref::() { + Ok(StringVector::from_string_array(string_array.clone())) + } else if let Some(large_string_array) = array.as_any().downcast_ref::() { + Ok(StringVector::from_large_string_array( + large_string_array.clone(), + )) + } else { + Err(crate::error::UnsupportedArrowTypeSnafu { + arrow_type: array.data_type().clone(), + } + .build()) + } + } +} #[cfg(test)] mod tests { diff --git a/src/metric-engine/src/engine/flush.rs b/src/metric-engine/src/engine/flush.rs index acbced8f70..23899cbb05 100644 --- a/src/metric-engine/src/engine/flush.rs +++ b/src/metric-engine/src/engine/flush.rs @@ -127,14 +127,13 @@ mod tests { assert_eq!( debug_format, r#" -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test_metric_region/11_0000000001/data/.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000001/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test_metric_region/11_0000000002/data/.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000002/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "", level: 0, file_path: "test_metric_region/11_0000000001/metadata/.parquet", file_size: 3429, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "", level: 0, file_path: "test_metric_region/11_0000000002/metadata/.parquet", file_size: 3413, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test_metric_region/22_0000000042/data/.parquet", file_size: 3157, index_file_path: Some("test_metric_region/22_0000000042/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "", level: 0, file_path: "test_metric_region/22_0000000042/metadata/.parquet", file_size: 3413, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"# +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test_metric_region/11_0000000001/data/.parquet", file_size: 3173, index_file_path: Some("test_metric_region/11_0000000001/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test_metric_region/11_0000000002/data/.parquet", file_size: 3173, index_file_path: Some("test_metric_region/11_0000000002/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "", level: 0, file_path: "test_metric_region/11_0000000001/metadata/.parquet", file_size: 3505, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "", level: 0, file_path: "test_metric_region/11_0000000002/metadata/.parquet", file_size: 3489, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test_metric_region/22_0000000042/data/.parquet", file_size: 3173, index_file_path: Some("test_metric_region/22_0000000042/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "", level: 0, file_path: "test_metric_region/22_0000000042/metadata/.parquet", file_size: 3489, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"# ); - // list from storage let storage_entries = mito .all_ssts_from_storage() diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index ed4f965c06..cbd8e83bd3 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -699,10 +699,20 @@ mod test { semantic_type, column_id: 5, }; - let expected = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":null},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string(); + let old_fmt = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":null},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string(); + let new_fmt = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":{\"size_type\":\"Utf8\"}},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string(); assert_eq!( MetadataRegion::serialize_column_metadata(&column_metadata), - expected + new_fmt + ); + // Ensure both old and new formats can be deserialized. + assert_eq!( + MetadataRegion::deserialize_column_metadata(&old_fmt).unwrap(), + column_metadata + ); + assert_eq!( + MetadataRegion::deserialize_column_metadata(&new_fmt).unwrap(), + column_metadata ); let semantic_type = "\"Invalid Column Metadata\""; diff --git a/src/mito-codec/src/index.rs b/src/mito-codec/src/index.rs index c609e50cc7..d98a6d3a51 100644 --- a/src/mito-codec/src/index.rs +++ b/src/mito-codec/src/index.rs @@ -17,7 +17,6 @@ use std::collections::HashMap; use std::sync::Arc; -use datatypes::data_type::ConcreteDataType; use datatypes::value::ValueRef; use memcomparable::Serializer; use snafu::{OptionExt, ResultExt, ensure}; @@ -49,7 +48,7 @@ impl IndexValueCodec { ) -> Result<()> { ensure!(!value.is_null(), IndexEncodeNullSnafu); - if matches!(field.data_type(), ConcreteDataType::String(_)) { + if field.data_type().is_string() { let value = value .try_into_string() .context(FieldTypeMismatchSnafu)? diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 2a5d0fbf87..a34e72095d 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -787,9 +787,9 @@ async fn test_list_ssts() { assert_eq!( debug_format, r#" -ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2515, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2515, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2515, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# +ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2531, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2531, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2531, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ); // list from storage diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index c2cd4877fe..b65d9c840d 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -923,6 +923,6 @@ mod test { // get manifest size again let manifest_size = manager.manifest_usage(); - assert_eq!(manifest_size, 1748); + assert_eq!(manifest_size, 1764); } } diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs index 2425ccb5d0..a2f78a3ed0 100644 --- a/src/mito2/src/sst/index/fulltext_index/creator.rs +++ b/src/mito2/src/sst/index/fulltext_index/creator.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use std::sync::atomic::AtomicUsize; use common_telemetry::warn; -use datatypes::arrow::array::{Array, StringArray}; +use datatypes::arrow::array::{Array, LargeStringArray, StringArray}; use datatypes::arrow::datatypes::DataType; use datatypes::arrow::record_batch::RecordBatch; use datatypes::schema::{FulltextAnalyzer, FulltextBackend}; @@ -321,12 +321,34 @@ impl SingleCreator { if let Some(column_array) = batch.column_by_name(&self.column_name) { // Convert Arrow array to string array. // TODO(yingwen): Use Utf8View later if possible. - let array = datatypes::arrow::compute::cast(column_array, &DataType::Utf8) - .context(ComputeArrowSnafu)?; - let string_array = array.as_any().downcast_ref::().unwrap(); - for text_opt in string_array.iter() { - let text = text_opt.unwrap_or_default(); - self.inner.push_text(text).await?; + match column_array.data_type() { + DataType::Utf8 => { + let string_array = column_array.as_any().downcast_ref::().unwrap(); + for text_opt in string_array.iter() { + let text = text_opt.unwrap_or_default(); + self.inner.push_text(text).await?; + } + } + DataType::LargeUtf8 => { + let large_string_array = column_array + .as_any() + .downcast_ref::() + .unwrap(); + for text_opt in large_string_array.iter() { + let text = text_opt.unwrap_or_default(); + self.inner.push_text(text).await?; + } + } + _ => { + // For other types, cast to Utf8 as before + let array = datatypes::arrow::compute::cast(column_array, &DataType::Utf8) + .context(ComputeArrowSnafu)?; + let string_array = array.as_any().downcast_ref::().unwrap(); + for text_opt in string_array.iter() { + let text = text_opt.unwrap_or_default(); + self.inner.push_text(text).await?; + } + } } } else { // If the column is not found in the batch, push empty text. diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs index 2dc30bb364..bcf1d8694c 100644 --- a/src/mito2/src/sst/parquet/flat_format.rs +++ b/src/mito2/src/sst/parquet/flat_format.rs @@ -688,7 +688,7 @@ impl FlatConvertFormat { let values_array = values_vector.to_arrow_array(); // Only creates dictionary array for string types, otherwise take values by keys - if matches!(column_type, ConcreteDataType::String(_)) { + if column_type.is_string() { // Creates dictionary array using the same keys for string types // Note that the dictionary values may have nulls. let dict_array = DictionaryArray::new(keys.clone(), values_array); diff --git a/src/servers/src/mysql/helper.rs b/src/servers/src/mysql/helper.rs index 4705c01387..cf92741bea 100644 --- a/src/servers/src/mysql/helper.rs +++ b/src/servers/src/mysql/helper.rs @@ -201,9 +201,14 @@ pub fn convert_value(param: &ParamValue, t: &ConcreteDataType) -> Result value::to_null_scalar_value(t).context(error::ConvertScalarValueSnafu), ValueInner::Bytes(b) => match t { - ConcreteDataType::String(_) => Ok(ScalarValue::Utf8(Some( - String::from_utf8_lossy(b).to_string(), - ))), + ConcreteDataType::String(t) => { + let s = String::from_utf8_lossy(b).to_string(); + if t.is_large() { + Ok(ScalarValue::LargeUtf8(Some(s))) + } else { + Ok(ScalarValue::Utf8(Some(s))) + } + } ConcreteDataType::Binary(_) => Ok(ScalarValue::Binary(Some(b.to_vec()))), ConcreteDataType::Timestamp(ts_type) => covert_bytes_to_timestamp(b, ts_type), _ => error::PreparedStmtTypeMismatchSnafu { diff --git a/src/servers/src/postgres/types.rs b/src/servers/src/postgres/types.rs index 65faf5041f..e78980f1d8 100644 --- a/src/servers/src/postgres/types.rs +++ b/src/servers/src/postgres/types.rs @@ -241,7 +241,7 @@ fn encode_array( } } } - ConcreteDataType::String(_) => { + &ConcreteDataType::String(_) => { let array = value_list .items() .iter() @@ -687,7 +687,13 @@ pub(super) fn parameters_to_scalar_values( let data = portal.parameter::(idx, &client_type)?; if let Some(server_type) = &server_type { match server_type { - ConcreteDataType::String(_) => ScalarValue::Utf8(data), + ConcreteDataType::String(t) => { + if t.is_large() { + ScalarValue::LargeUtf8(data) + } else { + ScalarValue::Utf8(data) + } + } _ => { return Err(invalid_parameter_error( "invalid_parameter_type", @@ -969,8 +975,13 @@ pub(super) fn parameters_to_scalar_values( let data = portal.parameter::>(idx, &client_type)?; if let Some(server_type) = &server_type { match server_type { - ConcreteDataType::String(_) => { - ScalarValue::Utf8(data.map(|d| String::from_utf8_lossy(&d).to_string())) + ConcreteDataType::String(t) => { + let s = data.map(|d| String::from_utf8_lossy(&d).to_string()); + if t.is_large() { + ScalarValue::LargeUtf8(s) + } else { + ScalarValue::Utf8(s) + } } ConcreteDataType::Binary(_) => ScalarValue::Binary(data), _ => { diff --git a/tests-fuzz/src/generator/create_expr.rs b/tests-fuzz/src/generator/create_expr.rs index e9c43955eb..c216e2c287 100644 --- a/tests-fuzz/src/generator/create_expr.rs +++ b/tests-fuzz/src/generator/create_expr.rs @@ -511,7 +511,7 @@ mod tests { .unwrap(); let logical_table_serialized = serde_json::to_string(&logical_table_expr).unwrap(); - let logical_table_expected = r#"{"table_name":{"value":"impedit","quote_style":null},"columns":[{"name":{"value":"ts","quote_style":null},"column_type":{"Timestamp":{"Millisecond":null}},"options":["TimeIndex"]},{"name":{"value":"val","quote_style":null},"column_type":{"Float64":{}},"options":[]},{"name":{"value":"totam","quote_style":null},"column_type":{"String":null},"options":["PrimaryKey"]},{"name":{"value":"cumque","quote_style":null},"column_type":{"String":null},"options":["PrimaryKey"]},{"name":{"value":"natus","quote_style":null},"column_type":{"String":null},"options":["PrimaryKey"]},{"name":{"value":"molestias","quote_style":null},"column_type":{"String":null},"options":["PrimaryKey"]},{"name":{"value":"qui","quote_style":null},"column_type":{"String":null},"options":["PrimaryKey"]}],"if_not_exists":false,"partition":null,"engine":"metric","options":{"on_physical_table":{"String":"expedita"}},"primary_keys":[4,2,3,6,5]}"#; + let logical_table_expected = r#"{"table_name":{"value":"impedit","quote_style":null},"columns":[{"name":{"value":"ts","quote_style":null},"column_type":{"Timestamp":{"Millisecond":null}},"options":["TimeIndex"]},{"name":{"value":"val","quote_style":null},"column_type":{"Float64":{}},"options":[]},{"name":{"value":"totam","quote_style":null},"column_type":{"String":{"size_type":"Utf8"}},"options":["PrimaryKey"]},{"name":{"value":"cumque","quote_style":null},"column_type":{"String":{"size_type":"Utf8"}},"options":["PrimaryKey"]},{"name":{"value":"natus","quote_style":null},"column_type":{"String":{"size_type":"Utf8"}},"options":["PrimaryKey"]},{"name":{"value":"molestias","quote_style":null},"column_type":{"String":{"size_type":"Utf8"}},"options":["PrimaryKey"]},{"name":{"value":"qui","quote_style":null},"column_type":{"String":{"size_type":"Utf8"}},"options":["PrimaryKey"]}],"if_not_exists":false,"partition":null,"engine":"metric","options":{"on_physical_table":{"String":"expedita"}},"primary_keys":[4,2,3,6,5]}"#; assert_eq!(logical_table_expected, logical_table_serialized); } diff --git a/tests/cases/standalone/common/aggregate/string_agg.result b/tests/cases/standalone/common/aggregate/string_agg.result index 851d0d7744..a4e98dee18 100644 --- a/tests/cases/standalone/common/aggregate/string_agg.result +++ b/tests/cases/standalone/common/aggregate/string_agg.result @@ -33,12 +33,23 @@ Affected Rows: 9 SELECT g, STRING_AGG(x,'|') FROM strings GROUP BY g ORDER BY g; -Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected LargeUtf8 but found Utf8 at column index 1 ++---+---------------------------------+ +| g | string_agg(strings.x,Utf8("|")) | ++---+---------------------------------+ +| 1 | a|b | +| 2 | i|j | +| 3 | p | +| 4 | x|y|z | ++---+---------------------------------+ -- test agg on empty set SELECT STRING_AGG(x,',') FROM strings WHERE g > 100; -Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected LargeUtf8 but found Utf8 at column index 0 ++---------------------------------+ +| string_agg(strings.x,Utf8(",")) | ++---------------------------------+ +| | ++---------------------------------+ -- string_agg can be used instead of group_concat SELECT string_agg('a', ','); @@ -59,35 +70,75 @@ SELECT string_agg('a', ','); SELECT g, string_agg(x, ',') FROM strings GROUP BY g ORDER BY g; -Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected LargeUtf8 but found Utf8 at column index 1 ++---+---------------------------------+ +| g | string_agg(strings.x,Utf8(",")) | ++---+---------------------------------+ +| 1 | a,b | +| 2 | i,j | +| 3 | p | +| 4 | x,y,z | ++---+---------------------------------+ -- Test ORDER BY -- Single group SELECT STRING_AGG(x, '' ORDER BY x ASC), STRING_AGG(x, '|' ORDER BY x ASC) FROM strings; -Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected LargeUtf8 but found Utf8 at column index 0 ++--------------------------------------------------------------------+---------------------------------------------------------------------+ +| string_agg(strings.x,Utf8("")) ORDER BY [strings.x ASC NULLS LAST] | string_agg(strings.x,Utf8("|")) ORDER BY [strings.x ASC NULLS LAST] | ++--------------------------------------------------------------------+---------------------------------------------------------------------+ +| abijpxyz | a|b|i|j|p|x|y|z | ++--------------------------------------------------------------------+---------------------------------------------------------------------+ SELECT STRING_AGG(x, '' ORDER BY x DESC), STRING_AGG(x,'|' ORDER BY x DESC) FROM strings; -Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected LargeUtf8 but found Utf8 at column index 0 ++----------------------------------------------------------------------+-----------------------------------------------------------------------+ +| string_agg(strings.x,Utf8("")) ORDER BY [strings.x DESC NULLS FIRST] | string_agg(strings.x,Utf8("|")) ORDER BY [strings.x DESC NULLS FIRST] | ++----------------------------------------------------------------------+-----------------------------------------------------------------------+ +| zyxpjiba | z|y|x|p|j|i|b|a | ++----------------------------------------------------------------------+-----------------------------------------------------------------------+ -- Grouped with ORDER BY SELECT g, STRING_AGG(x, '' ORDER BY x ASC), STRING_AGG(x, '|' ORDER BY x ASC) FROM strings GROUP BY g ORDER BY g; -Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected LargeUtf8 but found Utf8 at column index 1 ++---+--------------------------------------------------------------------+---------------------------------------------------------------------+ +| g | string_agg(strings.x,Utf8("")) ORDER BY [strings.x ASC NULLS LAST] | string_agg(strings.x,Utf8("|")) ORDER BY [strings.x ASC NULLS LAST] | ++---+--------------------------------------------------------------------+---------------------------------------------------------------------+ +| 1 | ab | a|b | +| 2 | ij | i|j | +| 3 | p | p | +| 4 | xyz | x|y|z | ++---+--------------------------------------------------------------------+---------------------------------------------------------------------+ SELECT g, STRING_AGG(x, '' ORDER BY x DESC), STRING_AGG(x,'|' ORDER BY x DESC) FROM strings GROUP BY g ORDER BY g; -Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected LargeUtf8 but found Utf8 at column index 1 ++---+----------------------------------------------------------------------+-----------------------------------------------------------------------+ +| g | string_agg(strings.x,Utf8("")) ORDER BY [strings.x DESC NULLS FIRST] | string_agg(strings.x,Utf8("|")) ORDER BY [strings.x DESC NULLS FIRST] | ++---+----------------------------------------------------------------------+-----------------------------------------------------------------------+ +| 1 | ba | b|a | +| 2 | ji | j|i | +| 3 | p | p | +| 4 | zyx | z|y|x | ++---+----------------------------------------------------------------------+-----------------------------------------------------------------------+ -- Test with DISTINCT SELECT STRING_AGG(DISTINCT x, '' ORDER BY x), STRING_AGG(DISTINCT x, '|' ORDER BY x) FROM strings; -Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected LargeUtf8 but found Utf8 at column index 0 ++-----------------------------------------------------------------------------+------------------------------------------------------------------------------+ +| string_agg(DISTINCT strings.x,Utf8("")) ORDER BY [strings.x ASC NULLS LAST] | string_agg(DISTINCT strings.x,Utf8("|")) ORDER BY [strings.x ASC NULLS LAST] | ++-----------------------------------------------------------------------------+------------------------------------------------------------------------------+ +| abijpxyz | a|b|i|j|p|x|y|z | ++-----------------------------------------------------------------------------+------------------------------------------------------------------------------+ SELECT g, STRING_AGG(DISTINCT x, '' ORDER BY x) FROM strings GROUP BY g ORDER BY g; -Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected LargeUtf8 but found Utf8 at column index 1 ++---+-----------------------------------------------------------------------------+ +| g | string_agg(DISTINCT strings.x,Utf8("")) ORDER BY [strings.x ASC NULLS LAST] | ++---+-----------------------------------------------------------------------------+ +| 1 | ab | +| 2 | ij | +| 3 | p | +| 4 | xyz | ++---+-----------------------------------------------------------------------------+ -- cleanup DROP TABLE strings; diff --git a/tests/cases/standalone/common/create/create_metric_table.result b/tests/cases/standalone/common/create/create_metric_table.result index 2895105a9e..55f1525a53 100644 --- a/tests/cases/standalone/common/create/create_metric_table.result +++ b/tests/cases/standalone/common/create/create_metric_table.result @@ -36,12 +36,12 @@ Affected Rows: 0 -- create logical table with different data type on field column CREATE TABLE t3 (ts timestamp time index, val string, host string, primary key (host)) engine=metric with ("on_physical_table" = "phy"); -Error: 1004(InvalidArguments), Column type mismatch. Expect Float64(Float64Type), got String(StringType) +Error: 1004(InvalidArguments), Column type mismatch. Expect Float64(Float64Type), got String(StringType { size_type: Utf8 }) -- create logical table with different data type on tag column CREATE TABLE t4 (ts timestamp time index, val double, host double, primary key (host)) engine=metric with ("on_physical_table" = "phy"); -Error: 1004(InvalidArguments), Column type mismatch. Expect String(StringType), got Float64(Float64Type) +Error: 1004(InvalidArguments), Column type mismatch. Expect String(StringType { size_type: Utf8 }), got Float64(Float64Type) -- create logical table with different column name on field column CREATE TABLE t5 (ts timestamp time index, valval double, host string primary key) engine = metric with ("on_physical_table" = "phy"); diff --git a/tests/cases/standalone/common/insert/insert_invalid.result b/tests/cases/standalone/common/insert/insert_invalid.result index af74e72778..88123a3af6 100644 --- a/tests/cases/standalone/common/insert/insert_invalid.result +++ b/tests/cases/standalone/common/insert/insert_invalid.result @@ -8,7 +8,7 @@ Affected Rows: 1 INSERT INTO strings VALUES (3, 4); -Error: 2000(InvalidSyntax), Failed to parse value: Fail to parse number 3, invalid column type: String(StringType) +Error: 2000(InvalidSyntax), Failed to parse value: Fail to parse number 3, invalid column type: String(StringType { size_type: Utf8 }) SELECT * FROM strings WHERE i = 'â‚(';