feat: json2 flush

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-04-21 18:14:36 +08:00
parent 449243a175
commit 2fc6c2da7b
18 changed files with 522 additions and 306 deletions

View File

@@ -188,13 +188,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to align JSON array, reason: {reason}"))]
AlignJsonArray {
reason: String,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -210,8 +203,7 @@ impl ErrorExt for Error {
| Error::ToArrowScalar { .. }
| Error::ProjectArrowRecordBatch { .. }
| Error::PhysicalExpr { .. }
| Error::RecordBatchSliceIndexOverflow { .. }
| Error::AlignJsonArray { .. } => StatusCode::Internal,
| Error::RecordBatchSliceIndexOverflow { .. } => StatusCode::Internal,
Error::PollStream { .. } => StatusCode::EngineExecuteQuery,

View File

@@ -20,10 +20,11 @@ use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion_common::arrow::array::ArrayRef;
use datafusion_common::arrow::compute;
use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef};
use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions, StructArray, new_null_array};
use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions};
use datatypes::extension::json::is_json_extension_type;
use datatypes::prelude::DataType;
use datatypes::schema::SchemaRef;
use datatypes::vectors::json::array::JsonArray;
use datatypes::vectors::{Helper, VectorRef};
use serde::ser::{Error, SerializeStruct};
use serde::{Serialize, Serializer};
@@ -31,8 +32,8 @@ use snafu::{OptionExt, ResultExt, ensure};
use crate::DfRecordBatch;
use crate::error::{
self, AlignJsonArraySnafu, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu,
NewDfRecordBatchSnafu, ProjectArrowRecordBatchSnafu, Result,
self, ArrowComputeSnafu, ColumnNotExistsSnafu, DataTypesSnafu, ProjectArrowRecordBatchSnafu,
Result,
};
/// A two-dimensional batch of column-oriented data with a defined schema.
@@ -354,80 +355,6 @@ pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Resul
Ok(RecordBatch::from_df_record_batch(schema, record_batch))
}
/// Align a json array `json_array` to the json type `schema_type`. The `schema_type` is often the
/// "largest" json type after some insertions in the table schema, while the json array previously
/// written in the SST could be lagged behind it. So it's important to "amend" the json array's
/// missing fields with null arrays, to align the array's data type with the provided one.
///
/// # Panics
///
/// - The json array is not an Arrow [StructArray], or the provided data type `schema_type` is not
/// of Struct type. Both of which shouldn't happen unless we switch our implementation of how
/// json array is physically stored.
pub fn align_json_array(json_array: &ArrayRef, schema_type: &ArrowDataType) -> Result<ArrayRef> {
let json_type = json_array.data_type();
if json_type == schema_type {
return Ok(json_array.clone());
}
let json_array = json_array.as_struct();
let array_fields = json_array.fields();
let array_columns = json_array.columns();
let ArrowDataType::Struct(schema_fields) = schema_type else {
unreachable!()
};
let mut aligned = Vec::with_capacity(schema_fields.len());
// Compare the fields in the json array and the to-be-aligned schema, amending with null arrays
// on the way. It's very important to note that fields in the json array and in the json type
// are both SORTED.
let mut i = 0; // point to the schema fields
let mut j = 0; // point to the array fields
while i < schema_fields.len() && j < array_fields.len() {
let schema_field = &schema_fields[i];
let array_field = &array_fields[j];
if schema_field.name() == array_field.name() {
if matches!(schema_field.data_type(), ArrowDataType::Struct(_)) {
// A `StructArray`s in a json array must be another json array. (Like a nested json
// object in a json value.)
aligned.push(align_json_array(
&array_columns[j],
schema_field.data_type(),
)?);
} else {
aligned.push(array_columns[j].clone());
}
j += 1;
} else {
aligned.push(new_null_array(schema_field.data_type(), json_array.len()));
}
i += 1;
}
if i < schema_fields.len() {
for field in &schema_fields[i..] {
aligned.push(new_null_array(field.data_type(), json_array.len()));
}
}
ensure!(
j == array_fields.len(),
AlignJsonArraySnafu {
reason: format!(
"this json array has more fields {:?}",
array_fields[j..]
.iter()
.map(|x| x.name())
.collect::<Vec<_>>(),
)
}
);
let json_array =
StructArray::try_new(schema_fields.clone(), aligned, json_array.nulls().cloned())
.context(NewDfRecordBatchSnafu)?;
Ok(Arc::new(json_array))
}
fn maybe_align_json_array_with_schema(
schema: &ArrowSchemaRef,
arrays: Vec<ArrayRef>,
@@ -443,7 +370,9 @@ fn maybe_align_json_array_with_schema(
continue;
}
let json_array = align_json_array(&array, field.data_type())?;
let json_array = JsonArray::from(&array)
.try_align(field.data_type())
.context(DataTypesSnafu)?;
aligned.push(json_array);
}
Ok(aligned)
@@ -453,12 +382,8 @@ fn maybe_align_json_array_with_schema(
mod tests {
use std::sync::Arc;
use datatypes::arrow::array::{
AsArray, BooleanArray, Float64Array, Int64Array, ListArray, UInt32Array,
};
use datatypes::arrow::datatypes::{
DataType, Field, Fields, Int64Type, Schema as ArrowSchema, UInt32Type,
};
use datatypes::arrow::array::{AsArray, UInt32Array};
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, UInt32Type};
use datatypes::arrow_array::StringArray;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
@@ -466,165 +391,6 @@ mod tests {
use super::*;
#[test]
fn test_align_json_array() -> Result<()> {
struct TestCase {
json_array: ArrayRef,
schema_type: DataType,
expected: std::result::Result<ArrayRef, String>,
}
impl TestCase {
fn new(
json_array: StructArray,
schema_type: Fields,
expected: std::result::Result<Vec<ArrayRef>, String>,
) -> Self {
Self {
json_array: Arc::new(json_array),
schema_type: DataType::Struct(schema_type.clone()),
expected: expected
.map(|x| Arc::new(StructArray::new(schema_type, x, None)) as ArrayRef),
}
}
fn test(self) -> Result<()> {
let result = align_json_array(&self.json_array, &self.schema_type);
match (result, self.expected) {
(Ok(json_array), Ok(expected)) => assert_eq!(&json_array, &expected),
(Ok(json_array), Err(e)) => {
panic!("expecting error {e} but actually get: {json_array:?}")
}
(Err(e), Err(expected)) => assert_eq!(e.to_string(), expected),
(Err(e), Ok(_)) => return Err(e),
}
Ok(())
}
}
// Test empty json array can be aligned with a complex json type.
TestCase::new(
StructArray::new_empty_fields(2, None),
Fields::from(vec![
Field::new("int", DataType::Int64, true),
Field::new_struct(
"nested",
vec![Field::new("bool", DataType::Boolean, true)],
true,
),
Field::new("string", DataType::Utf8, true),
]),
Ok(vec![
Arc::new(Int64Array::new_null(2)) as ArrayRef,
Arc::new(StructArray::new_null(
Fields::from(vec![Arc::new(Field::new("bool", DataType::Boolean, true))]),
2,
)),
Arc::new(StringArray::new_null(2)),
]),
)
.test()?;
// Test simple json array alignment.
TestCase::new(
StructArray::from(vec![(
Arc::new(Field::new("float", DataType::Float64, true)),
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
)]),
Fields::from(vec![
Field::new("float", DataType::Float64, true),
Field::new("string", DataType::Utf8, true),
]),
Ok(vec![
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
Arc::new(StringArray::new_null(3)),
]),
)
.test()?;
// Test complex json array alignment.
TestCase::new(
StructArray::from(vec![
(
Arc::new(Field::new_list(
"list",
Field::new_list_field(DataType::Int64, true),
true,
)),
Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(vec![Some(1)]),
None,
Some(vec![Some(2), Some(3)]),
])) as ArrayRef,
),
(
Arc::new(Field::new_struct(
"nested",
vec![Field::new("int", DataType::Int64, true)],
true,
)),
Arc::new(StructArray::from(vec![(
Arc::new(Field::new("int", DataType::Int64, true)),
Arc::new(Int64Array::from(vec![-1, -2, -3])) as ArrayRef,
)])),
),
(
Arc::new(Field::new("string", DataType::Utf8, true)),
Arc::new(StringArray::from(vec!["a", "b", "c"])),
),
]),
Fields::from(vec![
Field::new("bool", DataType::Boolean, true),
Field::new_list("list", Field::new_list_field(DataType::Int64, true), true),
Field::new_struct(
"nested",
vec![
Field::new("float", DataType::Float64, true),
Field::new("int", DataType::Int64, true),
],
true,
),
Field::new("string", DataType::Utf8, true),
]),
Ok(vec![
Arc::new(BooleanArray::new_null(3)) as ArrayRef,
Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(vec![Some(1)]),
None,
Some(vec![Some(2), Some(3)]),
])),
Arc::new(StructArray::from(vec![
(
Arc::new(Field::new("float", DataType::Float64, true)),
Arc::new(Float64Array::new_null(3)) as ArrayRef,
),
(
Arc::new(Field::new("int", DataType::Int64, true)),
Arc::new(Int64Array::from(vec![-1, -2, -3])),
),
])),
Arc::new(StringArray::from(vec!["a", "b", "c"])),
]),
)
.test()?;
// Test align failed.
TestCase::new(
StructArray::try_from(vec![
("i", Arc::new(Int64Array::from(vec![1])) as ArrayRef),
("j", Arc::new(Int64Array::from(vec![2])) as ArrayRef),
])
.unwrap(),
Fields::from(vec![Field::new("i", DataType::Int64, true)]),
Err(
r#"Failed to align JSON array, reason: this json array has more fields ["j"]"#
.to_string(),
),
)
.test()?;
Ok(())
}
#[test]
fn test_record_batch() {
let arrow_schema = Arc::new(ArrowSchema::new(vec![

View File

@@ -281,6 +281,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to align JSON array, reason: {reason}"))]
AlignJsonArray {
reason: String,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -324,7 +331,8 @@ impl ErrorExt for Error {
| ParseExtendedType { .. }
| InconsistentStructFieldsAndItems { .. }
| ArrowMetadata { .. }
| AlignJsonValue { .. } => StatusCode::Internal,
| AlignJsonValue { .. }
| AlignJsonArray { .. } => StatusCode::Internal,
}
}

View File

@@ -14,6 +14,7 @@
mod column_schema;
pub mod constraint;
pub mod ext;
use std::collections::HashMap;
use std::sync::Arc;

View File

@@ -0,0 +1,27 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::extension::json;
/// Extension methods layered on top of Arrow's [Schema](arrow_schema::Schema).
///
/// Bring this trait into scope to call the helpers below directly on a schema value.
pub trait ArrowSchemaExt {
/// Returns `true` if this [Schema](arrow_schema::Schema) contains at least one JSON extension
/// field, and `false` otherwise.
fn has_json_extension_field(&self) -> bool;
}
impl ArrowSchemaExt for arrow_schema::Schema {
    /// Scans the schema's fields in order and stops at the first one that
    /// [`json::is_json_extension_type`] accepts.
    fn has_json_extension_field(&self) -> bool {
        for field in self.fields().iter() {
            if json::is_json_extension_type(field) {
                return true;
            }
        }
        false
    }
}

View File

@@ -317,6 +317,12 @@ fn merge(this: &JsonNativeType, that: &JsonNativeType) -> JsonNativeType {
}
}
impl From<&ArrowDataType> for JsonType {
    /// Builds a JSON2-format [`JsonType`] from an Arrow data type, going through the
    /// corresponding [`ConcreteDataType`] and its [`JsonNativeType`] representation.
    fn from(t: &ArrowDataType) -> Self {
        let concrete = ConcreteDataType::from_arrow_type(t);
        let native = JsonNativeType::from(&concrete);
        JsonType::new_json2(native)
    }
}
impl DataType for JsonType {
fn name(&self) -> String {
match &self.format {

View File

@@ -35,7 +35,7 @@ mod duration;
mod eq;
mod helper;
mod interval;
pub(crate) mod json;
pub mod json;
mod list;
mod null;
pub(crate) mod operations;

View File

@@ -12,4 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod array;
pub(crate) mod builder;

View File

@@ -0,0 +1,342 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::Ordering;
use std::sync::Arc;
use arrow::compute;
use arrow::util::display::{ArrayFormatter, FormatOptions};
use arrow_array::cast::AsArray;
use arrow_array::{Array, ArrayRef, GenericListArray, StructArray, new_null_array};
use arrow_schema::DataType;
use snafu::{OptionExt, ResultExt};
use crate::arrow_array::StringArray;
use crate::error::{AlignJsonArraySnafu, ArrowComputeSnafu, Result};
/// A lightweight, borrowed wrapper around an Arrow array that is treated as a JSON array.
///
/// The wrapper owns nothing: it only holds a reference to the underlying [`ArrayRef`], and is
/// created through its `From<&ArrayRef>` implementation.
pub struct JsonArray<'a> {
/// The wrapped Arrow array, borrowed for the lifetime `'a`.
inner: &'a ArrayRef,
}
impl JsonArray<'_> {
    /// Reshape this JSON array so that its Arrow data type becomes `expect`.
    ///
    /// The `expect` data type is usually the "largest" JSON type recorded in the table schema
    /// after a number of insertions, whereas a JSON array that was written to an SST earlier may
    /// lag behind it. Aligning fills the fields that are missing from the array with null arrays,
    /// and converts the columns whose types differ.
    ///
    /// # Errors
    ///
    /// - The wrapped array is not a struct array, or `expect` is not a struct data type (unless
    ///   the array's data type already equals `expect`, in which case it is returned as is).
    /// - The array contains fields that are absent from `expect`. Aligning such an array would
    ///   silently drop data, which is generally unwanted.
    /// - Aligning or casting one of the columns fails, or the final struct array cannot be built.
    pub fn try_align(&self, expect: &DataType) -> Result<ArrayRef> {
        let source = self.inner;
        if source.data_type() == expect {
            // Nothing to do: hand back another reference to the very same array.
            return Ok(source.clone());
        }

        let Some(struct_array) = source.as_struct_opt() else {
            return AlignJsonArraySnafu {
                reason: "expect struct array",
            }
            .fail();
        };
        let DataType::Struct(expect_fields) = expect else {
            return AlignJsonArraySnafu {
                reason: "expect struct datatype",
            }
            .fail();
        };
        let array_fields = struct_array.fields();
        let array_columns = struct_array.columns();
        let row_count = struct_array.len();

        // The walk below is a merge over two name-ordered field lists, so it relies on both the
        // expected fields and the array's fields being **SORTED** by name. The JSON type
        // implementation keeps its fields sorted, which is what makes this hold.
        debug_assert!(expect_fields.iter().map(|f| f.name()).is_sorted());
        debug_assert!(array_fields.iter().map(|f| f.name()).is_sorted());

        let mut aligned = Vec::with_capacity(expect_fields.len());
        let mut expect_idx = 0; // cursor into the expected fields
        let mut array_idx = 0; // cursor into the array's own fields
        while expect_idx < expect_fields.len() && array_idx < array_fields.len() {
            let expect_field = &expect_fields[expect_idx];
            let array_field = &array_fields[array_idx];
            match expect_field.name().cmp(array_field.name()) {
                Ordering::Less => {
                    // The expected field is missing from the array: fill it with nulls.
                    aligned.push(new_null_array(expect_field.data_type(), row_count));
                    expect_idx += 1;
                }
                Ordering::Equal => {
                    aligned.push(Self::align_column(
                        &array_columns[array_idx],
                        expect_field.data_type(),
                        array_field.data_type(),
                    )?);
                    expect_idx += 1;
                    array_idx += 1;
                }
                Ordering::Greater => {
                    // The array carries a field the expected type does not know about.
                    return AlignJsonArraySnafu {
                        reason: format!("extra fields are found: [{}]", array_field.name()),
                    }
                    .fail();
                }
            }
        }

        // Expected fields left over after the array ran out are all missing: null-fill them.
        aligned.extend(
            expect_fields[expect_idx..]
                .iter()
                .map(|field| new_null_array(field.data_type(), row_count)),
        );

        // Array fields left over after the expected fields ran out are all extra.
        let extras = &array_fields[array_idx..];
        if !extras.is_empty() {
            let names = extras
                .iter()
                .map(|x| x.name().as_str())
                .collect::<Vec<_>>()
                .join(", ");
            return AlignJsonArraySnafu {
                reason: format!("extra fields are found: [{}]", names),
            }
            .fail();
        }

        let json_array = StructArray::try_new(
            expect_fields.clone(),
            aligned,
            struct_array.nulls().cloned(),
        )
        .map_err(|e| {
            AlignJsonArraySnafu {
                reason: e.to_string(),
            }
            .build()
        })?;
        Ok(Arc::new(json_array))
    }

    /// Convert a single column, whose field is declared as `array_type`, to `expect_type`.
    ///
    /// - Equal types: the column is reused untouched.
    /// - Struct to struct: the column is aligned recursively as a nested JSON array.
    /// - List to list: the list items are aligned (struct items) or cast (anything else), and the
    ///   list is rebuilt around them with the original offsets and null buffer.
    /// - Everything else: the column is cast.
    fn align_column(
        column: &ArrayRef,
        expect_type: &DataType,
        array_type: &DataType,
    ) -> Result<ArrayRef> {
        if expect_type == array_type {
            return Ok(column.clone());
        }

        match (expect_type, array_type) {
            (DataType::Struct(_), DataType::Struct(_)) => {
                JsonArray::from(column).try_align(expect_type)
            }
            (DataType::List(expect_item), DataType::List(array_item)) => {
                let list_array = column.as_list::<i32>();
                let items = JsonArray::from(list_array.values());
                let item_type = expect_item.data_type();
                let both_structs = matches!(
                    (item_type, array_item.data_type()),
                    (DataType::Struct(_), DataType::Struct(_))
                );
                let item_aligned = if both_structs {
                    items.try_align(item_type)?
                } else {
                    items.try_cast(item_type)?
                };
                let rebuilt = GenericListArray::<i32>::try_new(
                    expect_item.clone(),
                    list_array.offsets().clone(),
                    item_aligned,
                    list_array.nulls().cloned(),
                )
                .context(ArrowComputeSnafu)?;
                Ok(Arc::new(rebuilt))
            }
            _ => JsonArray::from(column).try_cast(expect_type),
        }
    }

    /// Convert the wrapped array to `to_type`.
    ///
    /// When Arrow reports the cast as supported, Arrow's cast kernel is used. Otherwise every
    /// valid value is rendered to its display text and a string array is returned (null slots
    /// stay null); note that in this fallback the result is a string array regardless of
    /// `to_type`.
    fn try_cast(&self, to_type: &DataType) -> Result<ArrayRef> {
        let array = self.inner;
        if !compute::can_cast_types(array.data_type(), to_type) {
            let formatter = ArrayFormatter::try_new(array, &FormatOptions::default())
                .context(ArrowComputeSnafu)?;
            let mut values = Vec::with_capacity(array.len());
            for i in 0..array.len() {
                let rendered = if array.is_valid(i) {
                    Some(formatter.value(i).to_string())
                } else {
                    None
                };
                values.push(rendered);
            }
            return Ok(Arc::new(StringArray::from(values)));
        }

        compute::cast(array, to_type).context(ArrowComputeSnafu)
    }
}
impl<'a> From<&'a ArrayRef> for JsonArray<'a> {
fn from(inner: &'a ArrayRef) -> Self {
Self { inner }
}
}
#[cfg(test)]
// Unit tests for `JsonArray::try_align`.
mod test {
use arrow_array::types::Int64Type;
use arrow_array::{BooleanArray, Float64Array, Int64Array, ListArray};
use arrow_schema::{Field, Fields};
use super::*;
// Table-style test: each `TestCase` pairs an input struct array and a target struct type with
// either the expected aligned columns or the expected error message.
#[test]
fn test_align_json_array() -> Result<()> {
// One alignment scenario.
// - `json_array`: the input array handed to `try_align`.
// - `schema_type`: the data type the input is aligned to.
// - `expected`: the aligned array on success, or the error's display text on failure.
struct TestCase {
json_array: ArrayRef,
schema_type: DataType,
expected: std::result::Result<ArrayRef, String>,
}
impl TestCase {
// Builds a case from the raw input struct array, the target fields, and the expected
// outcome. Expected columns are wrapped into a struct array that uses the target fields
// and has no null buffer.
fn new(
json_array: StructArray,
schema_type: Fields,
expected: std::result::Result<Vec<ArrayRef>, String>,
) -> Self {
Self {
json_array: Arc::new(json_array),
schema_type: DataType::Struct(schema_type.clone()),
expected: expected
.map(|x| Arc::new(StructArray::new(schema_type, x, None)) as ArrayRef),
}
}
// Runs `try_align` and compares the outcome against the expectation: arrays are compared
// for equality, errors are compared by their display text.
fn test(self) -> Result<()> {
let result = JsonArray::from(&self.json_array).try_align(&self.schema_type);
match (result, self.expected) {
(Ok(json_array), Ok(expected)) => assert_eq!(&json_array, &expected),
(Ok(json_array), Err(e)) => {
panic!("expecting error {e} but actually get: {json_array:?}")
}
(Err(e), Err(expected)) => assert_eq!(e.to_string(), expected),
// An unexpected error is propagated, so the test fails with that error.
(Err(e), Ok(_)) => return Err(e),
}
Ok(())
}
}
// Test empty json array can be aligned with a complex json type.
// Every expected column, including the nested struct, is entirely null.
TestCase::new(
StructArray::new_empty_fields(2, None),
Fields::from(vec![
Field::new("int", DataType::Int64, true),
Field::new_struct(
"nested",
vec![Field::new("bool", DataType::Boolean, true)],
true,
),
Field::new("string", DataType::Utf8, true),
]),
Ok(vec![
Arc::new(Int64Array::new_null(2)) as ArrayRef,
Arc::new(StructArray::new_null(
Fields::from(vec![Arc::new(Field::new("bool", DataType::Boolean, true))]),
2,
)),
Arc::new(StringArray::new_null(2)),
]),
)
.test()?;
// Test simple json array alignment.
// The existing "float" column is kept and the missing "string" column is null-filled.
TestCase::new(
StructArray::from(vec![(
Arc::new(Field::new("float", DataType::Float64, true)),
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
)]),
Fields::from(vec![
Field::new("float", DataType::Float64, true),
Field::new("string", DataType::Utf8, true),
]),
Ok(vec![
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef,
Arc::new(StringArray::new_null(3)),
]),
)
.test()?;
// Test complex json array alignment.
// Covers a missing leading field ("bool"), an unchanged list column, and a nested struct that
// gains a null-filled "float" field.
TestCase::new(
StructArray::from(vec![
(
Arc::new(Field::new_list(
"list",
Field::new_list_field(DataType::Int64, true),
true,
)),
Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(vec![Some(1)]),
None,
Some(vec![Some(2), Some(3)]),
])) as ArrayRef,
),
(
Arc::new(Field::new_struct(
"nested",
vec![Field::new("int", DataType::Int64, true)],
true,
)),
Arc::new(StructArray::from(vec![(
Arc::new(Field::new("int", DataType::Int64, true)),
Arc::new(Int64Array::from(vec![-1, -2, -3])) as ArrayRef,
)])),
),
(
Arc::new(Field::new("string", DataType::Utf8, true)),
Arc::new(StringArray::from(vec!["a", "b", "c"])),
),
]),
Fields::from(vec![
Field::new("bool", DataType::Boolean, true),
Field::new_list("list", Field::new_list_field(DataType::Int64, true), true),
Field::new_struct(
"nested",
vec![
Field::new("float", DataType::Float64, true),
Field::new("int", DataType::Int64, true),
],
true,
),
Field::new("string", DataType::Utf8, true),
]),
Ok(vec![
Arc::new(BooleanArray::new_null(3)) as ArrayRef,
Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
Some(vec![Some(1)]),
None,
Some(vec![Some(2), Some(3)]),
])),
Arc::new(StructArray::from(vec![
(
Arc::new(Field::new("float", DataType::Float64, true)),
Arc::new(Float64Array::new_null(3)) as ArrayRef,
),
(
Arc::new(Field::new("int", DataType::Int64, true)),
Arc::new(Int64Array::from(vec![-1, -2, -3])),
),
])),
Arc::new(StringArray::from(vec!["a", "b", "c"])),
]),
)
.test()?;
// Test align failed.
// The input has a field "j" that the target type lacks, so alignment must be rejected.
TestCase::new(
StructArray::try_from(vec![
("i", Arc::new(Int64Array::from(vec![1])) as ArrayRef),
("j", Arc::new(Int64Array::from(vec![2])) as ArrayRef),
])
.unwrap(),
Fields::from(vec![Field::new("i", DataType::Int64, true)]),
Err("Failed to align JSON array, reason: extra fields are found: [j]".to_string()),
)
.test()?;
Ok(())
}
}

View File

@@ -39,8 +39,11 @@ use datatypes::arrow::datatypes::{
use datatypes::data_type::DataType;
use datatypes::extension::json::is_json_extension_type;
use datatypes::prelude::{MutableVector, Vector};
use datatypes::schema::ext::ArrowSchemaExt;
use datatypes::types::JsonType;
use datatypes::value::ValueRef;
use datatypes::vectors::Helper;
use datatypes::vectors::json::array::JsonArray;
use mito_codec::key_values::{KeyValue, KeyValues};
use mito_codec::row_converter::{PrimaryKeyCodec, SortField, build_primary_key_codec_with_fields};
use parquet::arrow::ArrowWriter;
@@ -55,9 +58,9 @@ use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::{ColumnId, FileId, SequenceNumber, SequenceRange};
use crate::error::{
self, ColumnNotFoundSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DataTypeMismatchSnafu,
EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu, InvalidRequestSnafu,
NewRecordBatchSnafu, Result,
self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertValueSnafu, CreateDefaultSnafu,
DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
InvalidRequestSnafu, NewRecordBatchSnafu, Result,
};
use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
@@ -432,11 +435,13 @@ impl UnorderedPart {
// Get the schema from the first part
let schema = self.parts[0].batch.schema();
// Concatenate all record batches
let batches: Vec<RecordBatch> = self.parts.iter().map(|p| p.batch.clone()).collect();
let concatenated =
arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?;
let concatenated = if schema.has_json_extension_field() {
let (schema, batches) = align_parts(&self.parts)?;
arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?
} else {
arrow::compute::concat_batches(&schema, self.parts.iter().map(|x| &x.batch))
.context(ComputeArrowSnafu)?
};
// Sort the concatenated batch
let sorted_batch = sort_primary_key_record_batch(&concatenated)?;
@@ -473,6 +478,73 @@ impl UnorderedPart {
}
}
/// Bring the JSON columns of all [BulkPart]s to one common Arrow type, so that the resulting
/// record batches share a single schema and can be computed on together (concat, sort, etc.).
///
/// For every JSON extension field of the first part's schema, the JSON types found at the same
/// column position in all parts are merged, and each part's column is then aligned to the merged
/// type. Non-JSON fields and columns are passed through unchanged. Field names, nullability and
/// metadata, as well as the schema metadata, are taken from the first part.
///
/// Returns the unified schema together with one aligned record batch per input part, in input
/// order.
///
/// `parts` must be non-empty, and all parts are expected to have the same number of fields (this
/// is only checked in debug builds).
fn align_parts(parts: &[BulkPart]) -> Result<(SchemaRef, Vec<RecordBatch>)> {
    debug_assert!(
        !parts.is_empty()
            && parts
                .windows(2)
                .all(|w| w[0].batch.schema_ref().fields().len()
                    == w[1].batch.schema_ref().fields().len())
    );

    let first = &parts[0];
    let rest = &parts[1..];
    let base_schema = first.batch.schema_ref();

    // Column index -> merged Arrow type, recorded for JSON columns only.
    let mut merged_types = HashMap::new();
    let mut aligned_fields = Vec::with_capacity(base_schema.fields().len());
    for (i, field) in base_schema.fields().iter().enumerate() {
        if !is_json_extension_type(field) {
            aligned_fields.push(field.clone());
            continue;
        }

        // Start from the first part's type and fold in the type every other part has at the
        // same column position.
        let mut merged = JsonType::from(field.data_type());
        for part in rest {
            let other = JsonType::from(part.batch.schema_ref().field(i).data_type());
            merged.merge(&other).context(DataTypeMismatchSnafu)?;
        }

        merged_types.insert(i, merged.as_arrow_type());
        let unified_field = Field::new(
            field.name().clone(),
            merged.as_arrow_type(),
            field.is_nullable(),
        )
        .with_metadata(field.metadata().clone());
        aligned_fields.push(Arc::new(unified_field));
    }

    let aligned_schema = Arc::new(Schema::new_with_metadata(
        aligned_fields,
        base_schema.metadata().clone(),
    ));

    let mut aligned_batches = Vec::with_capacity(parts.len());
    for part in parts {
        let columns = part
            .batch
            .columns()
            .iter()
            .enumerate()
            .map(|(i, column)| match merged_types.get(&i) {
                Some(expect) => JsonArray::from(column)
                    .try_align(expect)
                    .context(ConvertValueSnafu),
                None => Ok(column.clone()),
            })
            .collect::<Result<Vec<_>>>()?;
        let batch =
            RecordBatch::try_new(aligned_schema.clone(), columns).context(NewRecordBatchSnafu)?;
        aligned_batches.push(batch);
    }

    Ok((aligned_schema, aligned_batches))
}
/// More accurate estimation of the size of a record batch.
pub fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
batch

View File

@@ -18,7 +18,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use api::v1::SemanticType;
use common_recordbatch::recordbatch::align_json_array;
use datatypes::arrow::array::{
Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array,
};
@@ -29,6 +28,7 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use datatypes::vectors::json::array::JsonArray;
use mito_codec::row_converter::{
CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
build_primary_key_codec_with_fields,
@@ -39,8 +39,8 @@ use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::{
CompatReaderSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu,
NewRecordBatchSnafu, RecordBatchSnafu, Result, UnsupportedOperationSnafu,
CompatReaderSnafu, ComputeArrowSnafu, ConvertValueSnafu, CreateDefaultSnafu, DecodeSnafu,
EncodeSnafu, NewRecordBatchSnafu, Result, UnsupportedOperationSnafu,
};
use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
use crate::sst::parquet::flat_format::primary_key_column_index;
@@ -240,9 +240,12 @@ impl FlatCompatBatch {
let old_column = batch.column(*pos);
if let Some(ty) = cast_type {
let casted = if let Some(json_type) = ty.as_json() {
align_json_array(old_column, &json_type.as_arrow_type())
.context(RecordBatchSnafu)?
let casted = if let Some(json_type) = ty.as_json()
&& json_type.is_json2()
{
JsonArray::from(old_column)
.try_align(&json_type.as_arrow_type())
.context(ConvertValueSnafu)?
} else {
datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type())
.context(ComputeArrowSnafu)?

View File

@@ -87,6 +87,7 @@ impl FlatWriteFormat {
}
/// Gets the arrow schema to store in parquet.
#[cfg(test)]
pub(crate) fn arrow_schema(&self) -> &SchemaRef {
&self.arrow_schema
}
@@ -103,7 +104,7 @@ impl FlatWriteFormat {
let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
columns[sequence_column_index(batch.num_columns())] = sequence_array;
RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
}
}

View File

@@ -95,6 +95,7 @@ impl PrimaryKeyWriteFormat {
}
/// Gets the arrow schema to store in parquet.
#[cfg(test)]
pub(crate) fn arrow_schema(&self) -> &SchemaRef {
&self.arrow_schema
}

View File

@@ -72,13 +72,6 @@ enum FlatBatchConverter {
}
impl FlatBatchConverter {
fn arrow_schema(&self) -> &SchemaRef {
match self {
FlatBatchConverter::Flat(f) => f.arrow_schema(),
FlatBatchConverter::PrimaryKey { format, .. } => format.arrow_schema(),
}
}
fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
match self {
FlatBatchConverter::Flat(f) => f.convert_batch(batch),
@@ -406,7 +399,7 @@ where
let arrow_batch = converter.convert_batch(&record_batch)?;
let start = Instant::now();
self.maybe_init_writer(converter.arrow_schema(), opts)
self.maybe_init_writer(arrow_batch.schema_ref(), opts)
.await?
.write(&arrow_batch)
.await

View File

@@ -163,7 +163,7 @@ pub fn column_to_schema(
} else {
false
};
if is_json2_column || matches!(column.data_type(), SqlDataType::JSON) {
if is_json2_column {
let settings = column
.extensions
.build_json_structure_settings()?

View File

@@ -2834,36 +2834,9 @@ async fn test_copy_parquet_map_to_binary(instance: Arc<dyn MockInstance>) {
async fn test_create_table_with_json_datatype(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
let sql = r#"
CREATE TABLE a (
j JSON(format = "partial", unstructured_keys = ["foo", "foo.bar"]),
ts TIMESTAMP TIME INDEX,
)"#;
let output = execute_sql(&instance, sql).await.data;
assert!(matches!(output, OutputData::AffectedRows(0)));
// "show create table" finds the information from table metadata.
// So if the output is expected, we know the options are really set.
let output = execute_sql(&instance, "SHOW CREATE TABLE a").await.data;
let expected = r#"
+-------+------------------------------------------------------------------------------+
| Table | Create Table |
+-------+------------------------------------------------------------------------------+
| a | CREATE TABLE IF NOT EXISTS "a" ( |
| | "j" JSON(format = 'partial', unstructured_keys = ['foo', 'foo.bar']) NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------+------------------------------------------------------------------------------+"#;
check_output_stream(output, expected).await;
// test the default options
let sql = r#"
CREATE TABLE b (
j JSON,
j JSON2,
ts TIMESTAMP TIME INDEX,
)"#;
let output = execute_sql(&instance, sql).await.data;

View File

@@ -14,6 +14,14 @@ values (1, '{"a": {"b": 1}, "c": "s1", "d": [{"e": {"f": 0.1}}]}'),
Affected Rows: 2
admin flush_table('json2_table');
+----------------------------------+
| ADMIN flush_table('json2_table') |
+----------------------------------+
| 0 |
+----------------------------------+
insert into json2_table (ts, j)
values (3, '{"a": {"b": 3}, "c": "s3"}');
@@ -26,12 +34,28 @@ values (4, '{"a": {"b": -4}, "d": [{"e": {"g": -0.4}}]}'),
Affected Rows: 3
admin flush_table('json2_table');
+----------------------------------+
| ADMIN flush_table('json2_table') |
+----------------------------------+
| 0 |
+----------------------------------+
insert into json2_table
values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'),
(8, '{"a": {"b": 8}, "c": "s8"}');
Affected Rows: 2
admin flush_table('json2_table');
+----------------------------------+
| ADMIN flush_table('json2_table') |
+----------------------------------+
| 0 |
+----------------------------------+
insert into json2_table
values (9, '{"a": {"x": true}, "c": "s9", "d": [{"e": {"g": -0.9}}]}'),
(10, '{"a": {"b": 10}, "y": false}');

View File

@@ -10,6 +10,8 @@ insert into json2_table (ts, j)
values (1, '{"a": {"b": 1}, "c": "s1", "d": [{"e": {"f": 0.1}}]}'),
(2, '{"a": {"b": -2}, "c": "s2", "d": [{"e": {"f": 0.2}}]}');
admin flush_table('json2_table');
insert into json2_table (ts, j)
values (3, '{"a": {"b": 3}, "c": "s3"}');
@@ -18,10 +20,14 @@ values (4, '{"a": {"b": -4}, "d": [{"e": {"g": -0.4}}]}'),
(5, '{"a": {}, "c": "s5"}'),
(6, '{"c": "s6"}');
admin flush_table('json2_table');
insert into json2_table
values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'),
(8, '{"a": {"b": 8}, "c": "s8"}');
admin flush_table('json2_table');
insert into json2_table
values (9, '{"a": {"x": true}, "c": "s9", "d": [{"e": {"g": -0.9}}]}'),
(10, '{"a": {"b": 10}, "y": false}');