mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-27 00:19:58 +00:00
feat: supports large string (#7097)
* feat: supports large string
* chore: add doc for extract_string_vector_values
* chore: refactor by cr comments
* chore: changes by cr comments
* refactor: extract_string_vector_values
* feat: remove large string type and refactor string vector
* chore: revert some changes
* feat: adds large string type
* chore: impl default for StringSizeType
* fix: tests and test compatibility
* test: update sqlness tests
* chore: remove panic

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@@ -15,7 +15,7 @@
 use std::borrow::Cow;
 use std::sync::Arc;
 
-use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, StringArray};
+use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, LargeStringArray, StringArray};
 use arrow_schema::{DataType, Field};
 use datafusion::logical_expr::{Signature, TypeSignature, Volatility};
 use datafusion_common::{Result, ScalarValue};
@@ -63,7 +63,7 @@ impl VectorProduct {
         }
 
         let t = args.schema.field(0).data_type();
-        if !matches!(t, DataType::Utf8 | DataType::Binary) {
+        if !matches!(t, DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary) {
             return Err(datafusion_common::DataFusionError::Internal(format!(
                 "unexpected input datatype {t} when creating `VEC_PRODUCT`"
             )));
@@ -91,6 +91,13 @@ impl VectorProduct {
                     .map(|x| x.map(Cow::Owned))
                     .collect::<Result<Vec<_>>>()?
             }
+            DataType::LargeUtf8 => {
+                let arr: &LargeStringArray = values[0].as_string();
+                arr.iter()
+                    .filter_map(|x| x.map(|s| parse_veclit_from_strlit(s).map_err(Into::into)))
+                    .map(|x: Result<Vec<f32>>| x.map(Cow::Owned))
+                    .collect::<Result<Vec<_>>>()?
+            }
             DataType::Binary => {
                 let arr: &BinaryArray = values[0].as_binary();
                 arr.iter()
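The `as_string()` calls above resolve to different concrete arrays through the type annotation: arrow's `AsArray::as_string::<O>()` is generic over the offset width, `i32` for `Utf8` and `i64` for `LargeUtf8`. A minimal standalone sketch (plain arrow only, no GreptimeDB types) of how one generic helper can serve both widths:

```rust
use arrow::array::{GenericStringArray, LargeStringArray, OffsetSizeTrait, StringArray};

// Sums the byte length of all non-null values, for either offset width.
fn total_len<O: OffsetSizeTrait>(arr: &GenericStringArray<O>) -> usize {
    arr.iter().flatten().map(str::len).sum()
}

fn main() {
    let small = StringArray::from(vec!["a", "bc"]); // DataType::Utf8, i32 offsets
    let large = LargeStringArray::from(vec!["d", "ef"]); // DataType::LargeUtf8, i64 offsets
    assert_eq!(total_len(&small), 3);
    assert_eq!(total_len(&large), 3);
}
```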
@@ -14,7 +14,7 @@
 
 use std::sync::Arc;
 
-use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, StringArray};
+use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, LargeStringArray, StringArray};
 use arrow_schema::{DataType, Field};
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::{

@@ -63,7 +63,7 @@ impl VectorSum {
         }
 
         let t = args.schema.field(0).data_type();
-        if !matches!(t, DataType::Utf8 | DataType::Binary) {
+        if !matches!(t, DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary) {
             return Err(datafusion_common::DataFusionError::Internal(format!(
                 "unexpected input datatype {t} when creating `VEC_SUM`"
             )));

@@ -98,6 +98,21 @@ impl VectorSum {
                     *self.inner(vec_column.len()) += vec_column;
                 }
             }
+            DataType::LargeUtf8 => {
+                let arr: &LargeStringArray = values[0].as_string();
+                for s in arr.iter() {
+                    let Some(s) = s else {
+                        if is_update {
+                            self.has_null = true;
+                            self.sum = None;
+                        }
+                        return Ok(());
+                    };
+                    let values = parse_veclit_from_strlit(s)?;
+                    let vec_column = DVectorView::from_slice(&values, values.len());
+                    *self.inner(vec_column.len()) += vec_column;
+                }
+            }
             DataType::Binary => {
                 let arr: &BinaryArray = values[0].as_binary();
                 for b in arr.iter() {
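The accumulation above leans on nalgebra's `DVectorView`, which borrows a `&[f32]` slice as a column vector so each parsed vector literal can be added to the running sum without copying. A standalone sketch of that mechanism, assuming only the `nalgebra` crate:

```rust
use nalgebra::{DVector, DVectorView};

fn main() {
    // Running sum; dimension fixed at 3 for the sketch.
    let mut sum = DVector::<f32>::zeros(3);

    for slice in [[1.0f32, 2.0, 3.0], [4.0, 5.0, 6.0]] {
        // Borrow the slice as a vector view; no allocation or copy.
        let view = DVectorView::from_slice(&slice, slice.len());
        sum += view;
    }

    assert_eq!(sum.as_slice(), &[5.0, 7.0, 9.0]);
}
```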
@@ -287,8 +287,12 @@ mod tests {
 
     #[test]
     fn test_deserialization_compatibility() {
-        let s = r#"{"version":1,"table_info":{"ident":{"table_id":8714,"version":0},"name":"go_gc_duration_seconds","desc":"Created on insertion","catalog_name":"e87lehzy63d4cloud_docs_test","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"instance","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"job","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"quantile","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"greptime_timestamp","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}},{"name":"greptime_value","data_type":{"Float64":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}}],"timestamp_index":3,"version":0},"primary_key_indices":[0,1,2],"value_indices":[],"engine":"mito","next_column_id":5,"region_numbers":[],"engine_options":{},"options":{"write_buffer_size":null,"ttl":null,"extra_options":{}},"created_on":"1970-01-01T00:00:00Z"},"table_type":"Base"}}"#;
-        let v = TableInfoValue::try_from_raw_value(s.as_bytes()).unwrap();
+        let old_fmt = r#"{"version":1,"table_info":{"ident":{"table_id":8714,"version":0},"name":"go_gc_duration_seconds","desc":"Created on insertion","catalog_name":"e87lehzy63d4cloud_docs_test","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"instance","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"job","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"quantile","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"greptime_timestamp","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}},{"name":"greptime_value","data_type":{"Float64":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}}],"timestamp_index":3,"version":0},"primary_key_indices":[0,1,2],"value_indices":[],"engine":"mito","next_column_id":5,"region_numbers":[],"engine_options":{},"options":{"write_buffer_size":null,"ttl":null,"extra_options":{}},"created_on":"1970-01-01T00:00:00Z"},"table_type":"Base"}}"#;
+        let new_fmt = r#"{"version":1,"table_info":{"ident":{"table_id":8714,"version":0},"name":"go_gc_duration_seconds","desc":"Created on insertion","catalog_name":"e87lehzy63d4cloud_docs_test","schema_name":"public","meta":{"schema":{"column_schemas":[{"name":"instance","data_type":{"String":{"size_type":"Utf8"}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"job","data_type":{"String":{"size_type":"Utf8"}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"quantile","data_type":{"String":{"size_type":"Utf8"}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"greptime_timestamp","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}},{"name":"greptime_value","data_type":{"Float64":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}}],"timestamp_index":3,"version":0},"primary_key_indices":[0,1,2],"value_indices":[],"engine":"mito","next_column_id":5,"region_numbers":[],"engine_options":{},"options":{"write_buffer_size":null,"ttl":null,"extra_options":{}},"created_on":"1970-01-01T00:00:00Z"},"table_type":"Base"}}"#;
+
+        let v = TableInfoValue::try_from_raw_value(old_fmt.as_bytes()).unwrap();
+        let new_v = TableInfoValue::try_from_raw_value(new_fmt.as_bytes()).unwrap();
+        assert_eq!(v, new_v);
         assert_eq!(v.table_info.meta.created_on, v.table_info.meta.updated_on);
         assert!(v.table_info.meta.partition_key_indices.is_empty());
     }
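The old fixture encodes string columns as `{"String":null}` (the former unit struct), while the new one writes `{"String":{"size_type":"Utf8"}}`; the test asserts both decode to the same `TableInfoValue`. A condensed, standalone sketch of the deserialization trick that makes this work, mirroring the `Option::<Helper>` approach in the `string_type.rs` hunk further down and using only `serde` and `serde_json`:

```rust
use serde::Deserialize;

#[derive(Debug, Default, PartialEq, Deserialize)]
enum StringSizeType {
    #[default]
    Utf8,
    LargeUtf8,
}

#[derive(Debug, Default, PartialEq)]
struct StringType {
    size_type: StringSizeType,
}

impl<'de> Deserialize<'de> for StringType {
    fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
        #[derive(Deserialize)]
        struct Helper {
            #[serde(default)]
            size_type: StringSizeType,
        }
        // `null` (the old unit-struct encoding) becomes None -> default Utf8;
        // the new struct encoding becomes Some(helper).
        Ok(Option::<Helper>::deserialize(d)?
            .map(|h| StringType { size_type: h.size_type })
            .unwrap_or_default())
    }
}

fn main() {
    let old: StringType = serde_json::from_str("null").unwrap();
    let new: StringType = serde_json::from_str(r#"{"size_type":"Utf8"}"#).unwrap();
    assert_eq!(old, new);
}
```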
@@ -16,3 +16,5 @@ pub type BinaryArray = arrow::array::BinaryArray;
 pub type MutableBinaryArray = arrow::array::BinaryBuilder;
 pub type StringArray = arrow::array::StringArray;
 pub type MutableStringArray = arrow::array::StringBuilder;
+pub type LargeStringArray = arrow::array::LargeStringArray;
+pub type MutableLargeStringArray = arrow::array::LargeStringBuilder;
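A quick standalone look at what the two new aliases name in arrow: `LargeStringBuilder` accumulates values and nulls and finishes into a `LargeStringArray` with `i64` offsets.

```rust
use arrow::array::{Array, LargeStringBuilder};
use arrow_schema::DataType;

fn main() {
    let mut builder = LargeStringBuilder::new();
    builder.append_value("hello");
    builder.append_null();
    let array = builder.finish(); // arrow::array::LargeStringArray

    assert_eq!(array.len(), 2);
    assert!(array.is_null(1));
    assert_eq!(array.data_type(), &DataType::LargeUtf8);
}
```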
@@ -454,9 +454,8 @@ impl TryFrom<&ArrowDataType> for ConcreteDataType {
             ArrowDataType::Binary | ArrowDataType::LargeBinary | ArrowDataType::BinaryView => {
                 Self::binary_datatype()
             }
-            ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View => {
-                Self::string_datatype()
-            }
+            ArrowDataType::Utf8 | ArrowDataType::Utf8View => Self::string_datatype(),
+            ArrowDataType::LargeUtf8 => Self::large_string_datatype(),
             ArrowDataType::List(field) => Self::List(ListType::new(
                 ConcreteDataType::from_arrow_type(field.data_type()),
             )),

@@ -518,6 +517,10 @@ impl_new_concrete_type_functions!(
 );
 
 impl ConcreteDataType {
+    pub fn large_string_datatype() -> Self {
+        ConcreteDataType::String(StringType::large_utf8())
+    }
+
     pub fn timestamp_second_datatype() -> Self {
         ConcreteDataType::Timestamp(TimestampType::Second(TimestampSecondType))
     }

@@ -777,6 +780,14 @@ mod tests {
             ConcreteDataType::from_arrow_type(&ArrowDataType::Utf8),
             ConcreteDataType::String(_)
         ));
+        // Test LargeUtf8 mapping to large String type
+        let large_string_type = ConcreteDataType::from_arrow_type(&ArrowDataType::LargeUtf8);
+        assert!(matches!(large_string_type, ConcreteDataType::String(_)));
+        if let ConcreteDataType::String(string_type) = &large_string_type {
+            assert!(string_type.is_large());
+        } else {
+            panic!("Expected a String type");
+        }
         assert_eq!(
             ConcreteDataType::from_arrow_type(&ArrowDataType::List(Arc::new(Field::new(
                 "item",

@@ -791,6 +802,38 @@ mod tests {
         ));
     }
 
+    #[test]
+    fn test_large_utf8_round_trip() {
+        // Test round-trip conversion for LargeUtf8
+        let large_utf8_arrow = ArrowDataType::LargeUtf8;
+        let concrete_type = ConcreteDataType::from_arrow_type(&large_utf8_arrow);
+        let back_to_arrow = concrete_type.as_arrow_type();
+
+        assert!(matches!(concrete_type, ConcreteDataType::String(_)));
+        // Round-trip should preserve the LargeUtf8 type
+        assert_eq!(large_utf8_arrow, back_to_arrow);
+
+        // Test that Utf8 and LargeUtf8 map to different string variants
+        let utf8_concrete = ConcreteDataType::from_arrow_type(&ArrowDataType::Utf8);
+        let large_utf8_concrete = ConcreteDataType::from_arrow_type(&ArrowDataType::LargeUtf8);
+
+        assert!(matches!(utf8_concrete, ConcreteDataType::String(_)));
+        assert!(matches!(large_utf8_concrete, ConcreteDataType::String(_)));
+
+        // They should have different size types
+        if let (ConcreteDataType::String(utf8_type), ConcreteDataType::String(large_type)) =
+            (&utf8_concrete, &large_utf8_concrete)
+        {
+            assert!(!utf8_type.is_large());
+            assert!(large_type.is_large());
+        } else {
+            panic!("Expected both to be String types");
+        }
+
+        // They should be different types
+        assert_ne!(utf8_concrete, large_utf8_concrete);
+    }
+
     #[test]
     fn test_from_arrow_timestamp() {
         assert_eq!(
@@ -53,7 +53,7 @@ pub use primitive_type::{
     Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type, LogicalPrimitiveType,
     OrdPrimitive, UInt8Type, UInt16Type, UInt32Type, UInt64Type, WrapperType,
 };
-pub use string_type::StringType;
+pub use string_type::{StringSizeType, StringType};
 pub use struct_type::{StructField, StructType};
 pub use time_type::{
     TimeMicrosecondType, TimeMillisecondType, TimeNanosecondType, TimeSecondType, TimeType,
@@ -104,7 +104,7 @@ pub fn can_cast_type(src_value: &Value, dest_type: &ConcreteDataType) -> bool {
         (_, Boolean(_)) => src_type.is_numeric() || src_type.is_string(),
         (Boolean(_), _) => dest_type.is_numeric() || dest_type.is_string(),
 
-        // numeric types cast
+        // numeric and string types cast
         (
             UInt8(_) | UInt16(_) | UInt32(_) | UInt64(_) | Int8(_) | Int16(_) | Int32(_) | Int64(_)
             | Float32(_) | Float64(_) | String(_),
@@ -19,17 +19,97 @@ use common_base::bytes::StringBytes;
 use serde::{Deserialize, Serialize};
 
 use crate::data_type::{DataType, DataTypeRef};
-use crate::prelude::ScalarVectorBuilder;
 use crate::type_id::LogicalTypeId;
 use crate::value::Value;
 use crate::vectors::{MutableVector, StringVectorBuilder};
 
-#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
-pub struct StringType;
+/// String size variant to distinguish between UTF8 and LargeUTF8
+#[derive(
+    Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, Default,
+)]
+pub enum StringSizeType {
+    /// Regular UTF8 strings (up to 2GB)
+    #[default]
+    Utf8,
+    /// Large UTF8 strings (up to 2^63 bytes)
+    LargeUtf8,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
+pub struct StringType {
+    #[serde(default)]
+    size_type: StringSizeType,
+}
+
+/// Custom deserialization to support both old and new formats.
+impl<'de> serde::Deserialize<'de> for StringType {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        #[derive(serde::Deserialize)]
+        struct Helper {
+            #[serde(default)]
+            size_type: StringSizeType,
+        }
+
+        let opt = Option::<Helper>::deserialize(deserializer)?;
+        Ok(match opt {
+            Some(helper) => Self {
+                size_type: helper.size_type,
+            },
+            None => Self::default(),
+        })
+    }
+}
+
+impl Default for StringType {
+    fn default() -> Self {
+        Self {
+            size_type: StringSizeType::Utf8,
+        }
+    }
+}
 
 impl StringType {
+    /// Create a new StringType with default (Utf8) size
+    pub fn new() -> Self {
+        Self {
+            size_type: StringSizeType::Utf8,
+        }
+    }
+
+    /// Create a new StringType with specified size
+    pub fn with_size(size_type: StringSizeType) -> Self {
+        Self { size_type }
+    }
+
+    /// Create a StringType for regular UTF8 strings
+    pub fn utf8() -> Self {
+        Self::with_size(StringSizeType::Utf8)
+    }
+
+    /// Create a StringType for large UTF8 strings
+    pub fn large_utf8() -> Self {
+        Self::with_size(StringSizeType::LargeUtf8)
+    }
+
+    /// Get the size type
+    pub fn size_type(&self) -> StringSizeType {
+        self.size_type
+    }
+
+    /// Check if this is a large UTF8 string type
+    pub fn is_large(&self) -> bool {
+        matches!(self.size_type, StringSizeType::LargeUtf8)
+    }
+
     pub fn arc() -> DataTypeRef {
-        Arc::new(Self)
+        Arc::new(Self::new())
+    }
+
+    pub fn large_arc() -> DataTypeRef {
+        Arc::new(Self::large_utf8())
     }
 }

@@ -47,11 +127,19 @@ impl DataType for StringType {
     }
 
     fn as_arrow_type(&self) -> ArrowDataType {
-        ArrowDataType::Utf8
+        match self.size_type {
+            StringSizeType::Utf8 => ArrowDataType::Utf8,
+            StringSizeType::LargeUtf8 => ArrowDataType::LargeUtf8,
+        }
     }
 
     fn create_mutable_vector(&self, capacity: usize) -> Box<dyn MutableVector> {
-        Box::new(StringVectorBuilder::with_capacity(capacity))
+        match self.size_type {
+            StringSizeType::Utf8 => Box::new(StringVectorBuilder::with_string_capacity(capacity)),
+            StringSizeType::LargeUtf8 => {
+                Box::new(StringVectorBuilder::with_large_capacity(capacity))
+            }
+        }
     }
 
     fn try_cast(&self, from: Value) -> Option<Value> {
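Alongside the `Option` wrapper, the `#[serde(default)]` on `size_type` covers a second compatibility case: an object that omits the field entirely. A standalone sketch of that combination, using only `serde` and `serde_json`:

```rust
use serde::Deserialize;

#[derive(Debug, Default, PartialEq, Deserialize)]
enum StringSizeType {
    #[default]
    Utf8,
    LargeUtf8,
}

#[derive(Debug, Deserialize, PartialEq)]
struct Helper {
    #[serde(default)]
    size_type: StringSizeType,
}

fn main() {
    // Missing field falls back to the #[default] variant.
    let empty: Helper = serde_json::from_str("{}").unwrap();
    assert_eq!(empty.size_type, StringSizeType::Utf8);

    // An explicit field is honored as usual.
    let large: Helper = serde_json::from_str(r#"{"size_type":"LargeUtf8"}"#).unwrap();
    assert_eq!(large.size_type, StringSizeType::LargeUtf8);
}
```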
@@ -472,7 +472,13 @@ impl Value {
             Value::Int64(v) => ScalarValue::Int64(Some(*v)),
             Value::Float32(v) => ScalarValue::Float32(Some(v.0)),
             Value::Float64(v) => ScalarValue::Float64(Some(v.0)),
-            Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())),
+            Value::String(v) => {
+                let s = v.as_utf8().to_string();
+                match output_type {
+                    ConcreteDataType::String(t) if t.is_large() => ScalarValue::LargeUtf8(Some(s)),
+                    _ => ScalarValue::Utf8(Some(s)),
+                }
+            }
             Value::Binary(v) => ScalarValue::Binary(Some(v.to_vec())),
             Value::Date(v) => ScalarValue::Date32(Some(v.val())),
             Value::Null => to_null_scalar_value(output_type)?,

@@ -606,7 +612,13 @@ pub fn to_null_scalar_value(output_type: &ConcreteDataType) -> Result<ScalarValue>
         ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) | ConcreteDataType::Vector(_) => {
             ScalarValue::Binary(None)
         }
-        ConcreteDataType::String(_) => ScalarValue::Utf8(None),
+        ConcreteDataType::String(t) => {
+            if t.is_large() {
+                ScalarValue::LargeUtf8(None)
+            } else {
+                ScalarValue::Utf8(None)
+            }
+        }
         ConcreteDataType::Date(_) => ScalarValue::Date32(None),
         ConcreteDataType::Timestamp(t) => timestamp_to_scalar_value(t.unit(), None),
         ConcreteDataType::Interval(v) => match v {
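The distinction matters because DataFusion scalars carry their arrow type: `ScalarValue::Utf8` and `ScalarValue::LargeUtf8` report different `DataType`s, so a scalar destined for a large-string column must use the matching variant. A standalone check against `datafusion_common` and `arrow_schema`:

```rust
use arrow_schema::DataType;
use datafusion_common::ScalarValue;

fn main() {
    let small = ScalarValue::Utf8(Some("x".to_string()));
    let large = ScalarValue::LargeUtf8(Some("x".to_string()));

    // Same payload, different arrow types.
    assert_eq!(small.data_type(), DataType::Utf8);
    assert_eq!(large.data_type(), DataType::LargeUtf8);
}
```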
@@ -291,7 +291,8 @@ impl Helper {
             ArrowDataType::Float32 => Arc::new(Float32Vector::try_from_arrow_array(array)?),
             ArrowDataType::Float64 => Arc::new(Float64Vector::try_from_arrow_array(array)?),
             ArrowDataType::Utf8 => Arc::new(StringVector::try_from_arrow_array(array)?),
-            ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View => {
+            ArrowDataType::LargeUtf8 => Arc::new(StringVector::try_from_arrow_array(array)?),
+            ArrowDataType::Utf8View => {
                 let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::Utf8)
                     .context(crate::error::ArrowComputeSnafu)?;
                 Arc::new(StringVector::try_from_arrow_array(array)?)

@@ -742,17 +743,17 @@ mod tests {
     #[test]
     fn test_large_string_array_into_vector() {
         let input_vec = vec!["a", "b"];
-        let assertion_array = StringArray::from(input_vec.clone());
+        let assertion_array = LargeStringArray::from(input_vec.clone());
 
         let large_string_array: ArrayRef = Arc::new(LargeStringArray::from(input_vec));
         let vector = Helper::try_into_vector(large_string_array).unwrap();
         assert_eq!(2, vector.len());
         assert_eq!(0, vector.null_count());
 
-        let output_arrow_array: StringArray = vector
+        let output_arrow_array: LargeStringArray = vector
             .to_arrow_array()
             .as_any()
-            .downcast_ref::<StringArray>()
+            .downcast_ref::<LargeStringArray>()
             .unwrap()
             .clone();
         assert_eq!(&assertion_array, &output_arrow_array);
@@ -18,7 +18,9 @@ use std::sync::Arc;
 use arrow::array::{Array, ArrayBuilder, ArrayIter, ArrayRef};
 use snafu::ResultExt;
 
-use crate::arrow_array::{MutableStringArray, StringArray};
+use crate::arrow_array::{
+    LargeStringArray, MutableLargeStringArray, MutableStringArray, StringArray,
+};
 use crate::data_type::ConcreteDataType;
 use crate::error::{self, Result};
 use crate::scalars::{ScalarVector, ScalarVectorBuilder};

@@ -26,69 +28,93 @@ use crate::serialize::Serializable;
 use crate::value::{Value, ValueRef};
 use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef};
 
+/// Internal representation for string arrays
+#[derive(Debug, PartialEq)]
+enum StringArrayData {
+    String(StringArray),
+    LargeString(LargeStringArray),
+}
+
 /// Vector of strings.
 #[derive(Debug, PartialEq)]
 pub struct StringVector {
-    array: StringArray,
+    array: StringArrayData,
 }
 
 impl StringVector {
     pub(crate) fn as_arrow(&self) -> &dyn Array {
-        &self.array
+        match &self.array {
+            StringArrayData::String(array) => array,
+            StringArrayData::LargeString(array) => array,
+        }
+    }
+
+    /// Create a StringVector from a regular StringArray
+    pub fn from_string_array(array: StringArray) -> Self {
+        Self {
+            array: StringArrayData::String(array),
+        }
+    }
+
+    /// Create a StringVector from a LargeStringArray
+    pub fn from_large_string_array(array: LargeStringArray) -> Self {
+        Self {
+            array: StringArrayData::LargeString(array),
+        }
+    }
+
+    pub fn from_slice<T: AsRef<str>>(slice: &[T]) -> Self {
+        Self::from_string_array(StringArray::from_iter(
+            slice.iter().map(|s| Some(s.as_ref())),
+        ))
     }
 }
 
 impl From<StringArray> for StringVector {
     fn from(array: StringArray) -> Self {
-        Self { array }
+        Self::from_string_array(array)
+    }
+}
+
+impl From<LargeStringArray> for StringVector {
    fn from(array: LargeStringArray) -> Self {
+        Self::from_large_string_array(array)
     }
 }
 
 impl From<Vec<Option<String>>> for StringVector {
     fn from(data: Vec<Option<String>>) -> Self {
-        Self {
-            array: StringArray::from_iter(data),
-        }
+        Self::from_string_array(StringArray::from_iter(data))
     }
 }
 
 impl From<Vec<Option<&str>>> for StringVector {
     fn from(data: Vec<Option<&str>>) -> Self {
-        Self {
-            array: StringArray::from_iter(data),
-        }
+        Self::from_string_array(StringArray::from_iter(data))
     }
 }
 
 impl From<&[Option<String>]> for StringVector {
     fn from(data: &[Option<String>]) -> Self {
-        Self {
-            array: StringArray::from_iter(data),
-        }
+        Self::from_string_array(StringArray::from_iter(data))
     }
 }
 
 impl From<&[Option<&str>]> for StringVector {
     fn from(data: &[Option<&str>]) -> Self {
-        Self {
-            array: StringArray::from_iter(data),
-        }
+        Self::from_string_array(StringArray::from_iter(data))
     }
 }
 
 impl From<Vec<String>> for StringVector {
     fn from(data: Vec<String>) -> Self {
-        Self {
-            array: StringArray::from_iter(data.into_iter().map(Some)),
-        }
+        Self::from_string_array(StringArray::from_iter(data.into_iter().map(Some)))
     }
 }
 
 impl From<Vec<&str>> for StringVector {
     fn from(data: Vec<&str>) -> Self {
-        Self {
-            array: StringArray::from_iter(data.into_iter().map(Some)),
-        }
+        Self::from_string_array(StringArray::from_iter(data.into_iter().map(Some)))
     }
 }
 
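The refactor keeps `StringVector` as the single public type and hides the offset width behind an internal enum, so every caller compiles unchanged; each method then dispatches with a two-arm match. The shape of the pattern, as a standalone sketch over plain arrow arrays (an alternative would be a generic `StringVector<O>`, but that would push a type parameter through every user of the vector trait objects):

```rust
use arrow::array::{Array, LargeStringArray, StringArray};

// One public-facing type, two physical encodings.
enum StrArray {
    Small(StringArray),
    Large(LargeStringArray),
}

impl StrArray {
    // Every accessor is a two-arm match delegating to the wrapped array.
    fn len(&self) -> usize {
        match self {
            StrArray::Small(a) => a.len(),
            StrArray::Large(a) => a.len(),
        }
    }
}

fn main() {
    let small = StrArray::Small(StringArray::from(vec!["a"]));
    let large = StrArray::Large(LargeStringArray::from(vec!["b", "c"]));
    assert_eq!(small.len() + large.len(), 3);
}
```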
@@ -106,67 +132,177 @@ impl Vector for StringVector {
     }
 
     fn len(&self) -> usize {
-        self.array.len()
+        match &self.array {
+            StringArrayData::String(array) => array.len(),
+            StringArrayData::LargeString(array) => array.len(),
+        }
     }
 
     fn to_arrow_array(&self) -> ArrayRef {
-        Arc::new(self.array.clone())
+        match &self.array {
+            StringArrayData::String(array) => Arc::new(array.clone()),
+            StringArrayData::LargeString(array) => Arc::new(array.clone()),
+        }
     }
 
     fn to_boxed_arrow_array(&self) -> Box<dyn Array> {
-        Box::new(self.array.clone())
+        match &self.array {
+            StringArrayData::String(array) => Box::new(array.clone()),
+            StringArrayData::LargeString(array) => Box::new(array.clone()),
+        }
     }
 
     fn validity(&self) -> Validity {
-        vectors::impl_validity_for_vector!(self.array)
+        match &self.array {
+            StringArrayData::String(array) => vectors::impl_validity_for_vector!(array),
+            StringArrayData::LargeString(array) => vectors::impl_validity_for_vector!(array),
+        }
     }
 
     fn memory_size(&self) -> usize {
-        self.array.get_buffer_memory_size()
+        match &self.array {
+            StringArrayData::String(array) => array.get_buffer_memory_size(),
+            StringArrayData::LargeString(array) => array.get_buffer_memory_size(),
+        }
     }
 
     fn null_count(&self) -> usize {
-        self.array.null_count()
+        match &self.array {
+            StringArrayData::String(array) => array.null_count(),
+            StringArrayData::LargeString(array) => array.null_count(),
+        }
     }
 
     fn is_null(&self, row: usize) -> bool {
-        self.array.is_null(row)
+        match &self.array {
+            StringArrayData::String(array) => array.is_null(row),
+            StringArrayData::LargeString(array) => array.is_null(row),
+        }
     }
 
     fn slice(&self, offset: usize, length: usize) -> VectorRef {
-        Arc::new(Self::from(self.array.slice(offset, length)))
+        match &self.array {
+            StringArrayData::String(array) => {
+                Arc::new(Self::from_string_array(array.slice(offset, length)))
+            }
+            StringArrayData::LargeString(array) => {
+                Arc::new(Self::from_large_string_array(array.slice(offset, length)))
+            }
+        }
     }
 
     fn get(&self, index: usize) -> Value {
-        vectors::impl_get_for_vector!(self.array, index)
+        match &self.array {
+            StringArrayData::String(array) => vectors::impl_get_for_vector!(array, index),
+            StringArrayData::LargeString(array) => vectors::impl_get_for_vector!(array, index),
+        }
     }
 
     fn get_ref(&self, index: usize) -> ValueRef<'_> {
-        vectors::impl_get_ref_for_vector!(self.array, index)
+        match &self.array {
+            StringArrayData::String(array) => vectors::impl_get_ref_for_vector!(array, index),
+            StringArrayData::LargeString(array) => vectors::impl_get_ref_for_vector!(array, index),
+        }
+    }
+}
+
+pub enum StringIter<'a> {
+    String(ArrayIter<&'a StringArray>),
+    LargeString(ArrayIter<&'a LargeStringArray>),
+}
+
+impl<'a> Iterator for StringIter<'a> {
+    type Item = Option<&'a str>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        match self {
+            StringIter::String(iter) => iter.next(),
+            StringIter::LargeString(iter) => iter.next(),
+        }
     }
 }
 
 impl ScalarVector for StringVector {
     type OwnedItem = String;
     type RefItem<'a> = &'a str;
-    type Iter<'a> = ArrayIter<&'a StringArray>;
+    type Iter<'a> = StringIter<'a>;
     type Builder = StringVectorBuilder;
 
     fn get_data(&self, idx: usize) -> Option<Self::RefItem<'_>> {
-        if self.array.is_valid(idx) {
-            Some(self.array.value(idx))
-        } else {
-            None
+        match &self.array {
+            StringArrayData::String(array) => {
+                if array.is_valid(idx) {
+                    Some(array.value(idx))
+                } else {
+                    None
+                }
+            }
+            StringArrayData::LargeString(array) => {
+                if array.is_valid(idx) {
+                    Some(array.value(idx))
+                } else {
+                    None
+                }
+            }
         }
     }
 
     fn iter_data(&self) -> Self::Iter<'_> {
-        self.array.iter()
+        match &self.array {
+            StringArrayData::String(array) => StringIter::String(array.iter()),
+            StringArrayData::LargeString(array) => StringIter::LargeString(array.iter()),
+        }
     }
 }
 
+/// Internal representation for mutable string arrays
+enum MutableStringArrayData {
+    String(MutableStringArray),
+    LargeString(MutableLargeStringArray),
+}
+
 pub struct StringVectorBuilder {
-    pub mutable_array: MutableStringArray,
+    mutable_array: MutableStringArrayData,
+}
+
+impl Default for StringVectorBuilder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl StringVectorBuilder {
+    /// Create a builder for regular strings
+    pub fn new() -> Self {
+        Self {
+            mutable_array: MutableStringArrayData::String(MutableStringArray::new()),
+        }
+    }
+
+    /// Create a builder for large strings
+    pub fn new_large() -> Self {
+        Self {
+            mutable_array: MutableStringArrayData::LargeString(MutableLargeStringArray::new()),
+        }
+    }
+
+    /// Create a builder for regular strings with capacity
+    pub fn with_string_capacity(capacity: usize) -> Self {
+        Self {
+            mutable_array: MutableStringArrayData::String(MutableStringArray::with_capacity(
+                capacity, 0,
+            )),
+        }
+    }
+
+    /// Create a builder for large strings with capacity
+    pub fn with_large_capacity(capacity: usize) -> Self {
+        Self {
+            mutable_array: MutableStringArrayData::LargeString(
+                MutableLargeStringArray::with_capacity(capacity, 0),
+            ),
+        }
+    }
 }
 
 impl MutableVector for StringVectorBuilder {

@@ -175,7 +311,10 @@ impl MutableVector for StringVectorBuilder {
     }
 
     fn len(&self) -> usize {
-        self.mutable_array.len()
+        match &self.mutable_array {
+            MutableStringArrayData::String(array) => array.len(),
+            MutableStringArrayData::LargeString(array) => array.len(),
+        }
     }
 
     fn as_any(&self) -> &dyn Any {

@@ -195,8 +334,14 @@ impl MutableVector for StringVectorBuilder {
     }
     fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()> {
         match value.try_into_string()? {
-            Some(v) => self.mutable_array.append_value(v),
-            None => self.mutable_array.append_null(),
+            Some(v) => match &mut self.mutable_array {
+                MutableStringArrayData::String(array) => array.append_value(v),
+                MutableStringArrayData::LargeString(array) => array.append_value(v),
+            },
+            None => match &mut self.mutable_array {
+                MutableStringArrayData::String(array) => array.append_null(),
+                MutableStringArrayData::LargeString(array) => array.append_null(),
+            },
         }
         Ok(())
     }

@@ -206,7 +351,10 @@ impl MutableVector for StringVectorBuilder {
     }
 
     fn push_null(&mut self) {
-        self.mutable_array.append_null()
+        match &mut self.mutable_array {
+            MutableStringArrayData::String(array) => array.append_null(),
+            MutableStringArrayData::LargeString(array) => array.append_null(),
+        }
     }
 }
 

@@ -215,26 +363,44 @@ impl ScalarVectorBuilder for StringVectorBuilder {
 
     fn with_capacity(capacity: usize) -> Self {
         Self {
-            mutable_array: MutableStringArray::with_capacity(capacity, 0),
+            mutable_array: MutableStringArrayData::String(MutableStringArray::with_capacity(
+                capacity, 0,
+            )),
         }
     }
 
     fn push(&mut self, value: Option<<Self::VectorType as ScalarVector>::RefItem<'_>>) {
         match value {
-            Some(v) => self.mutable_array.append_value(v),
-            None => self.mutable_array.append_null(),
+            Some(v) => match &mut self.mutable_array {
+                MutableStringArrayData::String(array) => array.append_value(v),
+                MutableStringArrayData::LargeString(array) => array.append_value(v),
+            },
+            None => match &mut self.mutable_array {
+                MutableStringArrayData::String(array) => array.append_null(),
+                MutableStringArrayData::LargeString(array) => array.append_null(),
+            },
         }
     }
 
     fn finish(&mut self) -> Self::VectorType {
-        StringVector {
-            array: self.mutable_array.finish(),
+        match &mut self.mutable_array {
+            MutableStringArrayData::String(array) => {
+                StringVector::from_string_array(array.finish())
+            }
+            MutableStringArrayData::LargeString(array) => {
+                StringVector::from_large_string_array(array.finish())
+            }
         }
     }
 
     fn finish_cloned(&self) -> Self::VectorType {
-        StringVector {
-            array: self.mutable_array.finish_cloned(),
+        match &self.mutable_array {
+            MutableStringArrayData::String(array) => {
+                StringVector::from_string_array(array.finish_cloned())
+            }
+            MutableStringArrayData::LargeString(array) => {
+                StringVector::from_large_string_array(array.finish_cloned())
+            }
         }
     }
 }

@@ -248,7 +414,26 @@ impl Serializable for StringVector {
     }
 }
 
-vectors::impl_try_from_arrow_array_for_vector!(StringArray, StringVector);
+impl StringVector {
+    pub fn try_from_arrow_array(
+        array: impl AsRef<dyn Array>,
+    ) -> crate::error::Result<StringVector> {
+        let array = array.as_ref();
+
+        if let Some(string_array) = array.as_any().downcast_ref::<StringArray>() {
+            Ok(StringVector::from_string_array(string_array.clone()))
+        } else if let Some(large_string_array) = array.as_any().downcast_ref::<LargeStringArray>() {
+            Ok(StringVector::from_large_string_array(
+                large_string_array.clone(),
+            ))
+        } else {
+            Err(crate::error::UnsupportedArrowTypeSnafu {
+                arrow_type: array.data_type().clone(),
+            }
+            .build())
+        }
+    }
+}
 
 #[cfg(test)]
 mod tests {
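The hand-written `try_from_arrow_array` replaces the old macro because it must now accept two concrete array types. Its core is a downcast chain: try the `i32`-offset array first, then the `i64`-offset one, and reject anything else. A standalone sketch of that dispatch over plain arrow arrays:

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, LargeStringArray, StringArray};

// Mirrors the downcast chain above; the error type is simplified to String.
fn describe(array: &ArrayRef) -> Result<&'static str, String> {
    if array.as_any().downcast_ref::<StringArray>().is_some() {
        Ok("utf8")
    } else if array.as_any().downcast_ref::<LargeStringArray>().is_some() {
        Ok("large_utf8")
    } else {
        Err(format!("unsupported arrow type {:?}", array.data_type()))
    }
}

fn main() {
    let small: ArrayRef = Arc::new(StringArray::from(vec!["a"]));
    let large: ArrayRef = Arc::new(LargeStringArray::from(vec!["b"]));
    assert_eq!(describe(&small), Ok("utf8"));
    assert_eq!(describe(&large), Ok("large_utf8"));
}
```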
@@ -127,14 +127,13 @@ mod tests {
         assert_eq!(
             debug_format,
             r#"
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 3429, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3413, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3157, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3413, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3173, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3173, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 3505, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3489, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3173, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3489, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#
         );
 
         // list from storage
         let storage_entries = mito
             .all_ssts_from_storage()
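The expected `file_size` values in these fixtures grow by a few bytes each. Plausibly this reflects the longer `{"String":{"size_type":"Utf8"}}` encoding now embedded in persisted schema metadata, though the commit does not state the reason; the same kind of bump appears in the `test_list_ssts` and manifest-size assertions below.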
@@ -699,10 +699,20 @@ mod test {
             semantic_type,
             column_id: 5,
         };
-        let expected = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":null},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
+        let old_fmt = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":null},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
+        let new_fmt = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":{\"size_type\":\"Utf8\"}},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
         assert_eq!(
             MetadataRegion::serialize_column_metadata(&column_metadata),
-            expected
+            new_fmt
+        );
+        // Ensure both old and new formats can be deserialized.
+        assert_eq!(
+            MetadataRegion::deserialize_column_metadata(&old_fmt).unwrap(),
+            column_metadata
+        );
+        assert_eq!(
+            MetadataRegion::deserialize_column_metadata(&new_fmt).unwrap(),
+            column_metadata
         );
 
         let semantic_type = "\"Invalid Column Metadata\"";
@@ -17,7 +17,6 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use datatypes::data_type::ConcreteDataType;
 use datatypes::value::ValueRef;
 use memcomparable::Serializer;
 use snafu::{OptionExt, ResultExt, ensure};

@@ -49,7 +48,7 @@ impl IndexValueCodec {
     ) -> Result<()> {
         ensure!(!value.is_null(), IndexEncodeNullSnafu);
 
-        if matches!(field.data_type(), ConcreteDataType::String(_)) {
+        if field.data_type().is_string() {
             let value = value
                 .try_into_string()
                 .context(FieldTypeMismatchSnafu)?
@@ -787,9 +787,9 @@ async fn test_list_ssts() {
     assert_eq!(
         debug_format,
         r#"
-ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2515, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2515, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2515, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#
+ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2531, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2531, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2531, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#
     );
 
     // list from storage
@@ -923,6 +923,6 @@ mod test {
 
         // get manifest size again
         let manifest_size = manager.manifest_usage();
-        assert_eq!(manifest_size, 1748);
+        assert_eq!(manifest_size, 1764);
     }
 }
@@ -17,7 +17,7 @@ use std::sync::Arc;
|
|||||||
use std::sync::atomic::AtomicUsize;
|
use std::sync::atomic::AtomicUsize;
|
||||||
|
|
||||||
use common_telemetry::warn;
|
use common_telemetry::warn;
|
||||||
use datatypes::arrow::array::{Array, StringArray};
|
use datatypes::arrow::array::{Array, LargeStringArray, StringArray};
|
||||||
use datatypes::arrow::datatypes::DataType;
|
use datatypes::arrow::datatypes::DataType;
|
||||||
use datatypes::arrow::record_batch::RecordBatch;
|
use datatypes::arrow::record_batch::RecordBatch;
|
||||||
use datatypes::schema::{FulltextAnalyzer, FulltextBackend};
|
use datatypes::schema::{FulltextAnalyzer, FulltextBackend};
|
||||||
@@ -321,12 +321,34 @@ impl SingleCreator {
|
|||||||
if let Some(column_array) = batch.column_by_name(&self.column_name) {
|
if let Some(column_array) = batch.column_by_name(&self.column_name) {
|
||||||
// Convert Arrow array to string array.
|
// Convert Arrow array to string array.
|
||||||
// TODO(yingwen): Use Utf8View later if possible.
|
// TODO(yingwen): Use Utf8View later if possible.
|
||||||
let array = datatypes::arrow::compute::cast(column_array, &DataType::Utf8)
|
match column_array.data_type() {
|
||||||
.context(ComputeArrowSnafu)?;
|
DataType::Utf8 => {
|
||||||
let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
|
let string_array = column_array.as_any().downcast_ref::<StringArray>().unwrap();
|
||||||
for text_opt in string_array.iter() {
|
for text_opt in string_array.iter() {
|
||||||
let text = text_opt.unwrap_or_default();
|
let text = text_opt.unwrap_or_default();
|
||||||
self.inner.push_text(text).await?;
|
self.inner.push_text(text).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DataType::LargeUtf8 => {
|
||||||
|
let large_string_array = column_array
|
||||||
|
.as_any()
|
||||||
|
.downcast_ref::<LargeStringArray>()
|
||||||
|
.unwrap();
|
||||||
|
for text_opt in large_string_array.iter() {
|
||||||
|
let text = text_opt.unwrap_or_default();
|
||||||
|
self.inner.push_text(text).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
// For other types, cast to Utf8 as before
|
||||||
|
let array = datatypes::arrow::compute::cast(column_array, &DataType::Utf8)
|
||||||
|
.context(ComputeArrowSnafu)?;
|
||||||
|
let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
|
||||||
|
for text_opt in string_array.iter() {
|
||||||
|
let text = text_opt.unwrap_or_default();
|
||||||
|
self.inner.push_text(text).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// If the column is not found in the batch, push empty text.
|
// If the column is not found in the batch, push empty text.
|
||||||
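The hunk above replaces an unconditional cast-to-Utf8 with an explicit dispatch: Utf8 and LargeUtf8 columns are downcast and iterated directly, and only other types fall back to the cast. A condensed, standalone sketch of the same pattern against the `arrow` crate (GreptimeDB re-exports it as `datatypes::arrow`); `push_all_text` and the `Vec<String>` sink are illustrative stand-ins for the creator's `push_text`:

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, LargeStringArray, StringArray};
use arrow::compute::cast;
use arrow::datatypes::DataType;
use arrow::error::ArrowError;

fn push_all_text(column: &ArrayRef, sink: &mut Vec<String>) -> Result<(), ArrowError> {
    match column.data_type() {
        // i32-offset strings: downcast and iterate without copying.
        DataType::Utf8 => {
            let arr = column.as_any().downcast_ref::<StringArray>().unwrap();
            sink.extend(arr.iter().map(|t| t.unwrap_or_default().to_string()));
        }
        // i64-offset strings need their own downcast; narrowing them to Utf8
        // could fail once the data no longer fits i32 offsets.
        DataType::LargeUtf8 => {
            let arr = column.as_any().downcast_ref::<LargeStringArray>().unwrap();
            sink.extend(arr.iter().map(|t| t.unwrap_or_default().to_string()));
        }
        // Everything else: cast to Utf8 first, as the original fallback does.
        _ => {
            let casted = cast(column, &DataType::Utf8)?;
            let arr = casted.as_any().downcast_ref::<StringArray>().unwrap();
            sink.extend(arr.iter().map(|t| t.unwrap_or_default().to_string()));
        }
    }
    Ok(())
}

fn main() {
    let col: ArrayRef = Arc::new(LargeStringArray::from(vec![Some("hello"), None]));
    let mut sink = Vec::new();
    push_all_text(&col, &mut sink).unwrap();
    assert_eq!(sink, vec!["hello".to_string(), String::new()]);
}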
|
|||||||
@@ -688,7 +688,7 @@ impl FlatConvertFormat {
|
|||||||
let values_array = values_vector.to_arrow_array();
|
let values_array = values_vector.to_arrow_array();
|
||||||
|
|
||||||
// Only creates dictionary array for string types, otherwise take values by keys
|
// Only creates dictionary array for string types, otherwise take values by keys
|
||||||
if matches!(column_type, ConcreteDataType::String(_)) {
|
if column_type.is_string() {
|
||||||
// Creates dictionary array using the same keys for string types
|
// Creates dictionary array using the same keys for string types
|
||||||
// Note that the dictionary values may have nulls.
|
// Note that the dictionary values may have nulls.
|
||||||
let dict_array = DictionaryArray::new(keys.clone(), values_array);
|
let dict_array = DictionaryArray::new(keys.clone(), values_array);
|
||||||
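The switch from `matches!(column_type, ConcreteDataType::String(_))` to `column_type.is_string()` keeps the dictionary-encoding path for every string flavor now that `StringType` carries a size type. The underlying arrow call is width-agnostic anyway: `DictionaryArray::new` takes its values as an `ArrayRef`, so the same i32 keys can wrap either string width. A minimal sketch:

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, DictionaryArray, Int32Array, LargeStringArray, StringArray};

fn main() {
    let keys = Int32Array::from(vec![0, 0, 1, 1]);

    // Utf8 dictionary values.
    let small: ArrayRef = Arc::new(StringArray::from(vec!["tag_a", "tag_b"]));
    let dict = DictionaryArray::new(keys.clone(), small);
    assert_eq!(dict.len(), 4);

    // LargeUtf8 dictionary values take exactly the same path.
    let large: ArrayRef = Arc::new(LargeStringArray::from(vec!["tag_a", "tag_b"]));
    let dict = DictionaryArray::new(keys, large);
    assert_eq!(dict.len(), 4);
}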
|
|||||||
@@ -201,9 +201,14 @@ pub fn convert_value(param: &ParamValue, t: &ConcreteDataType) -> Result<ScalarV
|
|||||||
},
|
},
|
||||||
ValueInner::NULL => value::to_null_scalar_value(t).context(error::ConvertScalarValueSnafu),
|
ValueInner::NULL => value::to_null_scalar_value(t).context(error::ConvertScalarValueSnafu),
|
||||||
ValueInner::Bytes(b) => match t {
|
ValueInner::Bytes(b) => match t {
|
||||||
ConcreteDataType::String(_) => Ok(ScalarValue::Utf8(Some(
|
ConcreteDataType::String(t) => {
|
||||||
String::from_utf8_lossy(b).to_string(),
|
let s = String::from_utf8_lossy(b).to_string();
|
||||||
))),
|
if t.is_large() {
|
||||||
|
Ok(ScalarValue::LargeUtf8(Some(s)))
|
||||||
|
} else {
|
||||||
|
Ok(ScalarValue::Utf8(Some(s)))
|
||||||
|
}
|
||||||
|
}
|
||||||
ConcreteDataType::Binary(_) => Ok(ScalarValue::Binary(Some(b.to_vec()))),
|
ConcreteDataType::Binary(_) => Ok(ScalarValue::Binary(Some(b.to_vec()))),
|
||||||
ConcreteDataType::Timestamp(ts_type) => covert_bytes_to_timestamp(b, ts_type),
|
ConcreteDataType::Timestamp(ts_type) => covert_bytes_to_timestamp(b, ts_type),
|
||||||
_ => error::PreparedStmtTypeMismatchSnafu {
|
_ => error::PreparedStmtTypeMismatchSnafu {
|
||||||
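For MySQL prepared statements, the server-side column type now decides the scalar width. A standalone sketch of that branch; `is_large` stands in for the `StringType::is_large()` helper this patch introduces, while the `ScalarValue` variants are DataFusion's own:

use datafusion_common::ScalarValue;

fn bytes_to_string_scalar(b: &[u8], is_large: bool) -> ScalarValue {
    // MySQL delivers text parameters as raw bytes; tolerate invalid UTF-8.
    let s = String::from_utf8_lossy(b).to_string();
    if is_large {
        ScalarValue::LargeUtf8(Some(s))
    } else {
        ScalarValue::Utf8(Some(s))
    }
}

fn main() {
    assert_eq!(
        bytes_to_string_scalar(b"hi", true),
        ScalarValue::LargeUtf8(Some("hi".to_string()))
    );
}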
|
|||||||
@@ -241,7 +241,7 @@ fn encode_array(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ConcreteDataType::String(_) => {
|
&ConcreteDataType::String(_) => {
|
||||||
let array = value_list
|
let array = value_list
|
||||||
.items()
|
.items()
|
||||||
.iter()
|
.iter()
|
||||||
@@ -687,7 +687,13 @@ pub(super) fn parameters_to_scalar_values(
|
|||||||
let data = portal.parameter::<String>(idx, &client_type)?;
|
let data = portal.parameter::<String>(idx, &client_type)?;
|
||||||
if let Some(server_type) = &server_type {
|
if let Some(server_type) = &server_type {
|
||||||
match server_type {
|
match server_type {
|
||||||
ConcreteDataType::String(_) => ScalarValue::Utf8(data),
|
ConcreteDataType::String(t) => {
|
||||||
|
if t.is_large() {
|
||||||
|
ScalarValue::LargeUtf8(data)
|
||||||
|
} else {
|
||||||
|
ScalarValue::Utf8(data)
|
||||||
|
}
|
||||||
|
}
|
||||||
_ => {
|
_ => {
|
||||||
return Err(invalid_parameter_error(
|
return Err(invalid_parameter_error(
|
||||||
"invalid_parameter_type",
|
"invalid_parameter_type",
|
||||||
@@ -969,8 +975,13 @@ pub(super) fn parameters_to_scalar_values(
|
|||||||
let data = portal.parameter::<Vec<u8>>(idx, &client_type)?;
|
let data = portal.parameter::<Vec<u8>>(idx, &client_type)?;
|
||||||
if let Some(server_type) = &server_type {
|
if let Some(server_type) = &server_type {
|
||||||
match server_type {
|
match server_type {
|
||||||
ConcreteDataType::String(_) => {
|
ConcreteDataType::String(t) => {
|
||||||
ScalarValue::Utf8(data.map(|d| String::from_utf8_lossy(&d).to_string()))
|
let s = data.map(|d| String::from_utf8_lossy(&d).to_string());
|
||||||
|
if t.is_large() {
|
||||||
|
ScalarValue::LargeUtf8(s)
|
||||||
|
} else {
|
||||||
|
ScalarValue::Utf8(s)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
ConcreteDataType::Binary(_) => ScalarValue::Binary(data),
|
ConcreteDataType::Binary(_) => ScalarValue::Binary(data),
|
||||||
_ => {
|
_ => {
|
||||||
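The two PostgreSQL hunks apply the same rule to text and binary parameters: the wire format only affects how the payload is decoded, while the declared server type picks Utf8 versus LargeUtf8. A sketch under that assumption (`is_large` again stands in for `StringType::is_large()`):

use datafusion_common::ScalarValue;

fn param_to_scalar(data: Option<String>, is_large: bool) -> ScalarValue {
    if is_large {
        ScalarValue::LargeUtf8(data)
    } else {
        ScalarValue::Utf8(data)
    }
}

fn main() {
    // Binary parameters are lossily decoded first, mirroring the code above.
    let binary: Option<Vec<u8>> = Some(b"pg".to_vec());
    let text = binary.map(|d| String::from_utf8_lossy(&d).to_string());
    assert_eq!(param_to_scalar(text, false), ScalarValue::Utf8(Some("pg".to_string())));
}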
|
|||||||
@@ -511,7 +511,7 @@ mod tests {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let logical_table_serialized = serde_json::to_string(&logical_table_expr).unwrap();
|
let logical_table_serialized = serde_json::to_string(&logical_table_expr).unwrap();
|
||||||
let logical_table_expected = r#"{"table_name":{"value":"impedit","quote_style":null},"columns":[{"name":{"value":"ts","quote_style":null},"column_type":{"Timestamp":{"Millisecond":null}},"options":["TimeIndex"]},{"name":{"value":"val","quote_style":null},"column_type":{"Float64":{}},"options":[]},{"name":{"value":"totam","quote_style":null},"column_type":{"String":null},"options":["PrimaryKey"]},{"name":{"value":"cumque","quote_style":null},"column_type":{"String":null},"options":["PrimaryKey"]},{"name":{"value":"natus","quote_style":null},"column_type":{"String":null},"options":["PrimaryKey"]},{"name":{"value":"molestias","quote_style":null},"column_type":{"String":null},"options":["PrimaryKey"]},{"name":{"value":"qui","quote_style":null},"column_type":{"String":null},"options":["PrimaryKey"]}],"if_not_exists":false,"partition":null,"engine":"metric","options":{"on_physical_table":{"String":"expedita"}},"primary_keys":[4,2,3,6,5]}"#;
|
let logical_table_expected = r#"{"table_name":{"value":"impedit","quote_style":null},"columns":[{"name":{"value":"ts","quote_style":null},"column_type":{"Timestamp":{"Millisecond":null}},"options":["TimeIndex"]},{"name":{"value":"val","quote_style":null},"column_type":{"Float64":{}},"options":[]},{"name":{"value":"totam","quote_style":null},"column_type":{"String":{"size_type":"Utf8"}},"options":["PrimaryKey"]},{"name":{"value":"cumque","quote_style":null},"column_type":{"String":{"size_type":"Utf8"}},"options":["PrimaryKey"]},{"name":{"value":"natus","quote_style":null},"column_type":{"String":{"size_type":"Utf8"}},"options":["PrimaryKey"]},{"name":{"value":"molestias","quote_style":null},"column_type":{"String":{"size_type":"Utf8"}},"options":["PrimaryKey"]},{"name":{"value":"qui","quote_style":null},"column_type":{"String":{"size_type":"Utf8"}},"options":["PrimaryKey"]}],"if_not_exists":false,"partition":null,"engine":"metric","options":{"on_physical_table":{"String":"expedita"}},"primary_keys":[4,2,3,6,5]}"#;
|
||||||
assert_eq!(logical_table_expected, logical_table_serialized);
|
assert_eq!(logical_table_expected, logical_table_serialized);
|
||||||
}
|
}
|
||||||
|
|
||||||
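The serialized column type changes from `{"String":null}` to `{"String":{"size_type":"Utf8"}}`. Since `StringSizeType` implements `Default` (per the commit history), metadata written before this change can still deserialize. A hedged sketch of that shape; the type names mirror the patch, but the exact serde attributes are an assumption:

use serde::{Deserialize, Serialize};

#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
enum StringSizeType {
    #[default]
    Utf8,
    LargeUtf8,
}

#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
struct StringType {
    #[serde(default)]
    size_type: StringSizeType,
}

fn main() {
    // Payloads written before this change carry no `size_type`; the default
    // keeps them readable.
    let old: StringType = serde_json::from_str("{}").unwrap();
    assert_eq!(old.size_type, StringSizeType::Utf8);

    // New payloads round-trip the explicit field, matching the expected JSON
    // in the test above.
    let json = serde_json::to_string(&StringType::default()).unwrap();
    assert_eq!(json, r#"{"size_type":"Utf8"}"#);
}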
|
|||||||
@@ -33,12 +33,23 @@ Affected Rows: 9
|
|||||||
|
|
||||||
SELECT g, STRING_AGG(x,'|') FROM strings GROUP BY g ORDER BY g;
|
SELECT g, STRING_AGG(x,'|') FROM strings GROUP BY g ORDER BY g;
|
||||||
|
|
||||||
Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected LargeUtf8 but found Utf8 at column index 1
|
+---+---------------------------------+
|
||||||
|
| g | string_agg(strings.x,Utf8("|")) |
|
||||||
|
+---+---------------------------------+
|
||||||
|
| 1 | a|b |
|
||||||
|
| 2 | i|j |
|
||||||
|
| 3 | p |
|
||||||
|
| 4 | x|y|z |
|
||||||
|
+---+---------------------------------+
|
||||||
|
|
||||||
-- test agg on empty set
|
-- test agg on empty set
|
||||||
SELECT STRING_AGG(x,',') FROM strings WHERE g > 100;
|
SELECT STRING_AGG(x,',') FROM strings WHERE g > 100;
|
||||||
|
|
||||||
Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected LargeUtf8 but found Utf8 at column index 0
|
+---------------------------------+
|
||||||
|
| string_agg(strings.x,Utf8(",")) |
|
||||||
|
+---------------------------------+
|
||||||
|
| |
|
||||||
|
+---------------------------------+
|
||||||
|
|
||||||
-- string_agg can be used instead of group_concat
|
-- string_agg can be used instead of group_concat
|
||||||
SELECT string_agg('a', ',');
|
SELECT string_agg('a', ',');
|
||||||
@@ -59,35 +70,75 @@ SELECT string_agg('a', ',');
|
|||||||
|
|
||||||
SELECT g, string_agg(x, ',') FROM strings GROUP BY g ORDER BY g;
|
SELECT g, string_agg(x, ',') FROM strings GROUP BY g ORDER BY g;
|
||||||
|
|
||||||
Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected LargeUtf8 but found Utf8 at column index 1
|
+---+---------------------------------+
|
||||||
|
| g | string_agg(strings.x,Utf8(",")) |
|
||||||
|
+---+---------------------------------+
|
||||||
|
| 1 | a,b |
|
||||||
|
| 2 | i,j |
|
||||||
|
| 3 | p |
|
||||||
|
| 4 | x,y,z |
|
||||||
|
+---+---------------------------------+
|
||||||
|
|
||||||
-- Test ORDER BY
|
-- Test ORDER BY
|
||||||
-- Single group
|
-- Single group
|
||||||
SELECT STRING_AGG(x, '' ORDER BY x ASC), STRING_AGG(x, '|' ORDER BY x ASC) FROM strings;
|
SELECT STRING_AGG(x, '' ORDER BY x ASC), STRING_AGG(x, '|' ORDER BY x ASC) FROM strings;
|
||||||
|
|
||||||
Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected LargeUtf8 but found Utf8 at column index 0
|
+--------------------------------------------------------------------+---------------------------------------------------------------------+
|
||||||
|
| string_agg(strings.x,Utf8("")) ORDER BY [strings.x ASC NULLS LAST] | string_agg(strings.x,Utf8("|")) ORDER BY [strings.x ASC NULLS LAST] |
|
||||||
|
+--------------------------------------------------------------------+---------------------------------------------------------------------+
|
||||||
|
| abijpxyz | a|b|i|j|p|x|y|z |
|
||||||
|
+--------------------------------------------------------------------+---------------------------------------------------------------------+
|
||||||
|
|
||||||
SELECT STRING_AGG(x, '' ORDER BY x DESC), STRING_AGG(x,'|' ORDER BY x DESC) FROM strings;
|
SELECT STRING_AGG(x, '' ORDER BY x DESC), STRING_AGG(x,'|' ORDER BY x DESC) FROM strings;
|
||||||
|
|
||||||
Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected LargeUtf8 but found Utf8 at column index 0
|
+----------------------------------------------------------------------+-----------------------------------------------------------------------+
|
||||||
|
| string_agg(strings.x,Utf8("")) ORDER BY [strings.x DESC NULLS FIRST] | string_agg(strings.x,Utf8("|")) ORDER BY [strings.x DESC NULLS FIRST] |
|
||||||
|
+----------------------------------------------------------------------+-----------------------------------------------------------------------+
|
||||||
|
| zyxpjiba | z|y|x|p|j|i|b|a |
|
||||||
|
+----------------------------------------------------------------------+-----------------------------------------------------------------------+
|
||||||
|
|
||||||
-- Grouped with ORDER BY
|
-- Grouped with ORDER BY
|
||||||
SELECT g, STRING_AGG(x, '' ORDER BY x ASC), STRING_AGG(x, '|' ORDER BY x ASC) FROM strings GROUP BY g ORDER BY g;
|
SELECT g, STRING_AGG(x, '' ORDER BY x ASC), STRING_AGG(x, '|' ORDER BY x ASC) FROM strings GROUP BY g ORDER BY g;
|
||||||
|
|
||||||
Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected LargeUtf8 but found Utf8 at column index 1
|
+---+--------------------------------------------------------------------+---------------------------------------------------------------------+
|
||||||
|
| g | string_agg(strings.x,Utf8("")) ORDER BY [strings.x ASC NULLS LAST] | string_agg(strings.x,Utf8("|")) ORDER BY [strings.x ASC NULLS LAST] |
|
||||||
|
+---+--------------------------------------------------------------------+---------------------------------------------------------------------+
|
||||||
|
| 1 | ab | a|b |
|
||||||
|
| 2 | ij | i|j |
|
||||||
|
| 3 | p | p |
|
||||||
|
| 4 | xyz | x|y|z |
|
||||||
|
+---+--------------------------------------------------------------------+---------------------------------------------------------------------+
|
||||||
|
|
||||||
SELECT g, STRING_AGG(x, '' ORDER BY x DESC), STRING_AGG(x,'|' ORDER BY x DESC) FROM strings GROUP BY g ORDER BY g;
|
SELECT g, STRING_AGG(x, '' ORDER BY x DESC), STRING_AGG(x,'|' ORDER BY x DESC) FROM strings GROUP BY g ORDER BY g;
|
||||||
|
|
||||||
Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected LargeUtf8 but found Utf8 at column index 1
|
+---+----------------------------------------------------------------------+-----------------------------------------------------------------------+
|
||||||
|
| g | string_agg(strings.x,Utf8("")) ORDER BY [strings.x DESC NULLS FIRST] | string_agg(strings.x,Utf8("|")) ORDER BY [strings.x DESC NULLS FIRST] |
|
||||||
|
+---+----------------------------------------------------------------------+-----------------------------------------------------------------------+
|
||||||
|
| 1 | ba | b|a |
|
||||||
|
| 2 | ji | j|i |
|
||||||
|
| 3 | p | p |
|
||||||
|
| 4 | zyx | z|y|x |
|
||||||
|
+---+----------------------------------------------------------------------+-----------------------------------------------------------------------+
|
||||||
|
|
||||||
-- Test with DISTINCT
|
-- Test with DISTINCT
|
||||||
SELECT STRING_AGG(DISTINCT x, '' ORDER BY x), STRING_AGG(DISTINCT x, '|' ORDER BY x) FROM strings;
|
SELECT STRING_AGG(DISTINCT x, '' ORDER BY x), STRING_AGG(DISTINCT x, '|' ORDER BY x) FROM strings;
|
||||||
|
|
||||||
Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected LargeUtf8 but found Utf8 at column index 0
|
+-----------------------------------------------------------------------------+------------------------------------------------------------------------------+
|
||||||
|
| string_agg(DISTINCT strings.x,Utf8("")) ORDER BY [strings.x ASC NULLS LAST] | string_agg(DISTINCT strings.x,Utf8("|")) ORDER BY [strings.x ASC NULLS LAST] |
|
||||||
|
+-----------------------------------------------------------------------------+------------------------------------------------------------------------------+
|
||||||
|
| abijpxyz | a|b|i|j|p|x|y|z |
|
||||||
|
+-----------------------------------------------------------------------------+------------------------------------------------------------------------------+
|
||||||
|
|
||||||
SELECT g, STRING_AGG(DISTINCT x, '' ORDER BY x) FROM strings GROUP BY g ORDER BY g;
|
SELECT g, STRING_AGG(DISTINCT x, '' ORDER BY x) FROM strings GROUP BY g ORDER BY g;
|
||||||
|
|
||||||
Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected LargeUtf8 but found Utf8 at column index 1
|
+---+-----------------------------------------------------------------------------+
|
||||||
|
| g | string_agg(DISTINCT strings.x,Utf8("")) ORDER BY [strings.x ASC NULLS LAST] |
|
||||||
|
+---+-----------------------------------------------------------------------------+
|
||||||
|
| 1 | ab |
|
||||||
|
| 2 | ij |
|
||||||
|
| 3 | p |
|
||||||
|
| 4 | xyz |
|
||||||
|
+---+-----------------------------------------------------------------------------+
|
||||||
|
|
||||||
-- cleanup
|
-- cleanup
|
||||||
DROP TABLE strings;
|
DROP TABLE strings;
|
||||||
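The recovered results above are the headline of this change: DataFusion's `string_agg` declares LargeUtf8 output, but GreptimeDB's vector layer previously had no large-string representation and materialized the column as Utf8, tripping the schema check with "expected LargeUtf8 but found Utf8". The two Arrow types hold identical logical values and differ only in offset width (i32 versus i64), yet they do not unify without a cast:

use arrow::array::{Array, LargeStringArray, StringArray};
use arrow::datatypes::DataType;

fn main() {
    let small = StringArray::from(vec!["a|b"]);      // i32 offsets
    let large = LargeStringArray::from(vec!["a|b"]); // i64 offsets

    assert_eq!(small.data_type(), &DataType::Utf8);
    assert_eq!(large.data_type(), &DataType::LargeUtf8);
    // Same contents, distinct physical types.
    assert_ne!(small.data_type(), large.data_type());
}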
|
|||||||
@@ -36,12 +36,12 @@ Affected Rows: 0
|
|||||||
-- create logical table with different data type on field column
|
-- create logical table with different data type on field column
|
||||||
CREATE TABLE t3 (ts timestamp time index, val string, host string, primary key (host)) engine=metric with ("on_physical_table" = "phy");
|
CREATE TABLE t3 (ts timestamp time index, val string, host string, primary key (host)) engine=metric with ("on_physical_table" = "phy");
|
||||||
|
|
||||||
Error: 1004(InvalidArguments), Column type mismatch. Expect Float64(Float64Type), got String(StringType)
|
Error: 1004(InvalidArguments), Column type mismatch. Expect Float64(Float64Type), got String(StringType { size_type: Utf8 })
|
||||||
|
|
||||||
-- create logical table with different data type on tag column
|
-- create logical table with different data type on tag column
|
||||||
CREATE TABLE t4 (ts timestamp time index, val double, host double, primary key (host)) engine=metric with ("on_physical_table" = "phy");
|
CREATE TABLE t4 (ts timestamp time index, val double, host double, primary key (host)) engine=metric with ("on_physical_table" = "phy");
|
||||||
|
|
||||||
Error: 1004(InvalidArguments), Column type mismatch. Expect String(StringType), got Float64(Float64Type)
|
Error: 1004(InvalidArguments), Column type mismatch. Expect String(StringType { size_type: Utf8 }), got Float64(Float64Type)
|
||||||
|
|
||||||
-- create logical table with different column name on field column
|
-- create logical table with different column name on field column
|
||||||
CREATE TABLE t5 (ts timestamp time index, valval double, host string primary key) engine = metric with ("on_physical_table" = "phy");
|
CREATE TABLE t5 (ts timestamp time index, valval double, host string primary key) engine = metric with ("on_physical_table" = "phy");
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ Affected Rows: 1
|
|||||||
|
|
||||||
INSERT INTO strings VALUES (3, 4);
|
INSERT INTO strings VALUES (3, 4);
|
||||||
|
|
||||||
Error: 2000(InvalidSyntax), Failed to parse value: Fail to parse number 3, invalid column type: String(StringType)
|
Error: 2000(InvalidSyntax), Failed to parse value: Fail to parse number 3, invalid column type: String(StringType { size_type: Utf8 })
|
||||||
|
|
||||||
SELECT * FROM strings WHERE i = 'â‚(';
|
SELECT * FROM strings WHERE i = 'â‚(';
|
||||||
|
|
||||||