feat: implement view-based vector types (#7600)

* feat: implement view-based vector types

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplify

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* support large binary array, simplify match

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-01-21 20:44:06 +08:00
committed by GitHub
parent 966ade7565
commit c34e9970e7
11 changed files with 559 additions and 205 deletions

View File

@@ -25,11 +25,16 @@ use common_time::time::Time;
use common_time::{Duration, Timestamp};
pub type BinaryArray = arrow::array::BinaryArray;
pub type LargeBinaryArray = arrow::array::LargeBinaryArray;
pub type MutableBinaryArray = arrow::array::BinaryBuilder;
pub type BinaryViewArray = arrow::array::BinaryViewArray;
pub type MutableBinaryViewArray = arrow::array::BinaryViewBuilder;
pub type StringArray = arrow::array::StringArray;
pub type MutableStringArray = arrow::array::StringBuilder;
pub type LargeStringArray = arrow::array::LargeStringArray;
pub type MutableLargeStringArray = arrow::array::LargeStringBuilder;
pub type StringViewArray = arrow::array::StringViewArray;
pub type MutableStringViewArray = arrow::array::StringViewBuilder;
/// Get the [Timestamp] value at index `i` of the timestamp array.
///
@@ -154,6 +159,40 @@ pub fn string_array_value_at_index(array: &ArrayRef, i: usize) -> Option<&str> {
}
}
/// Get the string value at index `i` for `Utf8`, `LargeUtf8`, or `Utf8View` arrays.
///
/// Note: This method does not check for nulls and the value is arbitrary
/// if [`is_null`](arrow::array::Array::is_null) returns true for the index.
///
/// # Panics
/// 1. if index `i` is out of bounds;
/// 2. or the array is not a string type.
pub fn string_array_value(array: &ArrayRef, i: usize) -> &str {
match array.data_type() {
DataType::Utf8 => array.as_string::<i32>().value(i),
DataType::LargeUtf8 => array.as_string::<i64>().value(i),
DataType::Utf8View => array.as_string_view().value(i),
_ => unreachable!(),
}
}
/// Get the binary value at index `i` for `Binary`, `LargeBinary`, or `BinaryView` arrays.
///
/// Note: This method does not check for nulls and the value is arbitrary
/// if [`is_null`](arrow::array::Array::is_null) returns true for the index.
///
/// # Panics
/// 1. if index `i` is out of bounds;
/// 2. or the array is not a binary type.
pub fn binary_array_value(array: &ArrayRef, i: usize) -> &[u8] {
match array.data_type() {
DataType::Binary => array.as_binary::<i32>().value(i),
DataType::LargeBinary => array.as_binary::<i64>().value(i),
DataType::BinaryView => array.as_binary_view().value(i),
_ => unreachable!(),
}
}
/// Get the integer value (`i64`) at index `i` for any integer array.
///
/// Returns `None` when:

View File

@@ -456,11 +456,13 @@ impl TryFrom<&ArrowDataType> for ConcreteDataType {
ArrowDataType::Date32 => Self::date_datatype(),
ArrowDataType::Timestamp(u, _) => ConcreteDataType::from_arrow_time_unit(u),
ArrowDataType::Interval(u) => ConcreteDataType::from_arrow_interval_unit(u),
ArrowDataType::Binary | ArrowDataType::LargeBinary | ArrowDataType::BinaryView => {
Self::binary_datatype()
ArrowDataType::Binary | ArrowDataType::LargeBinary => {
Self::Binary(BinaryType::binary())
}
ArrowDataType::Utf8 | ArrowDataType::Utf8View => Self::string_datatype(),
ArrowDataType::LargeUtf8 => Self::large_string_datatype(),
ArrowDataType::BinaryView => Self::Binary(BinaryType::binary_view()),
ArrowDataType::Utf8 => Self::String(StringType::utf8()),
ArrowDataType::Utf8View => Self::String(StringType::utf8_view()),
ArrowDataType::LargeUtf8 => Self::String(StringType::large_utf8()),
ArrowDataType::List(field) => Self::List(ListType::new(Arc::new(
ConcreteDataType::from_arrow_type(field.data_type()),
))),
@@ -526,6 +528,14 @@ impl ConcreteDataType {
ConcreteDataType::String(StringType::large_utf8())
}
pub fn utf8_view_datatype() -> Self {
ConcreteDataType::String(StringType::utf8_view())
}
pub fn binary_view_datatype() -> Self {
ConcreteDataType::Binary(BinaryType::binary_view())
}
pub fn timestamp_second_datatype() -> Self {
ConcreteDataType::Timestamp(TimestampType::Second(TimestampSecondType))
}
@@ -785,6 +795,12 @@ mod tests {
ConcreteDataType::from_arrow_type(&ArrowDataType::Utf8),
ConcreteDataType::String(_)
));
let utf8_view_string_type = ConcreteDataType::from_arrow_type(&ArrowDataType::Utf8View);
assert!(matches!(utf8_view_string_type, ConcreteDataType::String(_)));
assert_eq!(
ArrowDataType::Utf8View,
utf8_view_string_type.as_arrow_type()
);
// Test LargeUtf8 mapping to large String type
let large_string_type = ConcreteDataType::from_arrow_type(&ArrowDataType::LargeUtf8);
assert!(matches!(large_string_type, ConcreteDataType::String(_)));
@@ -807,6 +823,19 @@ mod tests {
));
}
#[test]
fn test_view_round_trip() {
let utf8_view_arrow = ArrowDataType::Utf8View;
let concrete_type = ConcreteDataType::from_arrow_type(&utf8_view_arrow);
let back_to_arrow = concrete_type.as_arrow_type();
assert_eq!(utf8_view_arrow, back_to_arrow);
let binary_view_arrow = ArrowDataType::BinaryView;
let concrete_type = ConcreteDataType::from_arrow_type(&binary_view_arrow);
let back_to_arrow = concrete_type.as_arrow_type();
assert_eq!(binary_view_arrow, back_to_arrow);
}
#[test]
fn test_large_utf8_round_trip() {
// Test round-trip conversion for LargeUtf8
@@ -835,6 +864,11 @@ mod tests {
panic!("Expected both to be String types");
}
// View strings should be distinct from Utf8 and LargeUtf8.
let view_concrete = ConcreteDataType::from_arrow_type(&ArrowDataType::Utf8View);
assert_ne!(utf8_concrete, view_concrete);
assert_ne!(large_utf8_concrete, view_concrete);
// They should be different types
assert_ne!(utf8_concrete, large_utf8_concrete);
}

View File

@@ -24,12 +24,74 @@ use crate::type_id::LogicalTypeId;
use crate::value::Value;
use crate::vectors::{BinaryVectorBuilder, MutableVector};
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct BinaryType;
#[derive(
Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, Default,
)]
pub enum BinaryReprType {
#[default]
Binary,
BinaryView,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
pub struct BinaryType {
#[serde(default)]
repr_type: BinaryReprType,
}
/// Custom deserialization to support both old and new formats.
impl<'de> serde::Deserialize<'de> for BinaryType {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(serde::Deserialize)]
struct Helper {
#[serde(default)]
repr_type: BinaryReprType,
}
let opt = Option::<Helper>::deserialize(deserializer)?;
Ok(match opt {
Some(helper) => Self {
repr_type: helper.repr_type,
},
None => Self::default(),
})
}
}
impl Default for BinaryType {
fn default() -> Self {
Self {
repr_type: BinaryReprType::Binary,
}
}
}
impl BinaryType {
pub fn binary() -> Self {
Self {
repr_type: BinaryReprType::Binary,
}
}
pub fn binary_view() -> Self {
Self {
repr_type: BinaryReprType::BinaryView,
}
}
pub fn is_view(&self) -> bool {
matches!(self.repr_type, BinaryReprType::BinaryView)
}
pub fn arc() -> DataTypeRef {
Arc::new(Self)
Arc::new(Self::default())
}
pub fn view_arc() -> DataTypeRef {
Arc::new(Self::binary_view())
}
}
@@ -47,11 +109,19 @@ impl DataType for BinaryType {
}
fn as_arrow_type(&self) -> ArrowDataType {
ArrowDataType::Binary
match self.repr_type {
BinaryReprType::Binary => ArrowDataType::Binary,
BinaryReprType::BinaryView => ArrowDataType::BinaryView,
}
}
fn create_mutable_vector(&self, capacity: usize) -> Box<dyn MutableVector> {
Box::new(BinaryVectorBuilder::with_capacity(capacity))
match self.repr_type {
BinaryReprType::Binary => Box::new(BinaryVectorBuilder::with_capacity(capacity)),
BinaryReprType::BinaryView => {
Box::new(BinaryVectorBuilder::with_view_capacity(capacity))
}
}
}
fn try_cast(&self, from: Value) -> Option<Value> {

View File

@@ -33,6 +33,8 @@ pub enum StringSizeType {
Utf8,
/// Large UTF8 strings (up to 2^63 bytes)
LargeUtf8,
/// A view into string data (Arrow `Utf8View`)
Utf8View,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
@@ -94,6 +96,11 @@ impl StringType {
Self::with_size(StringSizeType::LargeUtf8)
}
/// Create a StringType for view strings
pub fn utf8_view() -> Self {
Self::with_size(StringSizeType::Utf8View)
}
/// Get the size type
pub fn size_type(&self) -> StringSizeType {
self.size_type
@@ -104,6 +111,10 @@ impl StringType {
matches!(self.size_type, StringSizeType::LargeUtf8)
}
pub fn is_view(&self) -> bool {
matches!(self.size_type, StringSizeType::Utf8View)
}
pub fn arc() -> DataTypeRef {
Arc::new(Self::new())
}
@@ -111,6 +122,10 @@ impl StringType {
pub fn large_arc() -> DataTypeRef {
Arc::new(Self::large_utf8())
}
pub fn view_arc() -> DataTypeRef {
Arc::new(Self::utf8_view())
}
}
impl DataType for StringType {
@@ -130,6 +145,7 @@ impl DataType for StringType {
match self.size_type {
StringSizeType::Utf8 => ArrowDataType::Utf8,
StringSizeType::LargeUtf8 => ArrowDataType::LargeUtf8,
StringSizeType::Utf8View => ArrowDataType::Utf8View,
}
}
@@ -139,6 +155,7 @@ impl DataType for StringType {
StringSizeType::LargeUtf8 => {
Box::new(StringVectorBuilder::with_large_capacity(capacity))
}
StringSizeType::Utf8View => Box::new(StringVectorBuilder::with_view_capacity(capacity)),
}
}

View File

@@ -18,7 +18,9 @@ use std::sync::Arc;
use arrow::array::{Array, ArrayBuilder, ArrayIter, ArrayRef};
use snafu::ResultExt;
use crate::arrow_array::{BinaryArray, MutableBinaryArray};
use crate::arrow_array::{
BinaryArray, BinaryViewArray, LargeBinaryArray, MutableBinaryArray, MutableBinaryViewArray,
};
use crate::data_type::ConcreteDataType;
use crate::error::{self, InvalidVectorSnafu, Result};
use crate::scalars::{ScalarVector, ScalarVectorBuilder};
@@ -27,28 +29,33 @@ use crate::types::parse_string_to_vector_type_value;
use crate::value::{Value, ValueRef};
use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef};
#[derive(Debug, PartialEq)]
enum BinaryArrayData {
Binary(BinaryArray),
LargeBinary(LargeBinaryArray),
BinaryView(BinaryViewArray),
}
/// Vector of binary strings.
#[derive(Debug, PartialEq)]
pub struct BinaryVector {
array: BinaryArray,
array: BinaryArrayData,
}
impl BinaryVector {
pub(crate) fn as_arrow(&self) -> &dyn Array {
&self.array
match &self.array {
BinaryArrayData::Binary(array) => array,
BinaryArrayData::LargeBinary(array) => array,
BinaryArrayData::BinaryView(array) => array,
}
}
/// Creates a new binary vector of JSONB from a binary vector.
/// The binary vector must contain valid JSON strings.
pub fn convert_binary_to_json(&self) -> Result<BinaryVector> {
let arrow_array = self.to_arrow_array();
let mut vector = vec![];
for binary in arrow_array
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap()
.iter()
{
for binary in self.iter_data() {
let jsonb = if let Some(binary) = binary {
match jsonb::from_slice(binary) {
Ok(jsonb) => Some(jsonb.to_vec()),
@@ -69,14 +76,8 @@ impl BinaryVector {
}
pub fn convert_binary_to_vector(&self, dim: u32) -> Result<BinaryVector> {
let arrow_array = self.to_arrow_array();
let mut vector = vec![];
for binary in arrow_array
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap()
.iter()
{
for binary in self.iter_data() {
let Some(binary) = binary else {
vector.push(None);
continue;
@@ -110,14 +111,32 @@ impl BinaryVector {
impl From<BinaryArray> for BinaryVector {
fn from(array: BinaryArray) -> Self {
Self { array }
Self {
array: BinaryArrayData::Binary(array),
}
}
}
impl From<BinaryViewArray> for BinaryVector {
fn from(array: BinaryViewArray) -> Self {
Self {
array: BinaryArrayData::BinaryView(array),
}
}
}
impl From<LargeBinaryArray> for BinaryVector {
fn from(array: LargeBinaryArray) -> Self {
Self {
array: BinaryArrayData::LargeBinary(array),
}
}
}
impl From<Vec<Option<Vec<u8>>>> for BinaryVector {
fn from(data: Vec<Option<Vec<u8>>>) -> Self {
Self {
array: BinaryArray::from_iter(data),
array: BinaryArrayData::Binary(BinaryArray::from_iter(data)),
}
}
}
@@ -125,14 +144,18 @@ impl From<Vec<Option<Vec<u8>>>> for BinaryVector {
impl From<Vec<&[u8]>> for BinaryVector {
fn from(data: Vec<&[u8]>) -> Self {
Self {
array: BinaryArray::from_iter_values(data),
array: BinaryArrayData::Binary(BinaryArray::from_iter_values(data)),
}
}
}
impl Vector for BinaryVector {
fn data_type(&self) -> ConcreteDataType {
ConcreteDataType::binary_datatype()
match &self.array {
BinaryArrayData::Binary(_) => ConcreteDataType::binary_datatype(),
BinaryArrayData::LargeBinary(_) => ConcreteDataType::binary_datatype(),
BinaryArrayData::BinaryView(_) => ConcreteDataType::binary_view_datatype(),
}
}
fn vector_type_name(&self) -> String {
@@ -144,51 +167,105 @@ impl Vector for BinaryVector {
}
fn len(&self) -> usize {
self.array.len()
match &self.array {
BinaryArrayData::Binary(array) => array.len(),
BinaryArrayData::LargeBinary(array) => array.len(),
BinaryArrayData::BinaryView(array) => array.len(),
}
}
fn to_arrow_array(&self) -> ArrayRef {
Arc::new(self.array.clone())
match &self.array {
BinaryArrayData::Binary(array) => Arc::new(array.clone()),
BinaryArrayData::LargeBinary(array) => Arc::new(array.clone()),
BinaryArrayData::BinaryView(array) => Arc::new(array.clone()),
}
}
fn to_boxed_arrow_array(&self) -> Box<dyn Array> {
Box::new(self.array.clone())
match &self.array {
BinaryArrayData::Binary(array) => Box::new(array.clone()),
BinaryArrayData::LargeBinary(array) => Box::new(array.clone()),
BinaryArrayData::BinaryView(array) => Box::new(array.clone()),
}
}
fn validity(&self) -> Validity {
vectors::impl_validity_for_vector!(self.array)
match &self.array {
BinaryArrayData::Binary(array) => vectors::impl_validity_for_vector!(array),
BinaryArrayData::LargeBinary(array) => vectors::impl_validity_for_vector!(array),
BinaryArrayData::BinaryView(array) => vectors::impl_validity_for_vector!(array),
}
}
fn memory_size(&self) -> usize {
self.array.get_buffer_memory_size()
match &self.array {
BinaryArrayData::Binary(array) => array.get_buffer_memory_size(),
BinaryArrayData::LargeBinary(array) => array.get_buffer_memory_size(),
BinaryArrayData::BinaryView(array) => array.get_buffer_memory_size(),
}
}
fn null_count(&self) -> usize {
self.array.null_count()
match &self.array {
BinaryArrayData::Binary(array) => array.null_count(),
BinaryArrayData::LargeBinary(array) => array.null_count(),
BinaryArrayData::BinaryView(array) => array.null_count(),
}
}
fn is_null(&self, row: usize) -> bool {
self.array.is_null(row)
match &self.array {
BinaryArrayData::Binary(array) => array.is_null(row),
BinaryArrayData::LargeBinary(array) => array.is_null(row),
BinaryArrayData::BinaryView(array) => array.is_null(row),
}
}
fn slice(&self, offset: usize, length: usize) -> VectorRef {
let array = self.array.slice(offset, length);
Arc::new(Self { array })
match &self.array {
BinaryArrayData::Binary(array) => {
let array = array.slice(offset, length);
Arc::new(Self {
array: BinaryArrayData::Binary(array),
})
}
BinaryArrayData::LargeBinary(array) => {
let array = array.slice(offset, length);
Arc::new(Self {
array: BinaryArrayData::LargeBinary(array),
})
}
BinaryArrayData::BinaryView(array) => {
let array = array.slice(offset, length);
Arc::new(Self {
array: BinaryArrayData::BinaryView(array),
})
}
}
}
fn get(&self, index: usize) -> Value {
vectors::impl_get_for_vector!(self.array, index)
match &self.array {
BinaryArrayData::Binary(array) => vectors::impl_get_for_vector!(array, index),
BinaryArrayData::LargeBinary(array) => vectors::impl_get_for_vector!(array, index),
BinaryArrayData::BinaryView(array) => vectors::impl_get_for_vector!(array, index),
}
}
fn get_ref(&self, index: usize) -> ValueRef<'_> {
vectors::impl_get_ref_for_vector!(self.array, index)
match &self.array {
BinaryArrayData::Binary(array) => vectors::impl_get_ref_for_vector!(array, index),
BinaryArrayData::LargeBinary(array) => vectors::impl_get_ref_for_vector!(array, index),
BinaryArrayData::BinaryView(array) => vectors::impl_get_ref_for_vector!(array, index),
}
}
}
impl From<Vec<Vec<u8>>> for BinaryVector {
fn from(data: Vec<Vec<u8>>) -> Self {
Self {
array: BinaryArray::from_iter_values(data),
array: BinaryArrayData::Binary(BinaryArray::from_iter_values(data)),
}
}
}
@@ -196,33 +273,76 @@ impl From<Vec<Vec<u8>>> for BinaryVector {
impl ScalarVector for BinaryVector {
type OwnedItem = Vec<u8>;
type RefItem<'a> = &'a [u8];
type Iter<'a> = ArrayIter<&'a BinaryArray>;
type Iter<'a> = BinaryIter<'a>;
type Builder = BinaryVectorBuilder;
fn get_data(&self, idx: usize) -> Option<Self::RefItem<'_>> {
if self.array.is_valid(idx) {
Some(self.array.value(idx))
} else {
None
match &self.array {
BinaryArrayData::Binary(array) => array.is_valid(idx).then(|| array.value(idx)),
BinaryArrayData::LargeBinary(array) => array.is_valid(idx).then(|| array.value(idx)),
BinaryArrayData::BinaryView(array) => array.is_valid(idx).then(|| array.value(idx)),
}
}
fn iter_data(&self) -> Self::Iter<'_> {
self.array.iter()
match &self.array {
BinaryArrayData::Binary(array) => BinaryIter::Binary(array.iter()),
BinaryArrayData::LargeBinary(array) => BinaryIter::LargeBinary(array.iter()),
BinaryArrayData::BinaryView(array) => BinaryIter::BinaryView(array.iter()),
}
}
}
pub enum BinaryIter<'a> {
Binary(ArrayIter<&'a BinaryArray>),
LargeBinary(ArrayIter<&'a LargeBinaryArray>),
BinaryView(ArrayIter<&'a BinaryViewArray>),
}
impl<'a> Iterator for BinaryIter<'a> {
type Item = Option<&'a [u8]>;
fn next(&mut self) -> Option<Self::Item> {
match self {
BinaryIter::Binary(iter) => iter.next(),
BinaryIter::LargeBinary(iter) => iter.next(),
BinaryIter::BinaryView(iter) => iter.next(),
}
}
}
enum MutableBinaryArrayData {
Binary(MutableBinaryArray),
BinaryView(MutableBinaryViewArray),
}
pub struct BinaryVectorBuilder {
mutable_array: MutableBinaryArray,
mutable_array: MutableBinaryArrayData,
}
impl BinaryVectorBuilder {
pub fn with_view_capacity(capacity: usize) -> Self {
Self {
mutable_array: MutableBinaryArrayData::BinaryView(
MutableBinaryViewArray::with_capacity(capacity),
),
}
}
}
impl MutableVector for BinaryVectorBuilder {
fn data_type(&self) -> ConcreteDataType {
ConcreteDataType::binary_datatype()
match &self.mutable_array {
MutableBinaryArrayData::Binary(_) => ConcreteDataType::binary_datatype(),
MutableBinaryArrayData::BinaryView(_) => ConcreteDataType::binary_view_datatype(),
}
}
fn len(&self) -> usize {
self.mutable_array.len()
match &self.mutable_array {
MutableBinaryArrayData::Binary(array) => array.len(),
MutableBinaryArrayData::BinaryView(array) => array.len(),
}
}
fn as_any(&self) -> &dyn Any {
@@ -242,10 +362,11 @@ impl MutableVector for BinaryVectorBuilder {
}
fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()> {
match value.try_into_binary()? {
Some(v) => self.mutable_array.append_value(v),
None => self.mutable_array.append_null(),
}
let value = value.try_into_binary()?;
match &mut self.mutable_array {
MutableBinaryArrayData::Binary(array) => array.append_option(value),
MutableBinaryArrayData::BinaryView(array) => array.append_option(value),
};
Ok(())
}
@@ -254,7 +375,10 @@ impl MutableVector for BinaryVectorBuilder {
}
fn push_null(&mut self) {
self.mutable_array.append_null()
match &mut self.mutable_array {
MutableBinaryArrayData::Binary(array) => array.append_null(),
MutableBinaryArrayData::BinaryView(array) => array.append_null(),
}
}
}
@@ -263,26 +387,38 @@ impl ScalarVectorBuilder for BinaryVectorBuilder {
fn with_capacity(capacity: usize) -> Self {
Self {
mutable_array: MutableBinaryArray::with_capacity(capacity, 0),
mutable_array: MutableBinaryArrayData::Binary(MutableBinaryArray::with_capacity(
capacity, 0,
)),
}
}
fn push(&mut self, value: Option<<Self::VectorType as ScalarVector>::RefItem<'_>>) {
match value {
Some(v) => self.mutable_array.append_value(v),
None => self.mutable_array.append_null(),
}
match &mut self.mutable_array {
MutableBinaryArrayData::Binary(array) => array.append_option(value),
MutableBinaryArrayData::BinaryView(array) => array.append_option(value),
};
}
fn finish(&mut self) -> Self::VectorType {
BinaryVector {
array: self.mutable_array.finish(),
match &mut self.mutable_array {
MutableBinaryArrayData::Binary(array) => BinaryVector {
array: BinaryArrayData::Binary(array.finish()),
},
MutableBinaryArrayData::BinaryView(array) => BinaryVector {
array: BinaryArrayData::BinaryView(array.finish()),
},
}
}
fn finish_cloned(&self) -> Self::VectorType {
BinaryVector {
array: self.mutable_array.finish_cloned(),
match &self.mutable_array {
MutableBinaryArrayData::Binary(array) => BinaryVector {
array: BinaryArrayData::Binary(array.finish_cloned()),
},
MutableBinaryArrayData::BinaryView(array) => BinaryVector {
array: BinaryArrayData::BinaryView(array.finish_cloned()),
},
}
}
}
@@ -299,7 +435,26 @@ impl Serializable for BinaryVector {
}
}
vectors::impl_try_from_arrow_array_for_vector!(BinaryArray, BinaryVector);
impl BinaryVector {
pub fn try_from_arrow_array(
array: impl AsRef<dyn Array>,
) -> crate::error::Result<BinaryVector> {
let array = array.as_ref();
if let Some(binary_array) = array.as_any().downcast_ref::<BinaryArray>() {
Ok(BinaryVector::from(binary_array.clone()))
} else if let Some(large_binary_array) = array.as_any().downcast_ref::<LargeBinaryArray>() {
Ok(BinaryVector::from(large_binary_array.clone()))
} else if let Some(binary_view_array) = array.as_any().downcast_ref::<BinaryViewArray>() {
Ok(BinaryVector::from(binary_view_array.clone()))
} else {
Err(crate::error::UnsupportedArrowTypeSnafu {
arrow_type: array.data_type().clone(),
}
.build())
}
}
}
#[cfg(test)]
mod tests {
@@ -310,7 +465,7 @@ mod tests {
use serde_json;
use super::*;
use crate::arrow_array::BinaryArray;
use crate::arrow_array::{BinaryArray, LargeBinaryArray};
use crate::data_type::DataType;
use crate::serialize::Serializable;
use crate::types::BinaryType;
@@ -340,6 +495,33 @@ mod tests {
assert_eq!(&ArrowDataType::Binary, arrow_arr.data_type());
}
#[test]
fn test_binary_view_vector_build_get() {
let mut builder = BinaryVectorBuilder::with_view_capacity(4);
builder.push(Some(b"hello"));
builder.push(None);
builder.push(Some(b"world"));
let vector = builder.finish();
assert_eq!(ConcreteDataType::binary_view_datatype(), vector.data_type());
assert_eq!(b"hello", vector.get_data(0).unwrap());
assert_eq!(None, vector.get_data(1));
assert_eq!(b"world", vector.get_data(2).unwrap());
assert_eq!(Value::Binary(b"hello".as_slice().into()), vector.get(0));
assert_eq!(Value::Null, vector.get(1));
assert_eq!(Value::Binary(b"world".as_slice().into()), vector.get(2));
let mut iter = vector.iter_data();
assert_eq!(b"hello", iter.next().unwrap().unwrap());
assert_eq!(None, iter.next().unwrap());
assert_eq!(b"world", iter.next().unwrap().unwrap());
assert_eq!(None, iter.next());
let arrow_arr = vector.to_arrow_array();
assert_eq!(&ArrowDataType::BinaryView, arrow_arr.data_type());
}
#[test]
fn test_serialize_binary_vector_to_json() {
let vector = BinaryVector::from(BinaryArray::from_iter_values([
@@ -374,7 +556,21 @@ mod tests {
let arrow_array = BinaryArray::from_iter_values([vec![1, 2, 3], vec![1, 2, 3]]);
let original = BinaryArray::from(arrow_array.to_data());
let vector = BinaryVector::from(arrow_array);
assert_eq!(original, vector.array);
let BinaryArrayData::Binary(array) = &vector.array else {
panic!("Expected BinaryArray");
};
assert_eq!(&original, array);
}
#[test]
fn test_from_large_binary_arrow_array() {
let arrow_array = LargeBinaryArray::from_iter_values([vec![1, 2, 3], vec![1, 2, 3]]);
let original = LargeBinaryArray::from(arrow_array.to_data());
let vector = BinaryVector::from(arrow_array);
let BinaryArrayData::LargeBinary(array) = &vector.array else {
panic!("Expected LargeBinaryArray");
};
assert_eq!(&original, array);
}
#[test]
@@ -426,7 +622,7 @@ mod tests {
fn test_binary_vector_builder() {
let input = BinaryVector::from_slice(&[b"world", b"one", b"two"]);
let mut builder = BinaryType.create_mutable_vector(3);
let mut builder = BinaryType::default().create_mutable_vector(3);
builder.push_value_ref(&ValueRef::Binary("hello".as_bytes()));
assert!(builder.try_push_value_ref(&ValueRef::Int32(123)).is_err());
builder.extend_slice_of(&input, 1, 2).unwrap();

View File

@@ -275,10 +275,10 @@ impl Helper {
Ok(match array.as_ref().data_type() {
ArrowDataType::Null => Arc::new(NullVector::try_from_arrow_array(array)?),
ArrowDataType::Boolean => Arc::new(BooleanVector::try_from_arrow_array(array)?),
ArrowDataType::Binary => Arc::new(BinaryVector::try_from_arrow_array(array)?),
ArrowDataType::LargeBinary
| ArrowDataType::FixedSizeBinary(_)
| ArrowDataType::BinaryView => {
ArrowDataType::Binary | ArrowDataType::BinaryView => {
Arc::new(BinaryVector::try_from_arrow_array(array)?)
}
ArrowDataType::LargeBinary | ArrowDataType::FixedSizeBinary(_) => {
let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::Binary)
.context(crate::error::ArrowComputeSnafu)?;
Arc::new(BinaryVector::try_from_arrow_array(array)?)
@@ -293,11 +293,7 @@ impl Helper {
ArrowDataType::UInt64 => Arc::new(UInt64Vector::try_from_arrow_array(array)?),
ArrowDataType::Float32 => Arc::new(Float32Vector::try_from_arrow_array(array)?),
ArrowDataType::Float64 => Arc::new(Float64Vector::try_from_arrow_array(array)?),
ArrowDataType::Utf8 => Arc::new(StringVector::try_from_arrow_array(array)?),
ArrowDataType::LargeUtf8 => Arc::new(StringVector::try_from_arrow_array(array)?),
ArrowDataType::Utf8View => {
let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::Utf8)
.context(crate::error::ArrowComputeSnafu)?;
ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View => {
Arc::new(StringVector::try_from_arrow_array(array)?)
}
ArrowDataType::Date32 => Arc::new(DateVector::try_from_arrow_array(array)?),

View File

@@ -19,7 +19,8 @@ use arrow::array::{Array, ArrayBuilder, ArrayIter, ArrayRef};
use snafu::ResultExt;
use crate::arrow_array::{
LargeStringArray, MutableLargeStringArray, MutableStringArray, StringArray,
LargeStringArray, MutableLargeStringArray, MutableStringArray, MutableStringViewArray,
StringArray, StringViewArray,
};
use crate::data_type::ConcreteDataType;
use crate::error::{self, Result};
@@ -33,6 +34,7 @@ use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef};
enum StringArrayData {
String(StringArray),
LargeString(LargeStringArray),
StringView(StringViewArray),
}
/// Vector of strings.
@@ -46,6 +48,7 @@ impl StringVector {
match &self.array {
StringArrayData::String(array) => array,
StringArrayData::LargeString(array) => array,
StringArrayData::StringView(array) => array,
}
}
@@ -63,6 +66,13 @@ impl StringVector {
}
}
/// Create a StringVector from a StringViewArray
pub fn from_string_view_array(array: StringViewArray) -> Self {
Self {
array: StringArrayData::StringView(array),
}
}
pub fn from_slice<T: AsRef<str>>(slice: &[T]) -> Self {
Self::from_string_array(StringArray::from_iter(
slice.iter().map(|s| Some(s.as_ref())),
@@ -82,6 +92,12 @@ impl From<LargeStringArray> for StringVector {
}
}
impl From<StringViewArray> for StringVector {
fn from(array: StringViewArray) -> Self {
Self::from_string_view_array(array)
}
}
impl From<Vec<Option<String>>> for StringVector {
fn from(data: Vec<Option<String>>) -> Self {
Self::from_string_array(StringArray::from_iter(data))
@@ -120,7 +136,11 @@ impl From<Vec<&str>> for StringVector {
impl Vector for StringVector {
fn data_type(&self) -> ConcreteDataType {
ConcreteDataType::string_datatype()
match &self.array {
StringArrayData::String(_) => ConcreteDataType::string_datatype(),
StringArrayData::LargeString(_) => ConcreteDataType::large_string_datatype(),
StringArrayData::StringView(_) => ConcreteDataType::utf8_view_datatype(),
}
}
fn vector_type_name(&self) -> String {
@@ -135,6 +155,7 @@ impl Vector for StringVector {
match &self.array {
StringArrayData::String(array) => array.len(),
StringArrayData::LargeString(array) => array.len(),
StringArrayData::StringView(array) => array.len(),
}
}
@@ -142,6 +163,7 @@ impl Vector for StringVector {
match &self.array {
StringArrayData::String(array) => Arc::new(array.clone()),
StringArrayData::LargeString(array) => Arc::new(array.clone()),
StringArrayData::StringView(array) => Arc::new(array.clone()),
}
}
@@ -149,6 +171,7 @@ impl Vector for StringVector {
match &self.array {
StringArrayData::String(array) => Box::new(array.clone()),
StringArrayData::LargeString(array) => Box::new(array.clone()),
StringArrayData::StringView(array) => Box::new(array.clone()),
}
}
@@ -156,6 +179,7 @@ impl Vector for StringVector {
match &self.array {
StringArrayData::String(array) => vectors::impl_validity_for_vector!(array),
StringArrayData::LargeString(array) => vectors::impl_validity_for_vector!(array),
StringArrayData::StringView(array) => vectors::impl_validity_for_vector!(array),
}
}
@@ -163,6 +187,7 @@ impl Vector for StringVector {
match &self.array {
StringArrayData::String(array) => array.get_buffer_memory_size(),
StringArrayData::LargeString(array) => array.get_buffer_memory_size(),
StringArrayData::StringView(array) => array.get_buffer_memory_size(),
}
}
@@ -170,6 +195,7 @@ impl Vector for StringVector {
match &self.array {
StringArrayData::String(array) => array.null_count(),
StringArrayData::LargeString(array) => array.null_count(),
StringArrayData::StringView(array) => array.null_count(),
}
}
@@ -177,6 +203,7 @@ impl Vector for StringVector {
match &self.array {
StringArrayData::String(array) => array.is_null(row),
StringArrayData::LargeString(array) => array.is_null(row),
StringArrayData::StringView(array) => array.is_null(row),
}
}
@@ -188,6 +215,9 @@ impl Vector for StringVector {
StringArrayData::LargeString(array) => {
Arc::new(Self::from_large_string_array(array.slice(offset, length)))
}
StringArrayData::StringView(array) => {
Arc::new(Self::from_string_view_array(array.slice(offset, length)))
}
}
}
@@ -195,6 +225,7 @@ impl Vector for StringVector {
match &self.array {
StringArrayData::String(array) => vectors::impl_get_for_vector!(array, index),
StringArrayData::LargeString(array) => vectors::impl_get_for_vector!(array, index),
StringArrayData::StringView(array) => vectors::impl_get_for_vector!(array, index),
}
}
@@ -202,6 +233,7 @@ impl Vector for StringVector {
match &self.array {
StringArrayData::String(array) => vectors::impl_get_ref_for_vector!(array, index),
StringArrayData::LargeString(array) => vectors::impl_get_ref_for_vector!(array, index),
StringArrayData::StringView(array) => vectors::impl_get_ref_for_vector!(array, index),
}
}
}
@@ -209,6 +241,7 @@ impl Vector for StringVector {
pub enum StringIter<'a> {
String(ArrayIter<&'a StringArray>),
LargeString(ArrayIter<&'a LargeStringArray>),
StringView(ArrayIter<&'a StringViewArray>),
}
impl<'a> Iterator for StringIter<'a> {
@@ -218,6 +251,7 @@ impl<'a> Iterator for StringIter<'a> {
match self {
StringIter::String(iter) => iter.next(),
StringIter::LargeString(iter) => iter.next(),
StringIter::StringView(iter) => iter.next(),
}
}
}
@@ -244,6 +278,13 @@ impl ScalarVector for StringVector {
None
}
}
StringArrayData::StringView(array) => {
if array.is_valid(idx) {
Some(array.value(idx))
} else {
None
}
}
}
}
@@ -251,6 +292,7 @@ impl ScalarVector for StringVector {
match &self.array {
StringArrayData::String(array) => StringIter::String(array.iter()),
StringArrayData::LargeString(array) => StringIter::LargeString(array.iter()),
StringArrayData::StringView(array) => StringIter::StringView(array.iter()),
}
}
}
@@ -259,6 +301,7 @@ impl ScalarVector for StringVector {
enum MutableStringArrayData {
String(MutableStringArray),
LargeString(MutableLargeStringArray),
StringView(MutableStringViewArray),
}
pub struct StringVectorBuilder {
@@ -286,6 +329,13 @@ impl StringVectorBuilder {
}
}
/// Create a builder for view strings
pub fn new_view() -> Self {
Self {
mutable_array: MutableStringArrayData::StringView(MutableStringViewArray::new()),
}
}
/// Create a builder for regular strings with capacity
pub fn with_string_capacity(capacity: usize) -> Self {
Self {
@@ -303,17 +353,31 @@ impl StringVectorBuilder {
),
}
}
/// Create a builder for view strings with capacity
pub fn with_view_capacity(capacity: usize) -> Self {
Self {
mutable_array: MutableStringArrayData::StringView(
MutableStringViewArray::with_capacity(capacity),
),
}
}
}
impl MutableVector for StringVectorBuilder {
fn data_type(&self) -> ConcreteDataType {
ConcreteDataType::string_datatype()
match &self.mutable_array {
MutableStringArrayData::String(_) => ConcreteDataType::string_datatype(),
MutableStringArrayData::LargeString(_) => ConcreteDataType::large_string_datatype(),
MutableStringArrayData::StringView(_) => ConcreteDataType::utf8_view_datatype(),
}
}
fn len(&self) -> usize {
match &self.mutable_array {
MutableStringArrayData::String(array) => array.len(),
MutableStringArrayData::LargeString(array) => array.len(),
MutableStringArrayData::StringView(array) => array.len(),
}
}
@@ -337,10 +401,12 @@ impl MutableVector for StringVectorBuilder {
Some(v) => match &mut self.mutable_array {
MutableStringArrayData::String(array) => array.append_value(v),
MutableStringArrayData::LargeString(array) => array.append_value(v),
MutableStringArrayData::StringView(array) => array.append_value(v),
},
None => match &mut self.mutable_array {
MutableStringArrayData::String(array) => array.append_null(),
MutableStringArrayData::LargeString(array) => array.append_null(),
MutableStringArrayData::StringView(array) => array.append_null(),
},
}
Ok(())
@@ -354,6 +420,7 @@ impl MutableVector for StringVectorBuilder {
match &mut self.mutable_array {
MutableStringArrayData::String(array) => array.append_null(),
MutableStringArrayData::LargeString(array) => array.append_null(),
MutableStringArrayData::StringView(array) => array.append_null(),
}
}
}
@@ -374,10 +441,12 @@ impl ScalarVectorBuilder for StringVectorBuilder {
Some(v) => match &mut self.mutable_array {
MutableStringArrayData::String(array) => array.append_value(v),
MutableStringArrayData::LargeString(array) => array.append_value(v),
MutableStringArrayData::StringView(array) => array.append_value(v),
},
None => match &mut self.mutable_array {
MutableStringArrayData::String(array) => array.append_null(),
MutableStringArrayData::LargeString(array) => array.append_null(),
MutableStringArrayData::StringView(array) => array.append_null(),
},
}
}
@@ -390,6 +459,9 @@ impl ScalarVectorBuilder for StringVectorBuilder {
MutableStringArrayData::LargeString(array) => {
StringVector::from_large_string_array(array.finish())
}
MutableStringArrayData::StringView(array) => {
StringVector::from_string_view_array(array.finish())
}
}
}
@@ -401,6 +473,9 @@ impl ScalarVectorBuilder for StringVectorBuilder {
MutableStringArrayData::LargeString(array) => {
StringVector::from_large_string_array(array.finish_cloned())
}
MutableStringArrayData::StringView(array) => {
StringVector::from_string_view_array(array.finish_cloned())
}
}
}
}
@@ -426,6 +501,10 @@ impl StringVector {
Ok(StringVector::from_large_string_array(
large_string_array.clone(),
))
} else if let Some(string_view_array) = array.as_any().downcast_ref::<StringViewArray>() {
Ok(StringVector::from_string_view_array(
string_view_array.clone(),
))
} else {
Err(crate::error::UnsupportedArrowTypeSnafu {
arrow_type: array.data_type().clone(),
@@ -470,6 +549,36 @@ mod tests {
assert_eq!(None, iter.next());
}
#[test]
fn test_string_view_vector_build_get() {
let mut builder = StringVectorBuilder::with_view_capacity(4);
builder.push(Some("hello"));
builder.push(None);
builder.push(Some("world"));
let vector = builder.finish();
assert_eq!(ConcreteDataType::utf8_view_datatype(), vector.data_type());
assert_eq!(Some("hello"), vector.get_data(0));
assert_eq!(None, vector.get_data(1));
assert_eq!(Some("world"), vector.get_data(2));
// Get out of bound
assert!(vector.try_get(3).is_err());
assert_eq!(Value::String("hello".into()), vector.get(0));
assert_eq!(Value::Null, vector.get(1));
assert_eq!(Value::String("world".into()), vector.get(2));
let mut iter = vector.iter_data();
assert_eq!("hello", iter.next().unwrap().unwrap());
assert_eq!(None, iter.next().unwrap());
assert_eq!("world", iter.next().unwrap().unwrap());
assert_eq!(None, iter.next());
let arrow_arr = vector.to_arrow_array();
assert_eq!(&DataType::Utf8View, arrow_arr.data_type());
}
#[test]
fn test_string_vector_builder() {
let mut builder = StringVectorBuilder::with_capacity(3);

View File

@@ -895,36 +895,12 @@ impl RowWriter {
let v = array.value(i);
self.insert(column, v);
}
DataType::Utf8 => {
let array = array.as_string::<i32>();
let v = array.value(i);
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
let v = datatypes::arrow_array::string_array_value(array, i);
self.insert(column, v);
}
DataType::LargeUtf8 => {
let array = array.as_string::<i64>();
let v = array.value(i);
self.insert(column, v);
}
DataType::Utf8View => {
let array = array.as_string_view();
let v = array.value(i);
self.insert(column, v);
}
DataType::Binary => {
let array = array.as_binary::<i32>();
let v = array.value(i);
let column_schema = &schema.column_schemas()[j];
self.insert_bytes(column_schema, v)?;
}
DataType::LargeBinary => {
let array = array.as_binary::<i64>();
let v = array.value(i);
let column_schema = &schema.column_schemas()[j];
self.insert_bytes(column_schema, v)?;
}
DataType::BinaryView => {
let array = array.as_binary_view();
let v = array.value(i);
DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
let v = datatypes::arrow_array::binary_array_value(array, i);
let column_schema = &schema.column_schemas()[j];
self.insert_bytes(column_schema, v)?;
}

View File

@@ -175,34 +175,12 @@ impl HttpOutputWriter {
let v = array.value(i);
self.write_value(v)?;
}
DataType::Utf8 => {
let array = array.as_string::<i32>();
let v = array.value(i);
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
let v = datatypes::arrow_array::string_array_value(array, i);
self.write_value(v)?;
}
DataType::LargeUtf8 => {
let array = array.as_string::<i64>();
let v = array.value(i);
self.write_value(v)?;
}
DataType::Utf8View => {
let array = array.as_string_view();
let v = array.value(i);
self.write_value(v)?;
}
DataType::Binary => {
let array = array.as_binary::<i32>();
let v = array.value(i);
self.write_bytes(v, &schema.data_type)?;
}
DataType::LargeBinary => {
let array = array.as_binary::<i64>();
let v = array.value(i);
self.write_bytes(v, &schema.data_type)?;
}
DataType::BinaryView => {
let array = array.as_binary_view();
let v = array.value(i);
DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
let v = datatypes::arrow_array::binary_array_value(array, i);
self.write_bytes(v, &schema.data_type)?;
}
DataType::Date32 => {

View File

@@ -280,41 +280,12 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
let array = column.as_primitive::<Float64Type>();
row_writer.write_col(array.value(i))?;
}
DataType::Utf8 => {
let array = column.as_string::<i32>();
row_writer.write_col(array.value(i))?;
DataType::Utf8 | DataType::Utf8View | DataType::LargeUtf8 => {
let v = datatypes::arrow_array::string_array_value(column, i);
row_writer.write_col(v)?;
}
DataType::Utf8View => {
let array = column.as_string_view();
row_writer.write_col(array.value(i))?;
}
DataType::LargeUtf8 => {
let array = column.as_string::<i64>();
row_writer.write_col(array.value(i))?;
}
DataType::Binary => {
let array = column.as_binary::<i32>();
let v = array.value(i);
if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
row_writer.write_col(s)?;
} else {
row_writer.write_col(v)?;
}
}
DataType::BinaryView => {
let array = column.as_binary_view();
let v = array.value(i);
if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
row_writer.write_col(s)?;
} else {
row_writer.write_col(v)?;
}
}
DataType::LargeBinary => {
let array = column.as_binary::<i64>();
let v = array.value(i);
DataType::Binary | DataType::BinaryView | DataType::LargeBinary => {
let v = datatypes::arrow_array::binary_array_value(column, i);
if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
row_writer.write_col(s)?;

View File

@@ -500,44 +500,12 @@ impl RecordBatchRowIterator {
let array = column.as_primitive::<Float64Type>();
encoder.encode_field(&array.value(i))?;
}
DataType::Utf8 => {
let array = column.as_string::<i32>();
let value = array.value(i);
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
let value = datatypes::arrow_array::string_array_value(column, i);
encoder.encode_field(&value)?;
}
DataType::Utf8View => {
let array = column.as_string_view();
let value = array.value(i);
encoder.encode_field(&value)?;
}
DataType::LargeUtf8 => {
let array = column.as_string::<i64>();
let value = array.value(i);
encoder.encode_field(&value)?;
}
DataType::Binary => {
let array = column.as_binary::<i32>();
let v = array.value(i);
encode_bytes(
&self.schema.column_schemas()[j],
v,
encoder,
&self.query_ctx,
)?;
}
DataType::BinaryView => {
let array = column.as_binary_view();
let v = array.value(i);
encode_bytes(
&self.schema.column_schemas()[j],
v,
encoder,
&self.query_ctx,
)?;
}
DataType::LargeBinary => {
let array = column.as_binary::<i64>();
let v = array.value(i);
DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
let v = datatypes::arrow_array::binary_array_value(column, i);
encode_bytes(
&self.schema.column_schemas()[j],
v,