diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index c624a6f362..b02675be0b 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -39,6 +39,7 @@ use common_time::timestamp::TimeUnit; use common_time::{TimeToLive, Timestamp}; use datafusion_common::ScalarValue; use datafusion_expr::Expr; +use datatypes::schema::ext::ArrowSchemaExt; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; @@ -844,17 +845,26 @@ impl CompactionSstReaderBuilder<'_> { async fn build_flat_sst_reader(self) -> Result<(Option, BoxedRecordBatchStream)> { let scan_input = self.build_scan_input()?.with_compaction(true); - let parquet_schemas = scan_input.collect_parquet_record_batch_schemas().await?; - let schema = if let Some((base, others)) = parquet_schemas.split_first() { - Some(merge_json_extension_fields(base, others)) + let json_concretized_schema = if scan_input + .mapper + .output_schema() + .arrow_schema() + .has_json_extension_field() + { + let parquet_schemas = scan_input.collect_parquet_record_batch_schemas().await?; + if let Some((base, others)) = parquet_schemas.split_first() { + Some(merge_json_extension_fields(base, others)) + } else { + None + } } else { None }; let reader = SeqScan::new(scan_input) - .build_flat_reader_for_compaction() + .build_flat_reader_for_compaction(json_concretized_schema.clone()) .await?; - Ok((schema, reader)) + Ok((json_concretized_schema, reader)) } fn build_scan_input(self) -> Result { diff --git a/src/mito2/src/read/batch_adapter.rs b/src/mito2/src/read/batch_adapter.rs index 4698229c5b..660d56c8ef 100644 --- a/src/mito2/src/read/batch_adapter.rs +++ b/src/mito2/src/read/batch_adapter.rs @@ -682,7 +682,7 @@ mod tests { let rb = adapter.into_iter().next().unwrap().unwrap(); let mapper = FlatProjectionMapper::new(&metadata, [0, 3].into_iter()).unwrap(); - assert_eq!(rb.schema(), mapper.input_arrow_schema(false)); + assert_eq!(rb.schema(), mapper.input_arrow_schema()); // tag_0 + field_1 + ts + 3 internal columns. assert_eq!(6, rb.num_columns()); assert_eq!(3, rb.num_rows()); diff --git a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs index 4b776c4c98..ff4fe498d4 100644 --- a/src/mito2/src/read/flat_projection.rs +++ b/src/mito2/src/read/flat_projection.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use api::v1::SemanticType; +use arrow_schema::SchemaRef as ArrowSchemaRef; use arrow_schema::extension::ExtensionType; use common_error::ext::BoxedError; use common_recordbatch::error::{ @@ -38,6 +39,7 @@ use store_api::storage::ColumnId; use crate::cache::CacheStrategy; use crate::error::{InvalidRequestSnafu, RecordBatchSnafu, Result}; use crate::read::projection::{read_column_ids_from_projection, repeated_vector_with_cache}; +use crate::read::scan_region::ScanInput; use crate::sst::parquet::flat_format::sst_column_id_indices; use crate::sst::parquet::format::FormatProjection; use crate::sst::{ @@ -221,21 +223,26 @@ impl FlatProjectionMapper { &self.batch_schema } + #[cfg(test)] + pub(crate) fn input_arrow_schema(&self) -> datatypes::arrow::datatypes::SchemaRef { + self.input_arrow_schema.clone() + } + /// Returns the input arrow schema from sources. /// /// The merge reader can use this schema. - pub(crate) fn input_arrow_schema( + pub(crate) fn arrow_schema_by_compaction( &self, - compaction: bool, + scan_input: &ScanInput, + json_concretized_schema: Option, ) -> datatypes::arrow::datatypes::SchemaRef { - if !compaction { + if !scan_input.compaction { self.input_arrow_schema.clone() } else { // For compaction, we need to build a different schema from encoding. - to_flat_sst_arrow_schema( - &self.metadata, - &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding), - ) + let mut options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding); + options.override_schema = json_concretized_schema; + to_flat_sst_arrow_schema(&self.metadata, &options) } } diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 15ab435425..1a78428564 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -18,6 +18,7 @@ use std::fmt; use std::sync::Arc; use std::time::Instant; +use arrow_schema::SchemaRef as ArrowSchemaRef; use async_stream::try_stream; use common_error::ext::BoxedError; use common_recordbatch::util::ChainedRecordBatchStream; @@ -127,7 +128,10 @@ impl SeqScan { /// /// # Panics /// Panics if the compaction flag is not set. - pub async fn build_flat_reader_for_compaction(&self) -> Result { + pub async fn build_flat_reader_for_compaction( + &self, + json_concretized_schema: Option, + ) -> Result { assert!(self.stream_ctx.input.compaction); let metrics_set = ExecutionPlanMetricsSet::new(); @@ -140,6 +144,7 @@ impl SeqScan { partition_ranges, &part_metrics, self.pruner.clone(), + json_concretized_schema, ) .await?; Ok(reader) @@ -152,6 +157,7 @@ impl SeqScan { partition_ranges: &[PartitionRange], part_metrics: &PartitionMetrics, pruner: Arc, + json_concretized_schema: Option, ) -> Result { pruner.add_partition_ranges(partition_ranges); let partition_pruner = Arc::new(PartitionPruner::new(pruner, partition_ranges)); @@ -182,6 +188,7 @@ impl SeqScan { None, None, compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE), + json_concretized_schema, ) .await } @@ -195,6 +202,7 @@ impl SeqScan { semaphore: Option>, part_metrics: Option<&PartitionMetrics>, channel_size: usize, + json_concretized_schema: Option, ) -> Result { if let Some(semaphore) = semaphore.as_ref() { // Read sources in parallel. @@ -208,7 +216,7 @@ impl SeqScan { } let mapper = stream_ctx.input.mapper.as_flat().unwrap(); - let schema = mapper.input_arrow_schema(stream_ctx.input.compaction); + let schema = mapper.arrow_schema_by_compaction(&stream_ctx.input, json_concretized_schema); let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter()); let reader = @@ -345,9 +353,14 @@ impl SeqScan { let channel_size = compute_parallel_channel_size( split_batch_size.unwrap_or(DEFAULT_READ_BATCH_SIZE), ); - let mut reader = - Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics), channel_size) - .await?; + let mut reader = Self::build_flat_reader_from_sources( + &stream_ctx, + sources, + semaphore.clone(), + Some(&part_metrics), + channel_size, + None, + ).await?; let mut metrics = ScannerMetrics { scan_cost: fetch_start.elapsed(), diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index bf7ed072ab..fa27a153a6 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -514,6 +514,7 @@ impl SeriesDistributor { self.semaphore.clone(), Some(&part_metrics), channel_size, + None, ) .await?; let mut metrics = SeriesDistributorMetrics::default();