Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-04-07 14:21:14 +08:00
parent 874e7fc884
commit f0adf296d6
5 changed files with 49 additions and 18 deletions

View File

@@ -39,6 +39,7 @@ use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use datatypes::schema::ext::ArrowSchemaExt;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
@@ -844,17 +845,26 @@ impl CompactionSstReaderBuilder<'_> {
async fn build_flat_sst_reader(self) -> Result<(Option<SchemaRef>, BoxedRecordBatchStream)> {
let scan_input = self.build_scan_input()?.with_compaction(true);
let parquet_schemas = scan_input.collect_parquet_record_batch_schemas().await?;
let schema = if let Some((base, others)) = parquet_schemas.split_first() {
Some(merge_json_extension_fields(base, others))
let json_concretized_schema = if scan_input
.mapper
.output_schema()
.arrow_schema()
.has_json_extension_field()
{
let parquet_schemas = scan_input.collect_parquet_record_batch_schemas().await?;
if let Some((base, others)) = parquet_schemas.split_first() {
Some(merge_json_extension_fields(base, others))
} else {
None
}
} else {
None
};
let reader = SeqScan::new(scan_input)
.build_flat_reader_for_compaction()
.build_flat_reader_for_compaction(json_concretized_schema.clone())
.await?;
Ok((schema, reader))
Ok((json_concretized_schema, reader))
}
fn build_scan_input(self) -> Result<ScanInput> {

View File

@@ -682,7 +682,7 @@ mod tests {
let rb = adapter.into_iter().next().unwrap().unwrap();
let mapper = FlatProjectionMapper::new(&metadata, [0, 3].into_iter()).unwrap();
assert_eq!(rb.schema(), mapper.input_arrow_schema(false));
assert_eq!(rb.schema(), mapper.input_arrow_schema());
// tag_0 + field_1 + ts + 3 internal columns.
assert_eq!(6, rb.num_columns());
assert_eq!(3, rb.num_rows());

View File

@@ -17,6 +17,7 @@
use std::sync::Arc;
use api::v1::SemanticType;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use arrow_schema::extension::ExtensionType;
use common_error::ext::BoxedError;
use common_recordbatch::error::{
@@ -38,6 +39,7 @@ use store_api::storage::ColumnId;
use crate::cache::CacheStrategy;
use crate::error::{InvalidRequestSnafu, RecordBatchSnafu, Result};
use crate::read::projection::{read_column_ids_from_projection, repeated_vector_with_cache};
use crate::read::scan_region::ScanInput;
use crate::sst::parquet::flat_format::sst_column_id_indices;
use crate::sst::parquet::format::FormatProjection;
use crate::sst::{
@@ -221,21 +223,26 @@ impl FlatProjectionMapper {
&self.batch_schema
}
#[cfg(test)]
pub(crate) fn input_arrow_schema(&self) -> datatypes::arrow::datatypes::SchemaRef {
self.input_arrow_schema.clone()
}
/// Returns the input arrow schema from sources.
///
/// The merge reader can use this schema.
pub(crate) fn input_arrow_schema(
pub(crate) fn arrow_schema_by_compaction(
&self,
compaction: bool,
scan_input: &ScanInput,
json_concretized_schema: Option<ArrowSchemaRef>,
) -> datatypes::arrow::datatypes::SchemaRef {
if !compaction {
if !scan_input.compaction {
self.input_arrow_schema.clone()
} else {
// For compaction, we need to build a different schema from encoding.
to_flat_sst_arrow_schema(
&self.metadata,
&FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
)
let mut options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding);
options.override_schema = json_concretized_schema;
to_flat_sst_arrow_schema(&self.metadata, &options)
}
}

View File

@@ -18,6 +18,7 @@ use std::fmt;
use std::sync::Arc;
use std::time::Instant;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::util::ChainedRecordBatchStream;
@@ -127,7 +128,10 @@ impl SeqScan {
///
/// # Panics
/// Panics if the compaction flag is not set.
pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
pub async fn build_flat_reader_for_compaction(
&self,
json_concretized_schema: Option<ArrowSchemaRef>,
) -> Result<BoxedRecordBatchStream> {
assert!(self.stream_ctx.input.compaction);
let metrics_set = ExecutionPlanMetricsSet::new();
@@ -140,6 +144,7 @@ impl SeqScan {
partition_ranges,
&part_metrics,
self.pruner.clone(),
json_concretized_schema,
)
.await?;
Ok(reader)
@@ -152,6 +157,7 @@ impl SeqScan {
partition_ranges: &[PartitionRange],
part_metrics: &PartitionMetrics,
pruner: Arc<Pruner>,
json_concretized_schema: Option<ArrowSchemaRef>,
) -> Result<BoxedRecordBatchStream> {
pruner.add_partition_ranges(partition_ranges);
let partition_pruner = Arc::new(PartitionPruner::new(pruner, partition_ranges));
@@ -182,6 +188,7 @@ impl SeqScan {
None,
None,
compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE),
json_concretized_schema,
)
.await
}
@@ -195,6 +202,7 @@ impl SeqScan {
semaphore: Option<Arc<Semaphore>>,
part_metrics: Option<&PartitionMetrics>,
channel_size: usize,
json_concretized_schema: Option<ArrowSchemaRef>,
) -> Result<BoxedRecordBatchStream> {
if let Some(semaphore) = semaphore.as_ref() {
// Read sources in parallel.
@@ -208,7 +216,7 @@ impl SeqScan {
}
let mapper = stream_ctx.input.mapper.as_flat().unwrap();
let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
let schema = mapper.arrow_schema_by_compaction(&stream_ctx.input, json_concretized_schema);
let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter());
let reader =
@@ -345,9 +353,14 @@ impl SeqScan {
let channel_size = compute_parallel_channel_size(
split_batch_size.unwrap_or(DEFAULT_READ_BATCH_SIZE),
);
let mut reader =
Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics), channel_size)
.await?;
let mut reader = Self::build_flat_reader_from_sources(
&stream_ctx,
sources,
semaphore.clone(),
Some(&part_metrics),
channel_size,
None,
).await?;
let mut metrics = ScannerMetrics {
scan_cost: fetch_start.elapsed(),

View File

@@ -514,6 +514,7 @@ impl SeriesDistributor {
self.semaphore.clone(),
Some(&part_metrics),
channel_size,
None,
)
.await?;
let mut metrics = SeriesDistributorMetrics::default();