From ea639584d9e595b144d808f76d4aeb381dd43ff0 Mon Sep 17 00:00:00 2001 From: luofucong Date: Tue, 12 May 2026 20:24:15 +0800 Subject: [PATCH] feat: compact json2 data Signed-off-by: luofucong --- src/common/recordbatch/src/recordbatch.rs | 2 +- src/mito2/benches/memtable_bench.rs | 2 + src/mito2/src/compaction.rs | 109 ++++++++++++++++-- src/mito2/src/error.rs | 15 ++- src/mito2/src/memtable/bulk/part.rs | 2 + src/mito2/src/read.rs | 3 +- src/mito2/src/read/scan_region.rs | 7 +- src/mito2/src/read/seq_scan.rs | 2 +- src/mito2/src/sst.rs | 21 +++- src/mito2/src/sst/parquet/reader.rs | 2 +- src/mito2/src/sst/parquet/writer.rs | 22 +++- .../standalone/common/types/json/json2.result | 8 ++ .../standalone/common/types/json/json2.sql | 2 + 13 files changed, 174 insertions(+), 23 deletions(-) diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index 4289714afd..413d89fb78 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -355,7 +355,7 @@ pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Resul Ok(RecordBatch::from_df_record_batch(schema, record_batch)) } -fn maybe_align_json_array_with_schema( +pub fn maybe_align_json_array_with_schema( schema: &ArrowSchemaRef, arrays: Vec, ) -> Result> { diff --git a/src/mito2/benches/memtable_bench.rs b/src/mito2/benches/memtable_bench.rs index 8336625e3c..e6881a766a 100644 --- a/src/mito2/benches/memtable_bench.rs +++ b/src/mito2/benches/memtable_bench.rs @@ -229,6 +229,7 @@ fn bulk_part_converter(c: &mut Criterion) { &FlatSchemaOptions { raw_pk_columns: false, string_pk_use_dict: false, + ..Default::default() }, ); let mut converter = BulkPartConverter::new(&metadata, schema, rows, codec, false); @@ -255,6 +256,7 @@ fn bulk_part_converter(c: &mut Criterion) { &FlatSchemaOptions { raw_pk_columns: true, string_pk_use_dict: true, + ..Default::default() }, ); let mut converter = diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 1d1151177d..12803e9acc 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -29,16 +29,25 @@ use std::time::Instant; use api::v1::region::compact_request; use api::v1::region::compact_request::Options; +use arrow_schema::Schema; use common_base::Plugins; use common_base::cancellation::CancellationHandle; use common_memory_manager::OnExhaustedPolicy; use common_meta::key::SchemaMetadataManagerRef; +use common_recordbatch::recordbatch::maybe_align_json_array_with_schema; use common_telemetry::{debug, error, info, warn}; use common_time::range::TimestampRange; use common_time::timestamp::TimeUnit; use common_time::{TimeToLive, Timestamp}; use datafusion_common::ScalarValue; use datafusion_expr::Expr; +use datatypes::arrow::array::RecordBatch; +use datatypes::extension::json::is_json_extension_type; +use datatypes::schema::ext::ArrowSchemaExt; +use datatypes::types::json_type::JsonNativeType; +use futures::{StreamExt, TryStreamExt, future}; +use parquet::arrow::parquet_to_arrow_schema; +use parquet::file::metadata::PageIndexPolicy; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; @@ -54,15 +63,17 @@ use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker}; use crate::compaction::task::CompactionTaskImpl; use crate::config::MitoConfig; use crate::error::{ - CompactRegionSnafu, CompactionCancelledSnafu, Error, GetSchemaMetadataSnafu, - ManualCompactionOverrideSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, - RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu, TimeoutSnafu, + CompactRegionSnafu, CompactionCancelledSnafu, DataTypeMismatchSnafu, Error, + GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu, NewRecordBatchSnafu, + ParquetToArrowSchemaSnafu, RecordBatchSnafu, RegionClosedSnafu, RegionDroppedSnafu, + RegionTruncatedSnafu, RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu, + TimeoutSnafu, }; use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT}; -use crate::read::FlatSource; use crate::read::flat_projection::FlatProjectionMapper; use crate::read::scan_region::{PredicateGroup, ScanInput}; use crate::read::seq_scan::SeqScan; +use crate::read::{FlatSource, scan_region}; use crate::region::options::{MergeMode, RegionOptions}; use crate::region::version::VersionControlRef; use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState}; @@ -72,6 +83,7 @@ use crate::schedule::remote_job_scheduler::{ }; use crate::schedule::scheduler::SchedulerRef; use crate::sst::file::{FileHandle, FileMeta, Level}; +use crate::sst::parquet::reader::MetadataCacheMetrics; use crate::sst::version::LevelMeta; use crate::worker::WorkerListener; @@ -995,19 +1007,63 @@ impl CompactionSstReaderBuilder<'_> { /// Build a [FlatSource] that yields Arrow `RecordBatch`s from reading all the input SST files, /// for compaction. The schema of the [FlatSource] is unified. async fn build_flat_sst_reader(self) -> Result { - let scan_input = self.build_scan_input()?.with_compaction(true); + let scan_input = self.build_scan_input().await?.with_compaction(true); let schema = scan_input.mapper.output_schema(); let schema = schema.arrow_schema(); - SeqScan::new(scan_input) + let stream = SeqScan::new(scan_input) .build_flat_reader_for_compaction() - .await - .map(|stream| FlatSource::new_stream(schema.clone(), stream)) + .await? + .and_then({ + let schema = schema.clone(); + let has_json_extension_field = schema.has_json_extension_field(); + move |record_batch| { + future::ready(if has_json_extension_field { + let (_, columns, _) = record_batch.into_parts(); + maybe_align_json_array_with_schema(&schema, columns) + .context(RecordBatchSnafu) + .and_then(|columns| { + RecordBatch::try_new(schema.clone(), columns) + .context(NewRecordBatchSnafu) + }) + } else { + Ok(record_batch) + }) + } + }) + .boxed(); + Ok(FlatSource::new_stream(schema.clone(), stream)) } - fn build_scan_input(self) -> Result { - let mapper = FlatProjectionMapper::all(&self.metadata)?; + async fn build_scan_input(self) -> Result { + let mut mapper = FlatProjectionMapper::all(&self.metadata)?; + + let schema = self.metadata.schema.arrow_schema(); + if schema.has_json_extension_field() { + let mut json_type_hint = schema + .fields() + .iter() + .filter(|&field| is_json_extension_type(field)) + .map(|field| (field.name().clone(), JsonNativeType::Null)) + .collect::>(); + + let schemas = self.collect_arrow_schemas_from_parquet().await?; + for schema in schemas { + for field in schema.fields() { + let Some(merged) = json_type_hint.get_mut(field.name()) else { + continue; + }; + + let json_type = JsonNativeType::try_from(field.data_type()) + .context(DataTypeMismatchSnafu)?; + merged.merge(&json_type); + } + } + + scan_region::concretize_json_types(&mut mapper, &json_type_hint); + } + let mut scan_input = ScanInput::new(self.sst_layer, mapper) .with_files(self.inputs.to_vec()) .with_append_mode(self.append_mode) @@ -1027,6 +1083,39 @@ impl CompactionSstReaderBuilder<'_> { Ok(scan_input) } + + async fn collect_arrow_schemas_from_parquet(&self) -> Result> { + let mut schemas = Vec::with_capacity(self.inputs.len()); + + for file_handle in self.inputs { + let file_path = + file_handle.file_path(self.sst_layer.table_dir(), self.sst_layer.path_type()); + let file_size = file_handle.meta_ref().file_size; + let parquet_metadata = self + .sst_layer + .read_sst(file_handle.clone()) + .read_parquet_metadata( + &file_path, + file_size, + &mut MetadataCacheMetrics::default(), + PageIndexPolicy::default(), + ) + .await + .map(|x| x.0.parquet_metadata())?; + let file_metadata = parquet_metadata.file_metadata(); + + let schema = parquet_to_arrow_schema( + file_metadata.schema_descr(), + file_metadata.key_value_metadata(), + ) + .with_context(|_| ParquetToArrowSchemaSnafu { + parquet_metadata: format!("{:?}", parquet_metadata), + })?; + + schemas.push(schema); + } + Ok(schemas) + } } /// Converts time range to predicates so that rows outside the range will be filtered. diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index b822c17976..dcabdcac95 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -1248,6 +1248,18 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display( + "Failed generate Arrow schema from Parquet metadata: {}", + parquet_metadata + ))] + ParquetToArrowSchema { + parquet_metadata: String, + #[snafu(source)] + error: parquet::errors::ParquetError, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -1338,7 +1350,8 @@ impl ErrorExt for Error { | BuildEntry { .. } | Metadata { .. } | CastColumn { .. } - | MitoManifestInfo { .. } => StatusCode::Internal, + | MitoManifestInfo { .. } + | ParquetToArrowSchema { .. } => StatusCode::Internal, FetchManifests { source, .. } => source.status_code(), diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index 2b817dcb3a..f739d661a6 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -2115,6 +2115,7 @@ mod tests { &FlatSchemaOptions { raw_pk_columns: false, string_pk_use_dict: true, + ..Default::default() }, ); @@ -2552,6 +2553,7 @@ mod tests { &FlatSchemaOptions { raw_pk_columns: false, string_pk_use_dict: true, + ..Default::default() }, ); diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 5a71bd1a4e..534cb5e865 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -1128,8 +1128,7 @@ impl FlatSource { } } - #[expect(unused)] - fn schema(&self) -> &SchemaRef { + pub(crate) fn schema(&self) -> &SchemaRef { &self.schema } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 19360f7208..455215d01b 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -733,7 +733,7 @@ impl ScanRegion { } } -fn concretize_json_types( +pub(crate) fn concretize_json_types( mapper: &mut FlatProjectionMapper, json_type_hint: &HashMap, ) { @@ -751,6 +751,11 @@ fn concretize_json_types( let Some(json_type) = json_type_hint.get(&column_schema.name) else { continue; }; + + debug!( + "column {} set concretize JSON type {}", + column_schema.name, json_type, + ); column_schema.data_type = ConcreteDataType::from(json_type); } diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 432099dbcf..6bc796c85c 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -130,7 +130,7 @@ impl SeqScan { /// /// # Panics /// Panics if the compaction flag is not set. - pub async fn build_flat_reader_for_compaction(&self) -> Result { + pub(crate) async fn build_flat_reader_for_compaction(&self) -> Result { assert!(self.stream_ctx.input.compaction); let metrics_set = ExecutionPlanMetricsSet::new(); diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs index 1007b57668..cd6eb627a6 100644 --- a/src/mito2/src/sst.rs +++ b/src/mito2/src/sst.rs @@ -14,9 +14,11 @@ //! Sorted strings tables. +use std::collections::HashMap; use std::sync::Arc; use api::v1::SemanticType; +use arrow_schema::DataType; use common_base::readable_size::ReadableSize; use datatypes::arrow::datatypes::{ DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef, @@ -108,6 +110,9 @@ pub struct FlatSchemaOptions { /// when storing primary key columns. /// Only takes effect when `raw_pk_columns` is true. pub string_pk_use_dict: bool, + /// The column's concretized JSON types, to be set into Arrow schema. + /// Otherwise it's empty struct in the Arrow schema. + pub concretized_json_types: HashMap, } impl Default for FlatSchemaOptions { @@ -115,6 +120,7 @@ impl Default for FlatSchemaOptions { Self { raw_pk_columns: true, string_pk_use_dict: true, + concretized_json_types: HashMap::new(), } } } @@ -128,6 +134,7 @@ impl FlatSchemaOptions { Self { raw_pk_columns: false, string_pk_use_dict: false, + concretized_json_types: HashMap::new(), } } } @@ -159,6 +166,7 @@ pub fn to_flat_sst_arrow_schema( &metadata.column_metadatas[pk_index].column_schema.data_type, old_field, ); + let new_field = concretize_json_type(new_field, options); fields.push(Arc::new(with_field_id((*new_field).clone(), column_id))); } } @@ -169,8 +177,9 @@ pub fn to_flat_sst_arrow_schema( .zip(&metadata.column_metadatas) .filter_map(|(field, column_meta)| { if column_meta.semantic_type == SemanticType::Field { + let field = concretize_json_type(field.clone(), options); Some(Arc::new(with_field_id( - (**field).clone(), + Arc::unwrap_or_clone(field), column_meta.column_id, ))) } else { @@ -189,6 +198,16 @@ pub fn to_flat_sst_arrow_schema( Arc::new(Schema::new(fields)) } +fn concretize_json_type(field: Arc, options: &FlatSchemaOptions) -> Arc { + if let Some(data_type) = options.concretized_json_types.get(field.name()) { + let mut field = Arc::unwrap_or_clone(field); + field.set_data_type(data_type.clone()); + Arc::new(field) + } else { + field + } +} + /// Returns the number of columns in the flat format. pub fn flat_sst_arrow_schema_column_num( metadata: &RegionMetadata, diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index f03856a521..7f6c0700fe 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -584,7 +584,7 @@ impl ParquetReaderBuilder { /// Reads parquet metadata of specific file. /// Returns (fused metadata, cache_miss_flag). - async fn read_parquet_metadata( + pub(crate) async fn read_parquet_metadata( &self, file_path: &str, file_size: u64, diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 13005ff9fc..2faab17e72 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -14,6 +14,7 @@ //! Parquet writer. +use std::collections::HashMap; use std::future::Future; use std::mem; use std::pin::Pin; @@ -31,6 +32,8 @@ use datatypes::arrow::array::{ use datatypes::arrow::compute::{max, min}; use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit}; use datatypes::arrow::record_batch::RecordBatch; +use datatypes::extension::json::is_json_extension_type; +use datatypes::schema::ext::ArrowSchemaExt; use object_store::{FuturesAsyncWriter, ObjectStore}; use parquet::arrow::AsyncArrowWriter; use parquet::basic::{Compression, Encoding, ZstdLevel}; @@ -271,12 +274,21 @@ where override_sequence: Option, opts: &WriteOptions, ) -> Result { + let mut options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding); + + if source.schema().has_json_extension_field() { + options.concretized_json_types = source + .schema() + .fields() + .iter() + .filter(|&field| is_json_extension_type(field)) + .map(|field| (field.name().clone(), field.data_type().clone())) + .collect::>(); + } + let converter = FlatBatchConverter::Flat( - FlatWriteFormat::new( - self.metadata.clone(), - &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding), - ) - .with_override_sequence(override_sequence), + FlatWriteFormat::new(self.metadata.clone(), &options) + .with_override_sequence(override_sequence), ); let res = self.write_all_flat_inner(source, &converter, opts).await; if res.is_err() { diff --git a/tests/cases/standalone/common/types/json/json2.result b/tests/cases/standalone/common/types/json/json2.result index 5bdc641433..a0933a9a98 100644 --- a/tests/cases/standalone/common/types/json/json2.result +++ b/tests/cases/standalone/common/types/json/json2.result @@ -42,6 +42,14 @@ admin flush_table('json2_table'); | 0 | +----------------------------------+ +admin compact_table('json2_table', 'swcs', '86400'); + ++-----------------------------------------------------+ +| ADMIN compact_table('json2_table', 'swcs', '86400') | ++-----------------------------------------------------+ +| 0 | ++-----------------------------------------------------+ + insert into json2_table values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'), (8, '{"a": {"b": 8}, "c": "s8"}'); diff --git a/tests/cases/standalone/common/types/json/json2.sql b/tests/cases/standalone/common/types/json/json2.sql index 75c7b46b41..57e113f8be 100644 --- a/tests/cases/standalone/common/types/json/json2.sql +++ b/tests/cases/standalone/common/types/json/json2.sql @@ -22,6 +22,8 @@ values (4, '{"a": {"b": -4}, "d": [{"e": {"g": -0.4}}]}'), admin flush_table('json2_table'); +admin compact_table('json2_table', 'swcs', '86400'); + insert into json2_table values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'), (8, '{"a": {"b": 8}, "c": "s8"}');