From 58c85dd1ce0c715b5a8742e6cbf51dfd02815f12 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Thu, 18 Jun 2026 13:27:56 +0800 Subject: [PATCH] fix: guard structured JSON alignment paths against legacy JSONB columns (#8323) * Initial plan * fix: guard structured json alignment to fix Clippy CI failure - Add `is_structured_json_field` function that only returns true for fields with both JSON extension type AND Struct Arrow data type - Replace all usages of `is_json_extension_type` / `has_json_extension_field` with `is_structured_json_field` to prevent legacy JSONB binary columns from entering structured JSON alignment paths - Fix logic in `FlatProjectionMapper::new_with_read_columns` to guard JSON type hint concretization for JSON2 columns only - Fix `create_column` in show_create_table.rs to only emit JSON structure settings for JSON2 columns - Move `mod tests` to end of flat_projection.rs to fix clippy::items_after_test_module - Add tests for legacy JSON behavior * fix: guard narrow_read_columns_by_json_type_hint with is_structured_json_field check --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> --- src/common/recordbatch/src/recordbatch.rs | 22 +++++-- src/datatypes/src/extension/json.rs | 8 +++ src/mito2/src/compaction.rs | 7 +- src/mito2/src/memtable/bulk/part.rs | 11 ++-- src/mito2/src/read/compat.rs | 12 ++-- src/mito2/src/read/flat_projection.rs | 79 ++++++++++++++++++++--- src/mito2/src/read/scan_region.rs | 32 +++++---- src/mito2/src/sst/parquet/reader.rs | 9 ++- src/mito2/src/sst/parquet/writer.rs | 12 ++-- src/query/src/sql/show_create_table.rs | 68 ++++++++++++++++++- 10 files changed, 215 insertions(+), 45 deletions(-) diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index 4289714afd..7280772d05 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -21,7 +21,7 @@ use datafusion_common::arrow::array::ArrayRef; use datafusion_common::arrow::compute; use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef}; use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions}; -use datatypes::extension::json::is_json_extension_type; +use datatypes::extension::json::is_structured_json_field; use datatypes::prelude::DataType; use datatypes::schema::SchemaRef; use datatypes::vectors::json::array::JsonArray; @@ -359,13 +359,13 @@ fn maybe_align_json_array_with_schema( schema: &ArrowSchemaRef, arrays: Vec, ) -> Result> { - if schema.fields().iter().all(|f| !is_json_extension_type(f)) { + if schema.fields().iter().all(|f| !is_structured_json_field(f)) { return Ok(arrays); } let mut aligned = Vec::with_capacity(arrays.len()); for (field, array) in schema.fields().iter().zip(arrays) { - if !is_json_extension_type(field) { + if !is_structured_json_field(field) { aligned.push(array); continue; } @@ -382,10 +382,11 @@ fn maybe_align_json_array_with_schema( mod tests { use std::sync::Arc; - use datatypes::arrow::array::{AsArray, UInt32Array}; + use datatypes::arrow::array::{AsArray, BinaryArray, UInt32Array}; use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, UInt32Type}; use datatypes::arrow_array::StringArray; use datatypes::data_type::ConcreteDataType; + use datatypes::extension::json::{JsonExtensionType, JsonMetadata}; use datatypes::schema::{ColumnSchema, Schema}; use datatypes::vectors::{StringVector, UInt32Vector}; @@ -507,4 +508,17 @@ mod tests { .expect("merge recordbatch"); assert_eq!(merged.num_rows(), 8); } + + #[test] + fn test_legacy_json_with_extension_does_not_align_as_structured_json() { + let field = Field::new("j", DataType::Binary, true) + .with_extension_type(JsonExtensionType::new(Arc::new(JsonMetadata::default()))); + let arrow_schema = Arc::new(ArrowSchema::new(vec![field])); + let schema = Arc::new(Schema::try_from(arrow_schema).unwrap()); + let arrays = + vec![Arc::new(BinaryArray::from(vec![Some(br#"{"a":1}"#.as_slice())])) as ArrayRef]; + + let aligned = maybe_align_json_array_with_schema(schema.arrow_schema(), arrays).unwrap(); + assert_eq!(aligned[0].data_type(), &DataType::Binary); + } } diff --git a/src/datatypes/src/extension/json.rs b/src/datatypes/src/extension/json.rs index abc75bb35b..678308226c 100644 --- a/src/datatypes/src/extension/json.rs +++ b/src/datatypes/src/extension/json.rs @@ -107,3 +107,11 @@ impl ExtensionType for JsonExtensionType { pub fn is_json_extension_type(field: &FieldRef) -> bool { field.extension_type_name() == Some(JsonExtensionType::NAME) } + +/// Check if this field is a structured JSON field. +/// +/// Legacy JSONB columns may carry JSON extension metadata due to old metadata versions, but their +/// physical Arrow type is still Binary. They must not enter structured JSON alignment paths. +pub fn is_structured_json_field(field: &FieldRef) -> bool { + is_json_extension_type(field) && matches!(field.data_type(), DataType::Struct(_)) +} diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index e743ef82ba..10900b5540 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -40,8 +40,7 @@ use common_time::timestamp::TimeUnit; use common_time::{TimeToLive, Timestamp}; use datafusion_common::ScalarValue; use datafusion_expr::Expr; -use datatypes::extension::json::is_json_extension_type; -use datatypes::schema::ext::ArrowSchemaExt; +use datatypes::extension::json::is_structured_json_field; use datatypes::types::json_type::JsonNativeType; use parquet::arrow::parquet_to_arrow_schema; use parquet::file::metadata::PageIndexPolicy; @@ -1040,11 +1039,11 @@ impl CompactionSstReaderBuilder<'_> { async fn build_scan_input(self) -> Result { let schema = self.metadata.schema.arrow_schema(); - let json_type_hint = if schema.has_json_extension_field() { + let json_type_hint = if schema.fields().iter().any(is_structured_json_field) { let mut json_type_hint = schema .fields() .iter() - .filter(|&field| is_json_extension_type(field)) + .filter(|&field| is_structured_json_field(field)) .map(|field| (field.name().clone(), JsonNativeType::Null)) .collect::>(); diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index e094b662f9..ed7b8addf1 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -37,9 +37,8 @@ use datatypes::arrow::datatypes::{ DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type, }; use datatypes::data_type::DataType; -use datatypes::extension::json::is_json_extension_type; +use datatypes::extension::json::is_structured_json_field; use datatypes::prelude::{MutableVector, Vector}; -use datatypes::schema::ext::ArrowSchemaExt; use datatypes::types::JsonType; use datatypes::value::ValueRef; use datatypes::vectors::Helper; @@ -435,7 +434,7 @@ impl UnorderedPart { // Get the schema from the first part let schema = self.parts[0].batch.schema(); - let concatenated = if schema.has_json_extension_field() { + let concatenated = if schema.fields().iter().any(is_structured_json_field) { let (schema, batches) = align_parts(&self.parts)?; arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)? } else { @@ -496,7 +495,7 @@ fn align_parts(parts: &[BulkPart]) -> Result<(SchemaRef, Vec)> { let mut merged_types = HashMap::new(); let mut aligned_fields = Vec::with_capacity(base_schema.fields().len()); for (i, field) in base_schema.fields().iter().enumerate() { - if is_json_extension_type(field) { + if is_structured_json_field(field) { let mut merged = JsonType::from(field.data_type()); rest.iter() .try_fold(&mut merged, |acc, x| { @@ -775,13 +774,13 @@ impl BulkPartConverter { } fn align_schema_with_json_array(schema: SchemaRef, columns: &[ArrayRef]) -> SchemaRef { - if schema.fields().iter().all(|f| !is_json_extension_type(f)) { + if schema.fields().iter().all(|f| !is_structured_json_field(f)) { return schema; } let mut fields = Vec::with_capacity(schema.fields().len()); for (field, array) in schema.fields().iter().zip(columns) { - if !is_json_extension_type(field) { + if !is_structured_json_field(field) { fields.push(field.clone()); continue; } diff --git a/src/mito2/src/read/compat.rs b/src/mito2/src/read/compat.rs index 54a7893a86..48f641b1ef 100644 --- a/src/mito2/src/read/compat.rs +++ b/src/mito2/src/read/compat.rs @@ -25,9 +25,8 @@ use datatypes::arrow::compute::{TakeOptions, take}; use datatypes::arrow::datatypes::{FieldRef, Schema, SchemaRef}; use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::ConcreteDataType; -use datatypes::extension::json::is_json_extension_type; +use datatypes::extension::json::is_structured_json_field; use datatypes::prelude::DataType; -use datatypes::schema::ext::ArrowSchemaExt; use datatypes::value::Value; use datatypes::vectors::VectorRef; use datatypes::vectors::json::array::JsonArray; @@ -100,9 +99,14 @@ impl FlatCompatBatch { let actual = read_format.metadata(); let format_projection = read_format.format_projection(); let mut actual_schema = flat_projected_columns(actual, format_projection); - if read_format.arrow_schema().has_json_extension_field() { + if read_format + .arrow_schema() + .fields() + .iter() + .any(is_structured_json_field) + { for field in read_format.arrow_schema().fields() { - if is_json_extension_type(field) + if is_structured_json_field(field) && let Some(column_id) = actual.column_by_name(field.name()).map(|x| x.column_id) && let Some(i) = actual_schema.iter().position(|x| x.0 == column_id) diff --git a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs index 6dfbab4a0b..a87d329fa6 100644 --- a/src/mito2/src/read/flat_projection.rs +++ b/src/mito2/src/read/flat_projection.rs @@ -25,7 +25,7 @@ use common_recordbatch::error::{ use common_recordbatch::{DfRecordBatch, RecordBatch}; use datatypes::arrow::array::Array; use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field}; -use datatypes::extension::json::is_json_extension_type; +use datatypes::extension::json::is_structured_json_field; use datatypes::prelude::{ConcreteDataType, DataType}; use datatypes::schema::{Schema, SchemaRef}; use datatypes::types::json_type::JsonNativeType; @@ -118,7 +118,10 @@ impl FlatProjectionMapper { .and_then(|x| x.get(&schema.name)) .cloned() .map(ConcreteDataType::json2) - && schema.data_type.is_json() + && schema + .data_type + .as_json() + .is_some_and(|json_type| json_type.is_json2()) { schema.data_type = concretized; } @@ -142,10 +145,13 @@ impl FlatProjectionMapper { && !json_type_hint.is_empty() { for (column_id, data_type) in batch_schema.iter_mut() { - if let Some(concretized) = metadata - .column_by_id(*column_id) - .and_then(|x| json_type_hint.get(&x.column_schema.name).cloned()) - .map(ConcreteDataType::json2) + if data_type + .as_json() + .is_some_and(|json_type| json_type.is_json2()) + && let Some(concretized) = metadata + .column_by_id(*column_id) + .and_then(|x| json_type_hint.get(&x.column_schema.name).cloned()) + .map(ConcreteDataType::json2) { *data_type = concretized; } @@ -260,7 +266,7 @@ impl FlatProjectionMapper { .input_arrow_schema .fields() .iter() - .filter(|&field| is_json_extension_type(field)) + .filter(|&field| is_structured_json_field(field)) .map(|field| (field.name().clone(), field.data_type().clone())) .collect(); to_flat_sst_arrow_schema(&self.metadata, &options) @@ -321,7 +327,7 @@ impl FlatProjectionMapper { } let field = &self.output_schema.arrow_schema().fields()[output_idx]; - if is_json_extension_type(field) { + if is_structured_json_field(field) { array = JsonArray::from(&array) .try_align(field.data_type()) .context(DataTypesSnafu)?; @@ -548,3 +554,60 @@ impl DfBatchAssembler { RecordBatch::to_df_record_batch(self.output_arrow_schema_with_internal.clone(), columns) } } + +#[cfg(test)] +mod tests { + use datatypes::types::json_type::JsonObjectType; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use store_api::storage::RegionId; + + use super::*; + + fn metadata_with_legacy_json() -> RegionMetadataRef { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 0)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + "j", + ConcreteDataType::json_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 0, + }) + .push_column_metadata(ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 1, + }); + Arc::new(builder.build().unwrap()) + } + + #[test] + fn test_json_type_hint_does_not_concretize_legacy_json() { + let metadata = metadata_with_legacy_json(); + let hint = HashMap::from([( + "j".to_string(), + JsonNativeType::Object(JsonObjectType::from([( + "a".to_string(), + JsonNativeType::i64(), + )])), + )]); + let mapper = FlatProjectionMapper::new_with_read_columns( + &metadata, + vec![0, 1], + ReadColumns::from_deduped_column_ids([0, 1]), + Some(&hint), + ) + .unwrap(); + + assert_eq!( + mapper.batch_schema()[0], + (0, ConcreteDataType::json_datatype()) + ); + } +} diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index f9f2ad512f..fcbe077cc6 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -31,7 +31,7 @@ use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr; use datafusion_common::Column; use datafusion_expr::Expr; use datafusion_expr::utils::expr_to_columns; -use datatypes::schema::ext::ArrowSchemaExt; +use datatypes::extension::json::is_structured_json_field; use datatypes::types::json_type::JsonNativeType; use futures::StreamExt; use itertools::Itertools; @@ -425,11 +425,24 @@ impl ScanRegion { ReadColumns::from_deduped_column_ids(read_col_ids) } }; - narrow_read_columns_by_json_type_hint( - &mut read_cols, - &self.request.json_type_hint, - &self.version.metadata, - ); + // Only narrow read columns and pass JSON type hints for structured JSON (JSON2) + // columns. Legacy JSONB columns have JSON extension metadata but their physical + // Arrow type is Binary, not Struct, so they must not enter structured JSON paths. + let has_structured_json = self + .version + .metadata + .schema + .arrow_schema() + .fields() + .iter() + .any(is_structured_json_field); + if has_structured_json { + narrow_read_columns_by_json_type_hint( + &mut read_cols, + &self.request.json_type_hint, + &self.version.metadata, + ); + } let read_col_ids = read_cols.column_ids(); // The mapper always computes projected column ids as the schema of SSTs may change. @@ -438,12 +451,7 @@ impl ScanRegion { .projection_indices() .map(|x| x.to_vec()) .unwrap_or_else(|| (0..self.version.metadata.column_metadatas.len()).collect()); - let json_type_hint = self - .version - .metadata - .schema - .arrow_schema() - .has_json_extension_field() + let json_type_hint = has_structured_json .then_some(&self.request.json_type_hint) .inspect(|json_type_hint| { debug!( diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index ff7479c876..e936bf8d8b 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -33,8 +33,8 @@ use datatypes::arrow::array::ArrayRef; use datatypes::arrow::datatypes::{Field, Schema as ArrowSchema, SchemaRef}; use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::ConcreteDataType; +use datatypes::extension::json::is_structured_json_field; use datatypes::prelude::DataType; -use datatypes::schema::ext::ArrowSchemaExt; use futures::StreamExt; use mito_codec::row_converter::build_primary_key_codec; use object_store::ObjectStore; @@ -525,7 +525,12 @@ impl ParquetReaderBuilder { // Create ArrowReaderMetadata for async stream building. let mut arrow_reader_options = ArrowReaderOptions::new(); - if !read_format.arrow_schema().has_json_extension_field() { + if !read_format + .arrow_schema() + .fields() + .iter() + .any(is_structured_json_field) + { // Read `__primary_key` as Binary when it's too large for dictionary // encoding; convert_batch wraps it back to a DictionaryArray. let schema_for_reader = if should_read_pk_as_binary(&parquet_meta) { diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 5b7b1c310e..e97965d8e7 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -32,8 +32,7 @@ use datatypes::arrow::array::{ use datatypes::arrow::compute::{max, min}; use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit}; use datatypes::arrow::record_batch::RecordBatch; -use datatypes::extension::json::is_json_extension_type; -use datatypes::schema::ext::ArrowSchemaExt; +use datatypes::extension::json::is_structured_json_field; use object_store::{FuturesAsyncWriter, ObjectStore}; use parquet::arrow::AsyncArrowWriter; use parquet::basic::{Compression, Encoding, ZstdLevel}; @@ -276,12 +275,17 @@ where ) -> Result { let mut options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding); - if source.schema().has_json_extension_field() { + if source + .schema() + .fields() + .iter() + .any(is_structured_json_field) + { options.concretized_json_types = source .schema() .fields() .iter() - .filter(|&field| is_json_extension_type(field)) + .filter(|&field| is_structured_json_field(field)) .map(|field| (field.name().clone(), field.data_type().clone())) .collect::>(); } diff --git a/src/query/src/sql/show_create_table.rs b/src/query/src/sql/show_create_table.rs index ee3049c9f7..a89607d789 100644 --- a/src/query/src/sql/show_create_table.rs +++ b/src/query/src/sql/show_create_table.rs @@ -197,7 +197,12 @@ fn create_column(column_schema: &ColumnSchema, quote_style: char) -> Result()? { + if column_schema + .data_type + .as_json() + .is_some_and(|json_type| json_type.is_json2()) + && let Some(json_extension) = column_schema.extension_type::()? + { let settings = json_extension .metadata() .json_structure_settings @@ -419,6 +424,67 @@ WITH( ); } + #[test] + fn test_show_create_legacy_json_with_json_extension() { + let mut json_column = ColumnSchema::new("j", ConcreteDataType::json_datatype(), true); + json_column + .with_extension_type(&JsonExtensionType::new(Arc::new( + datatypes::extension::json::JsonMetadata { + json_structure_settings: Some(datatypes::json::JsonStructureSettings::default()), + }, + ))) + .unwrap(); + + let table_schema = SchemaRef::new(Schema::new(vec![ + json_column, + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond), + false, + ) + .with_time_index(true), + ])); + let table_name = "legacy_json"; + let meta = TableMetaBuilder::empty() + .schema(table_schema) + .primary_key_indices(vec![]) + .value_indices(vec![0]) + .engine("mito".to_string()) + .next_column_id(0) + .options(Default::default()) + .created_on(Default::default()) + .build() + .unwrap(); + + let info = Arc::new( + TableInfoBuilder::default() + .table_id(1024) + .table_version(0 as TableVersion) + .name(table_name) + .schema_name("public") + .catalog_name("greptime") + .desc(None) + .table_type(TableType::Base) + .meta(meta) + .build() + .unwrap(), + ); + + let stmt = create_table_stmt(&info, None, '"').unwrap(); + let sql = format!("\n{}", stmt); + assert_eq!( + r#" +CREATE TABLE IF NOT EXISTS "legacy_json" ( + "j" JSON NULL, + "ts" TIMESTAMP(3) NOT NULL, + TIME INDEX ("ts") +) +ENGINE=mito +"#, + sql + ); + } + #[test] fn test_show_create_external_table_sql() { let schema = vec![