fix: guard structured JSON alignment paths against legacy JSONB columns (#8323)

* Initial plan

* fix: guard structured json alignment to fix Clippy CI failure

- Add `is_structured_json_field` function that only returns true for
  fields with both JSON extension type AND Struct Arrow data type
- Replace all usages of `is_json_extension_type` / `has_json_extension_field`
  with `is_structured_json_field` to prevent legacy JSONB binary columns
  from entering structured JSON alignment paths
- Fix logic in `FlatProjectionMapper::new_with_read_columns` to guard
  JSON type hint concretization for JSON2 columns only
- Fix `create_column` in show_create_table.rs to only emit JSON structure
  settings for JSON2 columns
- Move `mod tests` to end of flat_projection.rs to fix clippy::items_after_test_module
- Add tests for legacy JSON behavior

* fix: guard narrow_read_columns_by_json_type_hint with is_structured_json_field check

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
This commit is contained in:
Copilot
2026-06-18 13:27:56 +08:00
committed by GitHub
parent a3461caf9d
commit 58c85dd1ce
10 changed files with 215 additions and 45 deletions

View File

@@ -21,7 +21,7 @@ use datafusion_common::arrow::array::ArrayRef;
use datafusion_common::arrow::compute;
use datafusion_common::arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef};
use datatypes::arrow::array::{Array, AsArray, RecordBatchOptions};
use datatypes::extension::json::is_json_extension_type;
use datatypes::extension::json::is_structured_json_field;
use datatypes::prelude::DataType;
use datatypes::schema::SchemaRef;
use datatypes::vectors::json::array::JsonArray;
@@ -359,13 +359,13 @@ fn maybe_align_json_array_with_schema(
schema: &ArrowSchemaRef,
arrays: Vec<ArrayRef>,
) -> Result<Vec<ArrayRef>> {
if schema.fields().iter().all(|f| !is_json_extension_type(f)) {
if schema.fields().iter().all(|f| !is_structured_json_field(f)) {
return Ok(arrays);
}
let mut aligned = Vec::with_capacity(arrays.len());
for (field, array) in schema.fields().iter().zip(arrays) {
if !is_json_extension_type(field) {
if !is_structured_json_field(field) {
aligned.push(array);
continue;
}
@@ -382,10 +382,11 @@ fn maybe_align_json_array_with_schema(
mod tests {
use std::sync::Arc;
use datatypes::arrow::array::{AsArray, UInt32Array};
use datatypes::arrow::array::{AsArray, BinaryArray, UInt32Array};
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, UInt32Type};
use datatypes::arrow_array::StringArray;
use datatypes::data_type::ConcreteDataType;
use datatypes::extension::json::{JsonExtensionType, JsonMetadata};
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{StringVector, UInt32Vector};
@@ -507,4 +508,17 @@ mod tests {
.expect("merge recordbatch");
assert_eq!(merged.num_rows(), 8);
}
#[test]
fn test_legacy_json_with_extension_does_not_align_as_structured_json() {
let field = Field::new("j", DataType::Binary, true)
.with_extension_type(JsonExtensionType::new(Arc::new(JsonMetadata::default())));
let arrow_schema = Arc::new(ArrowSchema::new(vec![field]));
let schema = Arc::new(Schema::try_from(arrow_schema).unwrap());
let arrays =
vec![Arc::new(BinaryArray::from(vec![Some(br#"{"a":1}"#.as_slice())])) as ArrayRef];
let aligned = maybe_align_json_array_with_schema(schema.arrow_schema(), arrays).unwrap();
assert_eq!(aligned[0].data_type(), &DataType::Binary);
}
}

View File

@@ -107,3 +107,11 @@ impl ExtensionType for JsonExtensionType {
pub fn is_json_extension_type(field: &FieldRef) -> bool {
field.extension_type_name() == Some(JsonExtensionType::NAME)
}
/// Check if this field is a structured JSON field.
///
/// Legacy JSONB columns may carry JSON extension metadata due to old metadata versions, but their
/// physical Arrow type is still Binary. They must not enter structured JSON alignment paths.
pub fn is_structured_json_field(field: &FieldRef) -> bool {
is_json_extension_type(field) && matches!(field.data_type(), DataType::Struct(_))
}

View File

@@ -40,8 +40,7 @@ use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use datatypes::extension::json::is_json_extension_type;
use datatypes::schema::ext::ArrowSchemaExt;
use datatypes::extension::json::is_structured_json_field;
use datatypes::types::json_type::JsonNativeType;
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::metadata::PageIndexPolicy;
@@ -1040,11 +1039,11 @@ impl CompactionSstReaderBuilder<'_> {
async fn build_scan_input(self) -> Result<ScanInput> {
let schema = self.metadata.schema.arrow_schema();
let json_type_hint = if schema.has_json_extension_field() {
let json_type_hint = if schema.fields().iter().any(is_structured_json_field) {
let mut json_type_hint = schema
.fields()
.iter()
.filter(|&field| is_json_extension_type(field))
.filter(|&field| is_structured_json_field(field))
.map(|field| (field.name().clone(), JsonNativeType::Null))
.collect::<HashMap<_, _>>();

View File

@@ -37,9 +37,8 @@ use datatypes::arrow::datatypes::{
DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
};
use datatypes::data_type::DataType;
use datatypes::extension::json::is_json_extension_type;
use datatypes::extension::json::is_structured_json_field;
use datatypes::prelude::{MutableVector, Vector};
use datatypes::schema::ext::ArrowSchemaExt;
use datatypes::types::JsonType;
use datatypes::value::ValueRef;
use datatypes::vectors::Helper;
@@ -435,7 +434,7 @@ impl UnorderedPart {
// Get the schema from the first part
let schema = self.parts[0].batch.schema();
let concatenated = if schema.has_json_extension_field() {
let concatenated = if schema.fields().iter().any(is_structured_json_field) {
let (schema, batches) = align_parts(&self.parts)?;
arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?
} else {
@@ -496,7 +495,7 @@ fn align_parts(parts: &[BulkPart]) -> Result<(SchemaRef, Vec<RecordBatch>)> {
let mut merged_types = HashMap::new();
let mut aligned_fields = Vec::with_capacity(base_schema.fields().len());
for (i, field) in base_schema.fields().iter().enumerate() {
if is_json_extension_type(field) {
if is_structured_json_field(field) {
let mut merged = JsonType::from(field.data_type());
rest.iter()
.try_fold(&mut merged, |acc, x| {
@@ -775,13 +774,13 @@ impl BulkPartConverter {
}
fn align_schema_with_json_array(schema: SchemaRef, columns: &[ArrayRef]) -> SchemaRef {
if schema.fields().iter().all(|f| !is_json_extension_type(f)) {
if schema.fields().iter().all(|f| !is_structured_json_field(f)) {
return schema;
}
let mut fields = Vec::with_capacity(schema.fields().len());
for (field, array) in schema.fields().iter().zip(columns) {
if !is_json_extension_type(field) {
if !is_structured_json_field(field) {
fields.push(field.clone());
continue;
}

View File

@@ -25,9 +25,8 @@ use datatypes::arrow::compute::{TakeOptions, take};
use datatypes::arrow::datatypes::{FieldRef, Schema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::extension::json::is_json_extension_type;
use datatypes::extension::json::is_structured_json_field;
use datatypes::prelude::DataType;
use datatypes::schema::ext::ArrowSchemaExt;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use datatypes::vectors::json::array::JsonArray;
@@ -100,9 +99,14 @@ impl FlatCompatBatch {
let actual = read_format.metadata();
let format_projection = read_format.format_projection();
let mut actual_schema = flat_projected_columns(actual, format_projection);
if read_format.arrow_schema().has_json_extension_field() {
if read_format
.arrow_schema()
.fields()
.iter()
.any(is_structured_json_field)
{
for field in read_format.arrow_schema().fields() {
if is_json_extension_type(field)
if is_structured_json_field(field)
&& let Some(column_id) =
actual.column_by_name(field.name()).map(|x| x.column_id)
&& let Some(i) = actual_schema.iter().position(|x| x.0 == column_id)

View File

@@ -25,7 +25,7 @@ use common_recordbatch::error::{
use common_recordbatch::{DfRecordBatch, RecordBatch};
use datatypes::arrow::array::Array;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field};
use datatypes::extension::json::is_json_extension_type;
use datatypes::extension::json::is_structured_json_field;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{Schema, SchemaRef};
use datatypes::types::json_type::JsonNativeType;
@@ -118,7 +118,10 @@ impl FlatProjectionMapper {
.and_then(|x| x.get(&schema.name))
.cloned()
.map(ConcreteDataType::json2)
&& schema.data_type.is_json()
&& schema
.data_type
.as_json()
.is_some_and(|json_type| json_type.is_json2())
{
schema.data_type = concretized;
}
@@ -142,10 +145,13 @@ impl FlatProjectionMapper {
&& !json_type_hint.is_empty()
{
for (column_id, data_type) in batch_schema.iter_mut() {
if let Some(concretized) = metadata
.column_by_id(*column_id)
.and_then(|x| json_type_hint.get(&x.column_schema.name).cloned())
.map(ConcreteDataType::json2)
if data_type
.as_json()
.is_some_and(|json_type| json_type.is_json2())
&& let Some(concretized) = metadata
.column_by_id(*column_id)
.and_then(|x| json_type_hint.get(&x.column_schema.name).cloned())
.map(ConcreteDataType::json2)
{
*data_type = concretized;
}
@@ -260,7 +266,7 @@ impl FlatProjectionMapper {
.input_arrow_schema
.fields()
.iter()
.filter(|&field| is_json_extension_type(field))
.filter(|&field| is_structured_json_field(field))
.map(|field| (field.name().clone(), field.data_type().clone()))
.collect();
to_flat_sst_arrow_schema(&self.metadata, &options)
@@ -321,7 +327,7 @@ impl FlatProjectionMapper {
}
let field = &self.output_schema.arrow_schema().fields()[output_idx];
if is_json_extension_type(field) {
if is_structured_json_field(field) {
array = JsonArray::from(&array)
.try_align(field.data_type())
.context(DataTypesSnafu)?;
@@ -548,3 +554,60 @@ impl DfBatchAssembler {
RecordBatch::to_df_record_batch(self.output_arrow_schema_with_internal.clone(), columns)
}
}
#[cfg(test)]
mod tests {
use datatypes::types::json_type::JsonObjectType;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use super::*;
fn metadata_with_legacy_json() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 0));
builder
.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"j",
ConcreteDataType::json_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 0,
})
.push_column_metadata(ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 1,
});
Arc::new(builder.build().unwrap())
}
#[test]
fn test_json_type_hint_does_not_concretize_legacy_json() {
let metadata = metadata_with_legacy_json();
let hint = HashMap::from([(
"j".to_string(),
JsonNativeType::Object(JsonObjectType::from([(
"a".to_string(),
JsonNativeType::i64(),
)])),
)]);
let mapper = FlatProjectionMapper::new_with_read_columns(
&metadata,
vec![0, 1],
ReadColumns::from_deduped_column_ids([0, 1]),
Some(&hint),
)
.unwrap();
assert_eq!(
mapper.batch_schema()[0],
(0, ConcreteDataType::json_datatype())
);
}
}

View File

@@ -31,7 +31,7 @@ use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
use datafusion_common::Column;
use datafusion_expr::Expr;
use datafusion_expr::utils::expr_to_columns;
use datatypes::schema::ext::ArrowSchemaExt;
use datatypes::extension::json::is_structured_json_field;
use datatypes::types::json_type::JsonNativeType;
use futures::StreamExt;
use itertools::Itertools;
@@ -425,11 +425,24 @@ impl ScanRegion {
ReadColumns::from_deduped_column_ids(read_col_ids)
}
};
narrow_read_columns_by_json_type_hint(
&mut read_cols,
&self.request.json_type_hint,
&self.version.metadata,
);
// Only narrow read columns and pass JSON type hints for structured JSON (JSON2)
// columns. Legacy JSONB columns have JSON extension metadata but their physical
// Arrow type is Binary, not Struct, so they must not enter structured JSON paths.
let has_structured_json = self
.version
.metadata
.schema
.arrow_schema()
.fields()
.iter()
.any(is_structured_json_field);
if has_structured_json {
narrow_read_columns_by_json_type_hint(
&mut read_cols,
&self.request.json_type_hint,
&self.version.metadata,
);
}
let read_col_ids = read_cols.column_ids();
// The mapper always computes projected column ids as the schema of SSTs may change.
@@ -438,12 +451,7 @@ impl ScanRegion {
.projection_indices()
.map(|x| x.to_vec())
.unwrap_or_else(|| (0..self.version.metadata.column_metadatas.len()).collect());
let json_type_hint = self
.version
.metadata
.schema
.arrow_schema()
.has_json_extension_field()
let json_type_hint = has_structured_json
.then_some(&self.request.json_type_hint)
.inspect(|json_type_hint| {
debug!(

View File

@@ -33,8 +33,8 @@ use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::datatypes::{Field, Schema as ArrowSchema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::extension::json::is_structured_json_field;
use datatypes::prelude::DataType;
use datatypes::schema::ext::ArrowSchemaExt;
use futures::StreamExt;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
@@ -525,7 +525,12 @@ impl ParquetReaderBuilder {
// Create ArrowReaderMetadata for async stream building.
let mut arrow_reader_options = ArrowReaderOptions::new();
if !read_format.arrow_schema().has_json_extension_field() {
if !read_format
.arrow_schema()
.fields()
.iter()
.any(is_structured_json_field)
{
// Read `__primary_key` as Binary when it's too large for dictionary
// encoding; convert_batch wraps it back to a DictionaryArray.
let schema_for_reader = if should_read_pk_as_binary(&parquet_meta) {

View File

@@ -32,8 +32,7 @@ use datatypes::arrow::array::{
use datatypes::arrow::compute::{max, min};
use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::extension::json::is_json_extension_type;
use datatypes::schema::ext::ArrowSchemaExt;
use datatypes::extension::json::is_structured_json_field;
use object_store::{FuturesAsyncWriter, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
@@ -276,12 +275,17 @@ where
) -> Result<SstInfoArray> {
let mut options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding);
if source.schema().has_json_extension_field() {
if source
.schema()
.fields()
.iter()
.any(is_structured_json_field)
{
options.concretized_json_types = source
.schema()
.fields()
.iter()
.filter(|&field| is_json_extension_type(field))
.filter(|&field| is_structured_json_field(field))
.map(|field| (field.name().clone(), field.data_type().clone()))
.collect::<HashMap<_, _>>();
}

View File

@@ -197,7 +197,12 @@ fn create_column(column_schema: &ColumnSchema, quote_style: char) -> Result<Colu
extensions.inverted_index_options = Some(HashMap::new().into());
}
if let Some(json_extension) = column_schema.extension_type::<JsonExtensionType>()? {
if column_schema
.data_type
.as_json()
.is_some_and(|json_type| json_type.is_json2())
&& let Some(json_extension) = column_schema.extension_type::<JsonExtensionType>()?
{
let settings = json_extension
.metadata()
.json_structure_settings
@@ -419,6 +424,67 @@ WITH(
);
}
#[test]
fn test_show_create_legacy_json_with_json_extension() {
let mut json_column = ColumnSchema::new("j", ConcreteDataType::json_datatype(), true);
json_column
.with_extension_type(&JsonExtensionType::new(Arc::new(
datatypes::extension::json::JsonMetadata {
json_structure_settings: Some(datatypes::json::JsonStructureSettings::default()),
},
)))
.unwrap();
let table_schema = SchemaRef::new(Schema::new(vec![
json_column,
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_datatype(TimeUnit::Millisecond),
false,
)
.with_time_index(true),
]));
let table_name = "legacy_json";
let meta = TableMetaBuilder::empty()
.schema(table_schema)
.primary_key_indices(vec![])
.value_indices(vec![0])
.engine("mito".to_string())
.next_column_id(0)
.options(Default::default())
.created_on(Default::default())
.build()
.unwrap();
let info = Arc::new(
TableInfoBuilder::default()
.table_id(1024)
.table_version(0 as TableVersion)
.name(table_name)
.schema_name("public")
.catalog_name("greptime")
.desc(None)
.table_type(TableType::Base)
.meta(meta)
.build()
.unwrap(),
);
let stmt = create_table_stmt(&info, None, '"').unwrap();
let sql = format!("\n{}", stmt);
assert_eq!(
r#"
CREATE TABLE IF NOT EXISTS "legacy_json" (
"j" JSON NULL,
"ts" TIMESTAMP(3) NOT NULL,
TIME INDEX ("ts")
)
ENGINE=mito
"#,
sql
);
}
#[test]
fn test_show_create_external_table_sql() {
let schema = vec![