From 2fc6c2da7b62a8f8799134a3a4c1a59008c5bb44 Mon Sep 17 00:00:00 2001 From: luofucong Date: Tue, 21 Apr 2026 18:14:36 +0800 Subject: [PATCH] feat: json2 flush Signed-off-by: luofucong --- src/common/recordbatch/src/error.rs | 10 +- src/common/recordbatch/src/recordbatch.rs | 252 +------------ src/datatypes/src/error.rs | 10 +- src/datatypes/src/schema.rs | 1 + src/datatypes/src/schema/ext.rs | 27 ++ src/datatypes/src/types/json_type.rs | 6 + src/datatypes/src/vectors.rs | 2 +- src/datatypes/src/vectors/json.rs | 1 + src/datatypes/src/vectors/json/array.rs | 342 ++++++++++++++++++ src/mito2/src/memtable/bulk/part.rs | 88 ++++- src/mito2/src/read/compat.rs | 15 +- src/mito2/src/sst/parquet/flat_format.rs | 3 +- src/mito2/src/sst/parquet/format.rs | 1 + src/mito2/src/sst/parquet/writer.rs | 9 +- src/sql/src/statements.rs | 2 +- tests-integration/src/tests/instance_test.rs | 29 +- .../standalone/common/types/json/json2.result | 24 ++ .../standalone/common/types/json/json2.sql | 6 + 18 files changed, 522 insertions(+), 306 deletions(-) create mode 100644 src/datatypes/src/schema/ext.rs create mode 100644 src/datatypes/src/vectors/json/array.rs diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs index 6d794463a0..00d4291ead 100644 --- a/src/common/recordbatch/src/error.rs +++ b/src/common/recordbatch/src/error.rs @@ -188,13 +188,6 @@ pub enum Error { #[snafu(implicit)] location: Location, }, - - #[snafu(display("Failed to align JSON array, reason: {reason}"))] - AlignJsonArray { - reason: String, - #[snafu(implicit)] - location: Location, - }, } impl ErrorExt for Error { @@ -210,8 +203,7 @@ impl ErrorExt for Error { | Error::ToArrowScalar { .. } | Error::ProjectArrowRecordBatch { .. } | Error::PhysicalExpr { .. } - | Error::RecordBatchSliceIndexOverflow { .. } - | Error::AlignJsonArray { .. } => StatusCode::Internal, + | Error::RecordBatchSliceIndexOverflow { .. } => StatusCode::Internal, Error::PollStream { .. } => StatusCode::EngineExecuteQuery, diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index 2e92b9e87a..4289714afd 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -20,10 +20,11 @@ use datafusion::arrow::util::pretty::pretty_format_batches; use datafusion_common::arrow::array::ArrayRef; use datafusion_common::arrow::compute; use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef}; -use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions, StructArray, new_null_array}; +use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions}; use datatypes::extension::json::is_json_extension_type; use datatypes::prelude::DataType; use datatypes::schema::SchemaRef; +use datatypes::vectors::json::array::JsonArray; use datatypes::vectors::{Helper, VectorRef}; use serde::ser::{Error, SerializeStruct}; use serde::{Serialize, Serializer}; @@ -31,8 +32,8 @@ use snafu::{OptionExt, ResultExt, ensure}; use crate::DfRecordBatch; use crate::error::{ - self, AlignJsonArraySnafu, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu, - NewDfRecordBatchSnafu, ProjectArrowRecordBatchSnafu, Result, + self, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu, ProjectArrowRecordBatchSnafu, + Result, }; /// A two-dimensional batch of column-oriented data with a defined schema. @@ -354,80 +355,6 @@ pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Resul Ok(RecordBatch::from_df_record_batch(schema, record_batch)) } -/// Align a json array `json_array` to the json type `schema_type`. The `schema_type` is often the -/// "largest" json type after some insertions in the table schema, while the json array previously -/// written in the SST could be lagged behind it. So it's important to "amend" the json array's -/// missing fields with null arrays, to align the array's data type with the provided one. -/// -/// # Panics -/// -/// - The json array is not an Arrow [StructArray], or the provided data type `schema_type` is not -/// of Struct type. Both of which shouldn't happen unless we switch our implementation of how -/// json array is physically stored. -pub fn align_json_array(json_array: &ArrayRef, schema_type: &ArrowDataType) -> Result { - let json_type = json_array.data_type(); - if json_type == schema_type { - return Ok(json_array.clone()); - } - - let json_array = json_array.as_struct(); - let array_fields = json_array.fields(); - let array_columns = json_array.columns(); - let ArrowDataType::Struct(schema_fields) = schema_type else { - unreachable!() - }; - let mut aligned = Vec::with_capacity(schema_fields.len()); - - // Compare the fields in the json array and the to-be-aligned schema, amending with null arrays - // on the way. It's very important to note that fields in the json array and in the json type - // are both SORTED. - - let mut i = 0; // point to the schema fields - let mut j = 0; // point to the array fields - while i < schema_fields.len() && j < array_fields.len() { - let schema_field = &schema_fields[i]; - let array_field = &array_fields[j]; - if schema_field.name() == array_field.name() { - if matches!(schema_field.data_type(), ArrowDataType::Struct(_)) { - // A `StructArray`s in a json array must be another json array. (Like a nested json - // object in a json value.) - aligned.push(align_json_array( - &array_columns[j], - schema_field.data_type(), - )?); - } else { - aligned.push(array_columns[j].clone()); - } - j += 1; - } else { - aligned.push(new_null_array(schema_field.data_type(), json_array.len())); - } - i += 1; - } - if i < schema_fields.len() { - for field in &schema_fields[i..] { - aligned.push(new_null_array(field.data_type(), json_array.len())); - } - } - ensure!( - j == array_fields.len(), - AlignJsonArraySnafu { - reason: format!( - "this json array has more fields {:?}", - array_fields[j..] - .iter() - .map(|x| x.name()) - .collect::>(), - ) - } - ); - - let json_array = - StructArray::try_new(schema_fields.clone(), aligned, json_array.nulls().cloned()) - .context(NewDfRecordBatchSnafu)?; - Ok(Arc::new(json_array)) -} - fn maybe_align_json_array_with_schema( schema: &ArrowSchemaRef, arrays: Vec, @@ -443,7 +370,9 @@ fn maybe_align_json_array_with_schema( continue; } - let json_array = align_json_array(&array, field.data_type())?; + let json_array = JsonArray::from(&array) + .try_align(field.data_type()) + .context(DataTypesSnafu)?; aligned.push(json_array); } Ok(aligned) @@ -453,12 +382,8 @@ fn maybe_align_json_array_with_schema( mod tests { use std::sync::Arc; - use datatypes::arrow::array::{ - AsArray, BooleanArray, Float64Array, Int64Array, ListArray, UInt32Array, - }; - use datatypes::arrow::datatypes::{ - DataType, Field, Fields, Int64Type, Schema as ArrowSchema, UInt32Type, - }; + use datatypes::arrow::array::{AsArray, UInt32Array}; + use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, UInt32Type}; use datatypes::arrow_array::StringArray; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema}; @@ -466,165 +391,6 @@ mod tests { use super::*; - #[test] - fn test_align_json_array() -> Result<()> { - struct TestCase { - json_array: ArrayRef, - schema_type: DataType, - expected: std::result::Result, - } - - impl TestCase { - fn new( - json_array: StructArray, - schema_type: Fields, - expected: std::result::Result, String>, - ) -> Self { - Self { - json_array: Arc::new(json_array), - schema_type: DataType::Struct(schema_type.clone()), - expected: expected - .map(|x| Arc::new(StructArray::new(schema_type, x, None)) as ArrayRef), - } - } - - fn test(self) -> Result<()> { - let result = align_json_array(&self.json_array, &self.schema_type); - match (result, self.expected) { - (Ok(json_array), Ok(expected)) => assert_eq!(&json_array, &expected), - (Ok(json_array), Err(e)) => { - panic!("expecting error {e} but actually get: {json_array:?}") - } - (Err(e), Err(expected)) => assert_eq!(e.to_string(), expected), - (Err(e), Ok(_)) => return Err(e), - } - Ok(()) - } - } - - // Test empty json array can be aligned with a complex json type. - TestCase::new( - StructArray::new_empty_fields(2, None), - Fields::from(vec![ - Field::new("int", DataType::Int64, true), - Field::new_struct( - "nested", - vec![Field::new("bool", DataType::Boolean, true)], - true, - ), - Field::new("string", DataType::Utf8, true), - ]), - Ok(vec![ - Arc::new(Int64Array::new_null(2)) as ArrayRef, - Arc::new(StructArray::new_null( - Fields::from(vec![Arc::new(Field::new("bool", DataType::Boolean, true))]), - 2, - )), - Arc::new(StringArray::new_null(2)), - ]), - ) - .test()?; - - // Test simple json array alignment. - TestCase::new( - StructArray::from(vec![( - Arc::new(Field::new("float", DataType::Float64, true)), - Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef, - )]), - Fields::from(vec![ - Field::new("float", DataType::Float64, true), - Field::new("string", DataType::Utf8, true), - ]), - Ok(vec![ - Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef, - Arc::new(StringArray::new_null(3)), - ]), - ) - .test()?; - - // Test complex json array alignment. - TestCase::new( - StructArray::from(vec![ - ( - Arc::new(Field::new_list( - "list", - Field::new_list_field(DataType::Int64, true), - true, - )), - Arc::new(ListArray::from_iter_primitive::(vec![ - Some(vec![Some(1)]), - None, - Some(vec![Some(2), Some(3)]), - ])) as ArrayRef, - ), - ( - Arc::new(Field::new_struct( - "nested", - vec![Field::new("int", DataType::Int64, true)], - true, - )), - Arc::new(StructArray::from(vec![( - Arc::new(Field::new("int", DataType::Int64, true)), - Arc::new(Int64Array::from(vec![-1, -2, -3])) as ArrayRef, - )])), - ), - ( - Arc::new(Field::new("string", DataType::Utf8, true)), - Arc::new(StringArray::from(vec!["a", "b", "c"])), - ), - ]), - Fields::from(vec![ - Field::new("bool", DataType::Boolean, true), - Field::new_list("list", Field::new_list_field(DataType::Int64, true), true), - Field::new_struct( - "nested", - vec![ - Field::new("float", DataType::Float64, true), - Field::new("int", DataType::Int64, true), - ], - true, - ), - Field::new("string", DataType::Utf8, true), - ]), - Ok(vec![ - Arc::new(BooleanArray::new_null(3)) as ArrayRef, - Arc::new(ListArray::from_iter_primitive::(vec![ - Some(vec![Some(1)]), - None, - Some(vec![Some(2), Some(3)]), - ])), - Arc::new(StructArray::from(vec![ - ( - Arc::new(Field::new("float", DataType::Float64, true)), - Arc::new(Float64Array::new_null(3)) as ArrayRef, - ), - ( - Arc::new(Field::new("int", DataType::Int64, true)), - Arc::new(Int64Array::from(vec![-1, -2, -3])), - ), - ])), - Arc::new(StringArray::from(vec!["a", "b", "c"])), - ]), - ) - .test()?; - - // Test align failed. - TestCase::new( - StructArray::try_from(vec![ - ("i", Arc::new(Int64Array::from(vec![1])) as ArrayRef), - ("j", Arc::new(Int64Array::from(vec![2])) as ArrayRef), - ]) - .unwrap(), - Fields::from(vec![Field::new("i", DataType::Int64, true)]), - Err( - r#"Failed to align JSON array, reason: this json array has more fields ["j"]"# - .to_string(), - ), - ) - .test()?; - Ok(()) - } - #[test] fn test_record_batch() { let arrow_schema = Arc::new(ArrowSchema::new(vec![ diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs index 7e3f2b95d3..4ca765222a 100644 --- a/src/datatypes/src/error.rs +++ b/src/datatypes/src/error.rs @@ -281,6 +281,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to align JSON array, reason: {reason}"))] + AlignJsonArray { + reason: String, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -324,7 +331,8 @@ impl ErrorExt for Error { | ParseExtendedType { .. } | InconsistentStructFieldsAndItems { .. } | ArrowMetadata { .. } - | AlignJsonValue { .. } => StatusCode::Internal, + | AlignJsonValue { .. } + | AlignJsonArray { .. } => StatusCode::Internal, } } diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 50f2dba270..d53b42e3e7 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -14,6 +14,7 @@ mod column_schema; pub mod constraint; +pub mod ext; use std::collections::HashMap; use std::sync::Arc; diff --git a/src/datatypes/src/schema/ext.rs b/src/datatypes/src/schema/ext.rs new file mode 100644 index 0000000000..16ffcc6901 --- /dev/null +++ b/src/datatypes/src/schema/ext.rs @@ -0,0 +1,27 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::extension::json; + +/// Add some useful utilities upon Arrow's [Schema](arrow_schema::Schema). +pub trait ArrowSchemaExt { + /// Check if this [Schema](arrow_schema::Schema) has JSON extension field. + fn has_json_extension_field(&self) -> bool; +} + +impl ArrowSchemaExt for arrow_schema::Schema { + fn has_json_extension_field(&self) -> bool { + self.fields().iter().any(json::is_json_extension_type) + } +} diff --git a/src/datatypes/src/types/json_type.rs b/src/datatypes/src/types/json_type.rs index d296dc4fcb..c2d9961db8 100644 --- a/src/datatypes/src/types/json_type.rs +++ b/src/datatypes/src/types/json_type.rs @@ -317,6 +317,12 @@ fn merge(this: &JsonNativeType, that: &JsonNativeType) -> JsonNativeType { } } +impl From<&ArrowDataType> for JsonType { + fn from(t: &ArrowDataType) -> Self { + JsonType::new_json2(JsonNativeType::from(&ConcreteDataType::from_arrow_type(t))) + } +} + impl DataType for JsonType { fn name(&self) -> String { match &self.format { diff --git a/src/datatypes/src/vectors.rs b/src/datatypes/src/vectors.rs index 7c7d2a4ad6..5f116e0952 100644 --- a/src/datatypes/src/vectors.rs +++ b/src/datatypes/src/vectors.rs @@ -35,7 +35,7 @@ mod duration; mod eq; mod helper; mod interval; -pub(crate) mod json; +pub mod json; mod list; mod null; pub(crate) mod operations; diff --git a/src/datatypes/src/vectors/json.rs b/src/datatypes/src/vectors/json.rs index 83aa1dd2aa..3dcb8d7bc0 100644 --- a/src/datatypes/src/vectors/json.rs +++ b/src/datatypes/src/vectors/json.rs @@ -12,4 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod array; pub(crate) mod builder; diff --git a/src/datatypes/src/vectors/json/array.rs b/src/datatypes/src/vectors/json/array.rs new file mode 100644 index 0000000000..9def5735bb --- /dev/null +++ b/src/datatypes/src/vectors/json/array.rs @@ -0,0 +1,342 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::Ordering; +use std::sync::Arc; + +use arrow::compute; +use arrow::util::display::{ArrayFormatter, FormatOptions}; +use arrow_array::cast::AsArray; +use arrow_array::{Array, ArrayRef, GenericListArray, StructArray, new_null_array}; +use arrow_schema::DataType; +use snafu::{OptionExt, ResultExt}; + +use crate::arrow_array::StringArray; +use crate::error::{AlignJsonArraySnafu, ArrowComputeSnafu, Result}; + +pub struct JsonArray<'a> { + inner: &'a ArrayRef, +} + +impl JsonArray<'_> { + /// Align a JSON array to the `expect` data type. The `expect` data type is often the "largest" + /// JSON type after some insertions in the table schema, while the JSON array previously + /// written in the SST could be lagged behind it. So it's important to "align" the JSON array by + /// setting the missing fields with null arrays, or casting the data. + /// + /// It's an error if the to-be-aligned array contains extra fields that are not in the `expect` + /// data type. Forcing to align that kind of array will result in data loss, something we + /// generally not wanted. + pub fn try_align(&self, expect: &DataType) -> Result { + if self.inner.data_type() == expect { + return Ok(self.inner.clone()); + } + + let struct_array = self.inner.as_struct_opt().context(AlignJsonArraySnafu { + reason: "expect struct array", + })?; + let array_fields = struct_array.fields(); + let array_columns = struct_array.columns(); + let DataType::Struct(expect_fields) = expect else { + return AlignJsonArraySnafu { + reason: "expect struct datatype", + } + .fail(); + }; + let mut aligned = Vec::with_capacity(expect_fields.len()); + + // Compare the fields in the JSON array and the to-be-aligned schema, amending with null + // arrays on the way. It's very important to note that fields in the JSON array and those + // in the JSON type are both **SORTED**, which can be guaranteed because the fields in the + // JSON type implementation are sorted. + debug_assert!(expect_fields.iter().map(|f| f.name()).is_sorted()); + debug_assert!(array_fields.iter().map(|f| f.name()).is_sorted()); + + let mut i = 0; // point to the expect fields + let mut j = 0; // point to the array fields + while i < expect_fields.len() && j < array_fields.len() { + let expect_field = &expect_fields[i]; + let array_field = &array_fields[j]; + match expect_field.name().cmp(array_field.name()) { + Ordering::Equal => { + if expect_field.data_type() == array_field.data_type() { + aligned.push(array_columns[j].clone()); + } else { + let expect_type = expect_field.data_type(); + let array_type = array_field.data_type(); + let array = match (expect_type, array_type) { + (DataType::Struct(_), DataType::Struct(_)) => { + JsonArray::from(&array_columns[j]).try_align(expect_type)? + } + (DataType::List(expect_item), DataType::List(array_item)) => { + let list_array = array_columns[j].as_list::(); + let item_aligned = + match (expect_item.data_type(), array_item.data_type()) { + (DataType::Struct(_), DataType::Struct(_)) => { + JsonArray::from(list_array.values()) + .try_align(expect_item.data_type())? + } + _ => JsonArray::from(list_array.values()) + .try_cast(expect_item.data_type())?, + }; + Arc::new( + GenericListArray::::try_new( + expect_item.clone(), + list_array.offsets().clone(), + item_aligned, + list_array.nulls().cloned(), + ) + .context(ArrowComputeSnafu)?, + ) + } + _ => JsonArray::from(&array_columns[j]).try_cast(expect_type)?, + }; + aligned.push(array); + } + i += 1; + j += 1; + } + Ordering::Less => { + aligned.push(new_null_array(expect_field.data_type(), struct_array.len())); + i += 1; + } + Ordering::Greater => { + return AlignJsonArraySnafu { + reason: format!("extra fields are found: [{}]", array_field.name()), + } + .fail(); + } + } + } + if i < expect_fields.len() { + for field in &expect_fields[i..] { + aligned.push(new_null_array(field.data_type(), struct_array.len())); + } + } + if j < array_fields.len() { + return AlignJsonArraySnafu { + reason: format!( + "extra fields are found: [{}]", + array_fields[j..] + .iter() + .map(|x| x.name().as_str()) + .collect::>() + .join(", ") + ), + } + .fail(); + } + + let json_array = StructArray::try_new( + expect_fields.clone(), + aligned, + struct_array.nulls().cloned(), + ) + .map_err(|e| { + AlignJsonArraySnafu { + reason: e.to_string(), + } + .build() + })?; + Ok(Arc::new(json_array)) + } + + fn try_cast(&self, to_type: &DataType) -> Result { + if compute::can_cast_types(self.inner.data_type(), to_type) { + return compute::cast(&self.inner, to_type).context(ArrowComputeSnafu); + } + + let formatter = ArrayFormatter::try_new(&self.inner, &FormatOptions::default()) + .context(ArrowComputeSnafu)?; + let values = (0..self.inner.len()) + .map(|i| { + self.inner + .is_valid(i) + .then(|| formatter.value(i).to_string()) + }) + .collect::>(); + Ok(Arc::new(StringArray::from(values))) + } +} + +impl<'a> From<&'a ArrayRef> for JsonArray<'a> { + fn from(inner: &'a ArrayRef) -> Self { + Self { inner } + } +} + +#[cfg(test)] +mod test { + use arrow_array::types::Int64Type; + use arrow_array::{BooleanArray, Float64Array, Int64Array, ListArray}; + use arrow_schema::{Field, Fields}; + + use super::*; + + #[test] + fn test_align_json_array() -> Result<()> { + struct TestCase { + json_array: ArrayRef, + schema_type: DataType, + expected: std::result::Result, + } + + impl TestCase { + fn new( + json_array: StructArray, + schema_type: Fields, + expected: std::result::Result, String>, + ) -> Self { + Self { + json_array: Arc::new(json_array), + schema_type: DataType::Struct(schema_type.clone()), + expected: expected + .map(|x| Arc::new(StructArray::new(schema_type, x, None)) as ArrayRef), + } + } + + fn test(self) -> Result<()> { + let result = JsonArray::from(&self.json_array).try_align(&self.schema_type); + match (result, self.expected) { + (Ok(json_array), Ok(expected)) => assert_eq!(&json_array, &expected), + (Ok(json_array), Err(e)) => { + panic!("expecting error {e} but actually get: {json_array:?}") + } + (Err(e), Err(expected)) => assert_eq!(e.to_string(), expected), + (Err(e), Ok(_)) => return Err(e), + } + Ok(()) + } + } + + // Test empty json array can be aligned with a complex json type. + TestCase::new( + StructArray::new_empty_fields(2, None), + Fields::from(vec![ + Field::new("int", DataType::Int64, true), + Field::new_struct( + "nested", + vec![Field::new("bool", DataType::Boolean, true)], + true, + ), + Field::new("string", DataType::Utf8, true), + ]), + Ok(vec![ + Arc::new(Int64Array::new_null(2)) as ArrayRef, + Arc::new(StructArray::new_null( + Fields::from(vec![Arc::new(Field::new("bool", DataType::Boolean, true))]), + 2, + )), + Arc::new(StringArray::new_null(2)), + ]), + ) + .test()?; + + // Test simple json array alignment. + TestCase::new( + StructArray::from(vec![( + Arc::new(Field::new("float", DataType::Float64, true)), + Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef, + )]), + Fields::from(vec![ + Field::new("float", DataType::Float64, true), + Field::new("string", DataType::Utf8, true), + ]), + Ok(vec![ + Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef, + Arc::new(StringArray::new_null(3)), + ]), + ) + .test()?; + + // Test complex json array alignment. + TestCase::new( + StructArray::from(vec![ + ( + Arc::new(Field::new_list( + "list", + Field::new_list_field(DataType::Int64, true), + true, + )), + Arc::new(ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1)]), + None, + Some(vec![Some(2), Some(3)]), + ])) as ArrayRef, + ), + ( + Arc::new(Field::new_struct( + "nested", + vec![Field::new("int", DataType::Int64, true)], + true, + )), + Arc::new(StructArray::from(vec![( + Arc::new(Field::new("int", DataType::Int64, true)), + Arc::new(Int64Array::from(vec![-1, -2, -3])) as ArrayRef, + )])), + ), + ( + Arc::new(Field::new("string", DataType::Utf8, true)), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ), + ]), + Fields::from(vec![ + Field::new("bool", DataType::Boolean, true), + Field::new_list("list", Field::new_list_field(DataType::Int64, true), true), + Field::new_struct( + "nested", + vec![ + Field::new("float", DataType::Float64, true), + Field::new("int", DataType::Int64, true), + ], + true, + ), + Field::new("string", DataType::Utf8, true), + ]), + Ok(vec![ + Arc::new(BooleanArray::new_null(3)) as ArrayRef, + Arc::new(ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1)]), + None, + Some(vec![Some(2), Some(3)]), + ])), + Arc::new(StructArray::from(vec![ + ( + Arc::new(Field::new("float", DataType::Float64, true)), + Arc::new(Float64Array::new_null(3)) as ArrayRef, + ), + ( + Arc::new(Field::new("int", DataType::Int64, true)), + Arc::new(Int64Array::from(vec![-1, -2, -3])), + ), + ])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ]), + ) + .test()?; + + // Test align failed. + TestCase::new( + StructArray::try_from(vec![ + ("i", Arc::new(Int64Array::from(vec![1])) as ArrayRef), + ("j", Arc::new(Int64Array::from(vec![2])) as ArrayRef), + ]) + .unwrap(), + Fields::from(vec![Field::new("i", DataType::Int64, true)]), + Err("Failed to align JSON array, reason: extra fields are found: [j]".to_string()), + ) + .test()?; + Ok(()) + } +} diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index cf81cb9b3a..2b817dcb3a 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -39,8 +39,11 @@ use datatypes::arrow::datatypes::{ use datatypes::data_type::DataType; use datatypes::extension::json::is_json_extension_type; use datatypes::prelude::{MutableVector, Vector}; +use datatypes::schema::ext::ArrowSchemaExt; +use datatypes::types::JsonType; use datatypes::value::ValueRef; use datatypes::vectors::Helper; +use datatypes::vectors::json::array::JsonArray; use mito_codec::key_values::{KeyValue, KeyValues}; use mito_codec::row_converter::{PrimaryKeyCodec, SortField, build_primary_key_codec_with_fields}; use parquet::arrow::ArrowWriter; @@ -55,9 +58,9 @@ use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME; use store_api::storage::{ColumnId, FileId, SequenceNumber, SequenceRange}; use crate::error::{ - self, ColumnNotFoundSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DataTypeMismatchSnafu, - EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu, InvalidRequestSnafu, - NewRecordBatchSnafu, Result, + self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertValueSnafu, CreateDefaultSnafu, + DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu, + InvalidRequestSnafu, NewRecordBatchSnafu, Result, }; use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef}; use crate::memtable::bulk::part_reader::EncodedBulkPartIter; @@ -432,11 +435,13 @@ impl UnorderedPart { // Get the schema from the first part let schema = self.parts[0].batch.schema(); - - // Concatenate all record batches - let batches: Vec = self.parts.iter().map(|p| p.batch.clone()).collect(); - let concatenated = - arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?; + let concatenated = if schema.has_json_extension_field() { + let (schema, batches) = align_parts(&self.parts)?; + arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)? + } else { + arrow::compute::concat_batches(&schema, self.parts.iter().map(|x| &x.batch)) + .context(ComputeArrowSnafu)? + }; // Sort the concatenated batch let sorted_batch = sort_primary_key_record_batch(&concatenated)?; @@ -473,6 +478,73 @@ impl UnorderedPart { } } +/// Align the JSON columns in [BulkPart]s, to unified Arrow arrays. So that we can compute (concat, +/// sort, etc.) on them. +fn align_parts(parts: &[BulkPart]) -> Result<(SchemaRef, Vec)> { + debug_assert!( + !parts.is_empty() + && parts + .windows(2) + .all(|w| w[0].batch.schema_ref().fields().len() + == w[1].batch.schema_ref().fields().len()) + ); + + let first = &parts[0]; + let base_schema = first.batch.schema_ref(); + let rest = &parts[1..]; + + let mut merged_types = HashMap::new(); + let mut aligned_fields = Vec::with_capacity(base_schema.fields().len()); + for (i, field) in base_schema.fields().iter().enumerate() { + if is_json_extension_type(field) { + let mut merged = JsonType::from(field.data_type()); + rest.iter() + .try_fold(&mut merged, |acc, x| { + acc.merge(&JsonType::from(x.batch.schema_ref().field(i).data_type()))?; + Ok(acc) + }) + .context(DataTypeMismatchSnafu)?; + merged_types.insert(i, merged.as_arrow_type()); + + aligned_fields.push(Arc::new( + Field::new( + field.name().clone(), + merged.as_arrow_type(), + field.is_nullable(), + ) + .with_metadata(field.metadata().clone()), + )); + } else { + aligned_fields.push(field.clone()) + }; + } + let aligned_schema = Arc::new(Schema::new_with_metadata( + aligned_fields, + base_schema.metadata().clone(), + )); + + let mut aligned_batches = Vec::with_capacity(parts.len()); + for part in parts { + let mut columns = Vec::with_capacity(part.batch.num_columns()); + for (i, column) in part.batch.columns().iter().enumerate() { + if let Some(expect) = merged_types.get(&i) { + columns.push( + JsonArray::from(column) + .try_align(expect) + .context(ConvertValueSnafu)?, + ); + } else { + columns.push(column.clone()); + } + } + aligned_batches.push( + RecordBatch::try_new(aligned_schema.clone(), columns).context(NewRecordBatchSnafu)?, + ); + } + + Ok((aligned_schema, aligned_batches)) +} + /// More accurate estimation of the size of a record batch. pub fn record_batch_estimated_size(batch: &RecordBatch) -> usize { batch diff --git a/src/mito2/src/read/compat.rs b/src/mito2/src/read/compat.rs index 679c1d72e7..4577d7fe4e 100644 --- a/src/mito2/src/read/compat.rs +++ b/src/mito2/src/read/compat.rs @@ -18,7 +18,6 @@ use std::collections::HashMap; use std::sync::Arc; use api::v1::SemanticType; -use common_recordbatch::recordbatch::align_json_array; use datatypes::arrow::array::{ Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array, }; @@ -29,6 +28,7 @@ use datatypes::data_type::ConcreteDataType; use datatypes::prelude::DataType; use datatypes::value::Value; use datatypes::vectors::VectorRef; +use datatypes::vectors::json::array::JsonArray; use mito_codec::row_converter::{ CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec, build_primary_key_codec_with_fields, @@ -39,8 +39,8 @@ use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::storage::ColumnId; use crate::error::{ - CompatReaderSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu, - NewRecordBatchSnafu, RecordBatchSnafu, Result, UnsupportedOperationSnafu, + CompatReaderSnafu, ComputeArrowSnafu, ConvertValueSnafu, CreateDefaultSnafu, DecodeSnafu, + EncodeSnafu, NewRecordBatchSnafu, Result, UnsupportedOperationSnafu, }; use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns}; use crate::sst::parquet::flat_format::primary_key_column_index; @@ -240,9 +240,12 @@ impl FlatCompatBatch { let old_column = batch.column(*pos); if let Some(ty) = cast_type { - let casted = if let Some(json_type) = ty.as_json() { - align_json_array(old_column, &json_type.as_arrow_type()) - .context(RecordBatchSnafu)? + let casted = if let Some(json_type) = ty.as_json() + && json_type.is_json2() + { + JsonArray::from(old_column) + .try_align(&json_type.as_arrow_type()) + .context(ConvertValueSnafu)? } else { datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type()) .context(ComputeArrowSnafu)? diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs index f4c2ea3eca..3cba60d1dc 100644 --- a/src/mito2/src/sst/parquet/flat_format.rs +++ b/src/mito2/src/sst/parquet/flat_format.rs @@ -87,6 +87,7 @@ impl FlatWriteFormat { } /// Gets the arrow schema to store in parquet. + #[cfg(test)] pub(crate) fn arrow_schema(&self) -> &SchemaRef { &self.arrow_schema } @@ -103,7 +104,7 @@ impl FlatWriteFormat { let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()])); columns[sequence_column_index(batch.num_columns())] = sequence_array; - RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu) + RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu) } } diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index a8679fc036..a5c5fa68ab 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -95,6 +95,7 @@ impl PrimaryKeyWriteFormat { } /// Gets the arrow schema to store in parquet. + #[cfg(test)] pub(crate) fn arrow_schema(&self) -> &SchemaRef { &self.arrow_schema } diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 4e75073e26..13005ff9fc 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -72,13 +72,6 @@ enum FlatBatchConverter { } impl FlatBatchConverter { - fn arrow_schema(&self) -> &SchemaRef { - match self { - FlatBatchConverter::Flat(f) => f.arrow_schema(), - FlatBatchConverter::PrimaryKey { format, .. } => format.arrow_schema(), - } - } - fn convert_batch(&self, batch: &RecordBatch) -> Result { match self { FlatBatchConverter::Flat(f) => f.convert_batch(batch), @@ -406,7 +399,7 @@ where let arrow_batch = converter.convert_batch(&record_batch)?; let start = Instant::now(); - self.maybe_init_writer(converter.arrow_schema(), opts) + self.maybe_init_writer(arrow_batch.schema_ref(), opts) .await? .write(&arrow_batch) .await diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index 4c37463978..f51735b769 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -163,7 +163,7 @@ pub fn column_to_schema( } else { false }; - if is_json2_column || matches!(column.data_type(), SqlDataType::JSON) { + if is_json2_column { let settings = column .extensions .build_json_structure_settings()? diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs index 5879ae9dfa..3be45d8970 100644 --- a/tests-integration/src/tests/instance_test.rs +++ b/tests-integration/src/tests/instance_test.rs @@ -2834,36 +2834,9 @@ async fn test_copy_parquet_map_to_binary(instance: Arc) { async fn test_create_table_with_json_datatype(instance: Arc) { let instance = instance.frontend(); - let sql = r#" -CREATE TABLE a ( - j JSON(format = "partial", unstructured_keys = ["foo", "foo.bar"]), - ts TIMESTAMP TIME INDEX, -)"#; - let output = execute_sql(&instance, sql).await.data; - assert!(matches!(output, OutputData::AffectedRows(0))); - - // "show create table" finds the information from table metadata. - // So if the output is expected, we know the options are really set. - let output = execute_sql(&instance, "SHOW CREATE TABLE a").await.data; - let expected = r#" -+-------+------------------------------------------------------------------------------+ -| Table | Create Table | -+-------+------------------------------------------------------------------------------+ -| a | CREATE TABLE IF NOT EXISTS "a" ( | -| | "j" JSON(format = 'partial', unstructured_keys = ['foo', 'foo.bar']) NULL, | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("ts") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+-------+------------------------------------------------------------------------------+"#; - check_output_stream(output, expected).await; - - // test the default options let sql = r#" CREATE TABLE b ( - j JSON, + j JSON2, ts TIMESTAMP TIME INDEX, )"#; let output = execute_sql(&instance, sql).await.data; diff --git a/tests/cases/standalone/common/types/json/json2.result b/tests/cases/standalone/common/types/json/json2.result index d1b6a52ba1..e13e2307e1 100644 --- a/tests/cases/standalone/common/types/json/json2.result +++ b/tests/cases/standalone/common/types/json/json2.result @@ -14,6 +14,14 @@ values (1, '{"a": {"b": 1}, "c": "s1", "d": [{"e": {"f": 0.1}}]}'), Affected Rows: 2 +admin flush_table('json2_table'); + ++----------------------------------+ +| ADMIN flush_table('json2_table') | ++----------------------------------+ +| 0 | ++----------------------------------+ + insert into json2_table (ts, j) values (3, '{"a": {"b": 3}, "c": "s3"}'); @@ -26,12 +34,28 @@ values (4, '{"a": {"b": -4}, "d": [{"e": {"g": -0.4}}]}'), Affected Rows: 3 +admin flush_table('json2_table'); + ++----------------------------------+ +| ADMIN flush_table('json2_table') | ++----------------------------------+ +| 0 | ++----------------------------------+ + insert into json2_table values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'), (8, '{"a": {"b": 8}, "c": "s8"}'); Affected Rows: 2 +admin flush_table('json2_table'); + ++----------------------------------+ +| ADMIN flush_table('json2_table') | ++----------------------------------+ +| 0 | ++----------------------------------+ + insert into json2_table values (9, '{"a": {"x": true}, "c": "s9", "d": [{"e": {"g": -0.9}}]}'), (10, '{"a": {"b": 10}, "y": false}'); diff --git a/tests/cases/standalone/common/types/json/json2.sql b/tests/cases/standalone/common/types/json/json2.sql index 243b29a319..3644f06be4 100644 --- a/tests/cases/standalone/common/types/json/json2.sql +++ b/tests/cases/standalone/common/types/json/json2.sql @@ -10,6 +10,8 @@ insert into json2_table (ts, j) values (1, '{"a": {"b": 1}, "c": "s1", "d": [{"e": {"f": 0.1}}]}'), (2, '{"a": {"b": -2}, "c": "s2", "d": [{"e": {"f": 0.2}}]}'); +admin flush_table('json2_table'); + insert into json2_table (ts, j) values (3, '{"a": {"b": 3}, "c": "s3"}'); @@ -18,10 +20,14 @@ values (4, '{"a": {"b": -4}, "d": [{"e": {"g": -0.4}}]}'), (5, '{"a": {}, "c": "s5"}'), (6, '{"c": "s6"}'); +admin flush_table('json2_table'); + insert into json2_table values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'), (8, '{"a": {"b": 8}, "c": "s8"}'); +admin flush_table('json2_table'); + insert into json2_table values (9, '{"a": {"x": true}, "c": "s9", "d": [{"e": {"g": -0.9}}]}'), (10, '{"a": {"b": 10}, "y": false}');