Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-04-02 11:22:47 +08:00
parent 5bd3985b8b
commit 820855527d

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use arrow::compute;
use arrow::util::display::{ArrayFormatter, FormatOptions};
use arrow_array::cast::AsArray;
use arrow_array::{Array, ArrayRef, StructArray, new_null_array};
use arrow_array::{Array, ArrayRef, ListArray, StructArray, new_null_array};
use arrow_schema::DataType;
use snafu::ResultExt;
@@ -30,6 +30,28 @@ pub struct JsonArray<'a> {
}
impl JsonArray<'_> {
fn align_array(array: &ArrayRef, expect: &DataType) -> Result<ArrayRef> {
if array.data_type() == expect {
return Ok(array.clone());
}
match expect {
DataType::Struct(_) => JsonArray::from(array).try_align(expect),
DataType::List(expect_field) => {
let list_array = array.as_list();
let aligned_values =
Self::align_array(&list_array.values(), expect_field.data_type())?;
Ok(Arc::new(ListArray::new(
expect_field.clone(),
list_array.offsets().clone(),
aligned_values,
list_array.nulls().cloned(),
)))
}
_ => JsonArray::from(array).try_cast(expect),
}
}
/// Align a JSON array to the `expect` data type. The `expect` data type is often the
/// "largest" JSON type after some insertions in the table schema, while the JSON array previously
/// written in the SST could be lagged behind it. So it's important to "align" the JSON array by
@@ -41,9 +63,10 @@ impl JsonArray<'_> {
/// of Struct type. Both of which shouldn't happen unless we switch our implementation of how
/// JSON array is physically stored.
pub fn try_align(&self, expect: &DataType) -> Result<ArrayRef> {
let json_type = self.inner.data_type();
if json_type == expect {
return Ok(self.inner.clone());
if self.inner.data_type() != expect {
if matches!(expect, DataType::List(_)) {
return Self::align_array(self.inner, expect);
}
}
let struct_array = self.inner.as_struct();
@@ -70,14 +93,7 @@ impl JsonArray<'_> {
if expect_field.data_type() == array_field.data_type() {
aligned.push(array_columns[j].clone());
} else {
let array = JsonArray::from(&array_columns[j]);
if matches!(expect_field.data_type(), DataType::Struct(_)) {
// A `StructArray` in a JSON array must be another JSON array.
// (Like a nested JSON object in a JSON value.)
aligned.push(array.try_align(expect_field.data_type())?);
} else {
aligned.push(array.try_cast(expect_field.data_type())?);
}
aligned.push(Self::align_array(&array_columns[j], expect_field.data_type())?);
}
i += 1;
j += 1;
@@ -137,6 +153,7 @@ impl<'a> From<&'a ArrayRef> for JsonArray<'a> {
#[cfg(test)]
mod test {
use arrow::buffer::OffsetBuffer;
use arrow_array::types::Int64Type;
use arrow_array::{BooleanArray, Float64Array, Int64Array, ListArray};
use arrow_schema::{Field, Fields};
@@ -299,6 +316,79 @@ mod test {
),
)
.test()?;
// Test nested list<struct> json array alignment.
TestCase::new(
StructArray::from(vec![(
Arc::new(Field::new_list(
"items",
Field::new(
"item",
DataType::Struct(
vec![Arc::new(Field::new("a", DataType::Int64, true))].into(),
),
true,
),
true,
)),
Arc::new(ListArray::new(
Arc::new(Field::new(
"item",
DataType::Struct(
vec![Arc::new(Field::new("a", DataType::Int64, true))].into(),
),
true,
)),
OffsetBuffer::new(vec![0, 1, 3].into()),
Arc::new(StructArray::from(vec![(
Arc::new(Field::new("a", DataType::Int64, true)),
Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
)])),
None,
)) as ArrayRef,
)]),
Fields::from(vec![Field::new_list(
"items",
Field::new(
"item",
DataType::Struct(
vec![
Arc::new(Field::new("a", DataType::Int64, true)),
Arc::new(Field::new("b", DataType::Utf8, true)),
]
.into(),
),
true,
),
true,
)]),
Ok(vec![Arc::new(ListArray::new(
Arc::new(Field::new(
"item",
DataType::Struct(
vec![
Arc::new(Field::new("a", DataType::Int64, true)),
Arc::new(Field::new("b", DataType::Utf8, true)),
]
.into(),
),
true,
)),
OffsetBuffer::new(vec![0, 1, 3].into()),
Arc::new(StructArray::from(vec![
(
Arc::new(Field::new("a", DataType::Int64, true)),
Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
),
(
Arc::new(Field::new("b", DataType::Utf8, true)),
Arc::new(StringArray::new_null(3)) as ArrayRef,
),
])),
None,
)) as ArrayRef]),
)
.test()?;
Ok(())
}
}