Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-04-03 12:27:48 +08:00
parent a790a6150b
commit a151be5209

View File

@@ -18,7 +18,10 @@ use std::sync::Arc;
use arrow::compute;
use arrow::util::display::{ArrayFormatter, FormatOptions};
use arrow_array::cast::AsArray;
use arrow_array::{Array, ArrayRef, StructArray, new_null_array};
use arrow_array::{
Array, ArrayRef, GenericListArray, GenericListViewArray, OffsetSizeTrait, StructArray,
new_null_array,
};
use arrow_schema::DataType;
use snafu::ResultExt;
@@ -71,13 +74,7 @@ impl JsonArray<'_> {
aligned.push(array_columns[j].clone());
} else {
let array = JsonArray::from(&array_columns[j]);
if matches!(expect_field.data_type(), DataType::Struct(_)) {
// A `StructArray` in a JSON array must be another JSON array.
// (Like a nested JSON object in a JSON value.)
aligned.push(array.try_align(expect_field.data_type())?);
} else {
aligned.push(array.try_cast(expect_field.data_type())?);
}
aligned.push(array.try_align_field(expect_field.data_type())?);
}
i += 1;
j += 1;
@@ -111,6 +108,67 @@ impl JsonArray<'_> {
Ok(Arc::new(json_array))
}
fn try_align_field(&self, expect: &DataType) -> Result<ArrayRef> {
match expect {
// A `StructArray` in a JSON array must be another JSON array.
// (Like a nested JSON object in a JSON value.)
DataType::Struct(_) => self.try_align(expect),
DataType::List(_) => self.try_align_list::<i32>(expect),
DataType::LargeList(_) => self.try_align_list::<i64>(expect),
DataType::ListView(_) => self.try_align_list_view::<i32>(expect),
DataType::LargeListView(_) => self.try_align_list_view::<i64>(expect),
_ => self.try_cast(expect),
}
}
fn try_align_list<O: OffsetSizeTrait>(&self, expect: &DataType) -> Result<ArrayRef> {
let Some(list) = self.inner.as_any().downcast_ref::<GenericListArray<O>>() else {
return AlignJsonArraySnafu {
reason: format!(
"cannot align {:?} to complex type {expect:?}",
self.inner.data_type()
),
}
.fail();
};
let (_, offsets, values, nulls) = list.clone().into_parts();
let field = match expect {
DataType::List(field) | DataType::LargeList(field) => field.clone(),
_ => unreachable!(),
};
let values = JsonArray::from(&values).try_align_field(field.data_type())?;
Ok(Arc::new(GenericListArray::<O>::new(
field, offsets, values, nulls,
)))
}
fn try_align_list_view<O: OffsetSizeTrait>(&self, expect: &DataType) -> Result<ArrayRef> {
let Some(list) = self
.inner
.as_any()
.downcast_ref::<GenericListViewArray<O>>()
else {
return AlignJsonArraySnafu {
reason: format!(
"cannot align {:?} to complex type {expect:?}",
self.inner.data_type()
),
}
.fail();
};
let (_, offsets, sizes, values, nulls) = list.clone().into_parts();
let field = match expect {
DataType::ListView(field) | DataType::LargeListView(field) => field.clone(),
_ => unreachable!(),
};
let values = JsonArray::from(&values).try_align_field(field.data_type())?;
Ok(Arc::new(GenericListViewArray::<O>::new(
field, offsets, sizes, values, nulls,
)))
}
fn try_cast(&self, to_type: &DataType) -> Result<ArrayRef> {
if compute::can_cast_types(self.inner.data_type(), to_type) {
return compute::cast(&self.inner, to_type).context(ArrowComputeSnafu);
@@ -137,8 +195,12 @@ impl<'a> From<&'a ArrayRef> for JsonArray<'a> {
#[cfg(test)]
mod test {
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow::buffer::NullBuffer;
use arrow_array::types::Int64Type;
use arrow_array::{BooleanArray, Float64Array, Int64Array, ListArray};
use arrow_array::{
BooleanArray, Float64Array, Int32Array, Int64Array, ListArray, OffsetSizeTrait,
};
use arrow_schema::{Field, Fields};
use super::*;
@@ -285,6 +347,65 @@ mod test {
)
.test()?;
// Test complex list item alignment.
TestCase::new(
StructArray::from(vec![(
Arc::new(Field::new_list(
"images",
Field::new_list_field(
DataType::Struct(Fields::from(vec![Arc::new(Field::new(
"$type",
DataType::Utf8,
true,
))])),
true,
),
true,
)),
build_list_array_with_struct_items(
vec![
Some(vec![Some("image"), Some("thumb")]),
None,
Some(vec![Some("banner")]),
],
DataType::List(Arc::new(Field::new_list_field(
DataType::Struct(Fields::from(vec![Arc::new(Field::new(
"$type",
DataType::Utf8,
true,
))])),
true,
))),
),
)]),
Fields::from(vec![Field::new_list(
"images",
Field::new_list_field(
DataType::Struct(Fields::from(vec![
Arc::new(Field::new("$type", DataType::Utf8, true)),
Arc::new(Field::new("alt", DataType::Utf8, true)),
])),
true,
),
true,
)]),
Ok(vec![build_list_array_with_struct_items(
vec![
Some(vec![Some("image"), Some("thumb")]),
None,
Some(vec![Some("banner")]),
],
DataType::List(Arc::new(Field::new_list_field(
DataType::Struct(Fields::from(vec![
Arc::new(Field::new("$type", DataType::Utf8, true)),
Arc::new(Field::new("alt", DataType::Utf8, true)),
])),
true,
))),
)]),
)
.test()?;
// Test align failed.
TestCase::new(
StructArray::try_from(vec![
@@ -301,4 +422,66 @@ mod test {
.test()?;
Ok(())
}
fn build_list_array_with_struct_items<O: OffsetSizeTrait>(
rows: Vec<Option<Vec<Option<&str>>>>,
data_type: DataType,
) -> ArrayRef {
let mut offsets = vec![O::usize_as(0)];
let mut types = Vec::new();
let mut alt = Vec::new();
let mut validity = Vec::new();
let mut total = 0usize;
let has_alt = matches!(
&data_type,
DataType::List(field) | DataType::LargeList(field)
if matches!(field.data_type(), DataType::Struct(fields) if fields.len() > 1)
);
for row in rows {
match row {
Some(items) => {
validity.push(true);
total += items.len();
offsets.push(O::usize_as(total));
for item in items {
types.push(item);
if has_alt {
alt.push(None);
}
}
}
None => {
validity.push(false);
offsets.push(O::usize_as(total));
}
}
}
let mut columns = vec![Arc::new(Field::new("$type", DataType::Utf8, true))];
let mut values = vec![Arc::new(StringArray::from(types)) as ArrayRef];
if has_alt {
columns.push(Arc::new(Field::new("alt", DataType::Utf8, true)));
values.push(Arc::new(StringArray::from(alt)));
}
let values = Arc::new(StructArray::new(Fields::from(columns), values, None)) as ArrayRef;
let nulls = Some(NullBuffer::from(validity));
match data_type {
DataType::List(field) => Arc::new(ListArray::new(
field,
OffsetBuffer::new(ScalarBuffer::from(offsets)),
values,
nulls,
)),
DataType::LargeList(field) => Arc::new(arrow_array::LargeListArray::new(
field,
OffsetBuffer::new(ScalarBuffer::from(offsets)),
values,
nulls,
)),
_ => unreachable!(),
}
}
}