feat: compact json2 data

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-05-12 20:24:15 +08:00
parent 73c267e641
commit ea639584d9
13 changed files with 174 additions and 23 deletions

View File

@@ -355,7 +355,7 @@ pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Resul
Ok(RecordBatch::from_df_record_batch(schema, record_batch))
}
fn maybe_align_json_array_with_schema(
pub fn maybe_align_json_array_with_schema(
schema: &ArrowSchemaRef,
arrays: Vec<ArrayRef>,
) -> Result<Vec<ArrayRef>> {

View File

@@ -229,6 +229,7 @@ fn bulk_part_converter(c: &mut Criterion) {
&FlatSchemaOptions {
raw_pk_columns: false,
string_pk_use_dict: false,
..Default::default()
},
);
let mut converter = BulkPartConverter::new(&metadata, schema, rows, codec, false);
@@ -255,6 +256,7 @@ fn bulk_part_converter(c: &mut Criterion) {
&FlatSchemaOptions {
raw_pk_columns: true,
string_pk_use_dict: true,
..Default::default()
},
);
let mut converter =

View File

@@ -29,16 +29,25 @@ use std::time::Instant;
use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use arrow_schema::Schema;
use common_base::Plugins;
use common_base::cancellation::CancellationHandle;
use common_memory_manager::OnExhaustedPolicy;
use common_meta::key::SchemaMetadataManagerRef;
use common_recordbatch::recordbatch::maybe_align_json_array_with_schema;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use datatypes::arrow::array::RecordBatch;
use datatypes::extension::json::is_json_extension_type;
use datatypes::schema::ext::ArrowSchemaExt;
use datatypes::types::json_type::JsonNativeType;
use futures::{StreamExt, TryStreamExt, future};
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::metadata::PageIndexPolicy;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
@@ -54,15 +63,17 @@ use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
CompactRegionSnafu, CompactionCancelledSnafu, Error, GetSchemaMetadataSnafu,
ManualCompactionOverrideSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu,
RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu, TimeoutSnafu,
CompactRegionSnafu, CompactionCancelledSnafu, DataTypeMismatchSnafu, Error,
GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu, NewRecordBatchSnafu,
ParquetToArrowSchemaSnafu, RecordBatchSnafu, RegionClosedSnafu, RegionDroppedSnafu,
RegionTruncatedSnafu, RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu,
TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::FlatSource;
use crate::read::flat_projection::FlatProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::read::{FlatSource, scan_region};
use crate::region::options::{MergeMode, RegionOptions};
use crate::region::version::VersionControlRef;
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
@@ -72,6 +83,7 @@ use crate::schedule::remote_job_scheduler::{
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileMeta, Level};
use crate::sst::parquet::reader::MetadataCacheMetrics;
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;
@@ -995,19 +1007,63 @@ impl CompactionSstReaderBuilder<'_> {
/// Build a [FlatSource] that yields Arrow `RecordBatch`s from reading all the input SST files,
/// for compaction. The schema of the [FlatSource] is unified.
async fn build_flat_sst_reader(self) -> Result<FlatSource> {
let scan_input = self.build_scan_input()?.with_compaction(true);
let scan_input = self.build_scan_input().await?.with_compaction(true);
let schema = scan_input.mapper.output_schema();
let schema = schema.arrow_schema();
SeqScan::new(scan_input)
let stream = SeqScan::new(scan_input)
.build_flat_reader_for_compaction()
.await
.map(|stream| FlatSource::new_stream(schema.clone(), stream))
.await?
.and_then({
let schema = schema.clone();
let has_json_extension_field = schema.has_json_extension_field();
move |record_batch| {
future::ready(if has_json_extension_field {
let (_, columns, _) = record_batch.into_parts();
maybe_align_json_array_with_schema(&schema, columns)
.context(RecordBatchSnafu)
.and_then(|columns| {
RecordBatch::try_new(schema.clone(), columns)
.context(NewRecordBatchSnafu)
})
} else {
Ok(record_batch)
})
}
})
.boxed();
Ok(FlatSource::new_stream(schema.clone(), stream))
}
fn build_scan_input(self) -> Result<ScanInput> {
let mapper = FlatProjectionMapper::all(&self.metadata)?;
async fn build_scan_input(self) -> Result<ScanInput> {
let mut mapper = FlatProjectionMapper::all(&self.metadata)?;
let schema = self.metadata.schema.arrow_schema();
if schema.has_json_extension_field() {
let mut json_type_hint = schema
.fields()
.iter()
.filter(|&field| is_json_extension_type(field))
.map(|field| (field.name().clone(), JsonNativeType::Null))
.collect::<HashMap<_, _>>();
let schemas = self.collect_arrow_schemas_from_parquet().await?;
for schema in schemas {
for field in schema.fields() {
let Some(merged) = json_type_hint.get_mut(field.name()) else {
continue;
};
let json_type = JsonNativeType::try_from(field.data_type())
.context(DataTypeMismatchSnafu)?;
merged.merge(&json_type);
}
}
scan_region::concretize_json_types(&mut mapper, &json_type_hint);
}
let mut scan_input = ScanInput::new(self.sst_layer, mapper)
.with_files(self.inputs.to_vec())
.with_append_mode(self.append_mode)
@@ -1027,6 +1083,39 @@ impl CompactionSstReaderBuilder<'_> {
Ok(scan_input)
}
async fn collect_arrow_schemas_from_parquet(&self) -> Result<Vec<Schema>> {
let mut schemas = Vec::with_capacity(self.inputs.len());
for file_handle in self.inputs {
let file_path =
file_handle.file_path(self.sst_layer.table_dir(), self.sst_layer.path_type());
let file_size = file_handle.meta_ref().file_size;
let parquet_metadata = self
.sst_layer
.read_sst(file_handle.clone())
.read_parquet_metadata(
&file_path,
file_size,
&mut MetadataCacheMetrics::default(),
PageIndexPolicy::default(),
)
.await
.map(|x| x.0.parquet_metadata())?;
let file_metadata = parquet_metadata.file_metadata();
let schema = parquet_to_arrow_schema(
file_metadata.schema_descr(),
file_metadata.key_value_metadata(),
)
.with_context(|_| ParquetToArrowSchemaSnafu {
parquet_metadata: format!("{:?}", parquet_metadata),
})?;
schemas.push(schema);
}
Ok(schemas)
}
}
/// Converts time range to predicates so that rows outside the range will be filtered.

View File

@@ -1248,6 +1248,18 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Failed generate Arrow schema from Parquet metadata: {}",
parquet_metadata
))]
ParquetToArrowSchema {
parquet_metadata: String,
#[snafu(source)]
error: parquet::errors::ParquetError,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1338,7 +1350,8 @@ impl ErrorExt for Error {
| BuildEntry { .. }
| Metadata { .. }
| CastColumn { .. }
| MitoManifestInfo { .. } => StatusCode::Internal,
| MitoManifestInfo { .. }
| ParquetToArrowSchema { .. } => StatusCode::Internal,
FetchManifests { source, .. } => source.status_code(),

View File

@@ -2115,6 +2115,7 @@ mod tests {
&FlatSchemaOptions {
raw_pk_columns: false,
string_pk_use_dict: true,
..Default::default()
},
);
@@ -2552,6 +2553,7 @@ mod tests {
&FlatSchemaOptions {
raw_pk_columns: false,
string_pk_use_dict: true,
..Default::default()
},
);

View File

@@ -1128,8 +1128,7 @@ impl FlatSource {
}
}
#[expect(unused)]
fn schema(&self) -> &SchemaRef {
pub(crate) fn schema(&self) -> &SchemaRef {
&self.schema
}

View File

@@ -733,7 +733,7 @@ impl ScanRegion {
}
}
fn concretize_json_types(
pub(crate) fn concretize_json_types(
mapper: &mut FlatProjectionMapper,
json_type_hint: &HashMap<String, JsonNativeType>,
) {
@@ -751,6 +751,11 @@ fn concretize_json_types(
let Some(json_type) = json_type_hint.get(&column_schema.name) else {
continue;
};
debug!(
"column {} set concretize JSON type {}",
column_schema.name, json_type,
);
column_schema.data_type = ConcreteDataType::from(json_type);
}

View File

@@ -130,7 +130,7 @@ impl SeqScan {
///
/// # Panics
/// Panics if the compaction flag is not set.
pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
pub(crate) async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
assert!(self.stream_ctx.input.compaction);
let metrics_set = ExecutionPlanMetricsSet::new();

View File

@@ -14,9 +14,11 @@
//! Sorted strings tables.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::SemanticType;
use arrow_schema::DataType;
use common_base::readable_size::ReadableSize;
use datatypes::arrow::datatypes::{
DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
@@ -108,6 +110,9 @@ pub struct FlatSchemaOptions {
/// when storing primary key columns.
/// Only takes effect when `raw_pk_columns` is true.
pub string_pk_use_dict: bool,
/// The column's concretized JSON types, to be set into Arrow schema.
/// Otherwise it's empty struct in the Arrow schema.
pub concretized_json_types: HashMap<String, DataType>,
}
impl Default for FlatSchemaOptions {
@@ -115,6 +120,7 @@ impl Default for FlatSchemaOptions {
Self {
raw_pk_columns: true,
string_pk_use_dict: true,
concretized_json_types: HashMap::new(),
}
}
}
@@ -128,6 +134,7 @@ impl FlatSchemaOptions {
Self {
raw_pk_columns: false,
string_pk_use_dict: false,
concretized_json_types: HashMap::new(),
}
}
}
@@ -159,6 +166,7 @@ pub fn to_flat_sst_arrow_schema(
&metadata.column_metadatas[pk_index].column_schema.data_type,
old_field,
);
let new_field = concretize_json_type(new_field, options);
fields.push(Arc::new(with_field_id((*new_field).clone(), column_id)));
}
}
@@ -169,8 +177,9 @@ pub fn to_flat_sst_arrow_schema(
.zip(&metadata.column_metadatas)
.filter_map(|(field, column_meta)| {
if column_meta.semantic_type == SemanticType::Field {
let field = concretize_json_type(field.clone(), options);
Some(Arc::new(with_field_id(
(**field).clone(),
Arc::unwrap_or_clone(field),
column_meta.column_id,
)))
} else {
@@ -189,6 +198,16 @@ pub fn to_flat_sst_arrow_schema(
Arc::new(Schema::new(fields))
}
fn concretize_json_type(field: Arc<Field>, options: &FlatSchemaOptions) -> Arc<Field> {
if let Some(data_type) = options.concretized_json_types.get(field.name()) {
let mut field = Arc::unwrap_or_clone(field);
field.set_data_type(data_type.clone());
Arc::new(field)
} else {
field
}
}
/// Returns the number of columns in the flat format.
pub fn flat_sst_arrow_schema_column_num(
metadata: &RegionMetadata,

View File

@@ -584,7 +584,7 @@ impl ParquetReaderBuilder {
/// Reads parquet metadata of specific file.
/// Returns (fused metadata, cache_miss_flag).
async fn read_parquet_metadata(
pub(crate) async fn read_parquet_metadata(
&self,
file_path: &str,
file_size: u64,

View File

@@ -14,6 +14,7 @@
//! Parquet writer.
use std::collections::HashMap;
use std::future::Future;
use std::mem;
use std::pin::Pin;
@@ -31,6 +32,8 @@ use datatypes::arrow::array::{
use datatypes::arrow::compute::{max, min};
use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::extension::json::is_json_extension_type;
use datatypes::schema::ext::ArrowSchemaExt;
use object_store::{FuturesAsyncWriter, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
@@ -271,12 +274,21 @@ where
override_sequence: Option<SequenceNumber>,
opts: &WriteOptions,
) -> Result<SstInfoArray> {
let mut options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding);
if source.schema().has_json_extension_field() {
options.concretized_json_types = source
.schema()
.fields()
.iter()
.filter(|&field| is_json_extension_type(field))
.map(|field| (field.name().clone(), field.data_type().clone()))
.collect::<HashMap<_, _>>();
}
let converter = FlatBatchConverter::Flat(
FlatWriteFormat::new(
self.metadata.clone(),
&FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
)
.with_override_sequence(override_sequence),
FlatWriteFormat::new(self.metadata.clone(), &options)
.with_override_sequence(override_sequence),
);
let res = self.write_all_flat_inner(source, &converter, opts).await;
if res.is_err() {

View File

@@ -42,6 +42,14 @@ admin flush_table('json2_table');
| 0 |
+----------------------------------+
admin compact_table('json2_table', 'swcs', '86400');
+-----------------------------------------------------+
| ADMIN compact_table('json2_table', 'swcs', '86400') |
+-----------------------------------------------------+
| 0 |
+-----------------------------------------------------+
insert into json2_table
values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'),
(8, '{"a": {"b": 8}, "c": "s8"}');

View File

@@ -22,6 +22,8 @@ values (4, '{"a": {"b": -4}, "d": [{"e": {"g": -0.4}}]}'),
admin flush_table('json2_table');
admin compact_table('json2_table', 'swcs', '86400');
insert into json2_table
values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'),
(8, '{"a": {"b": 8}, "c": "s8"}');