From bafe4576e8c2c6dae2817e0ea72e8188f6904260 Mon Sep 17 00:00:00 2001 From: luofucong Date: Sat, 9 May 2026 19:12:10 +0800 Subject: [PATCH] refactor: store the schema of flat source Signed-off-by: luofucong --- src/cmd/src/datanode/objbench.rs | 4 ++- src/mito2/src/cache/write_cache.rs | 9 +++-- src/mito2/src/compaction.rs | 8 +++-- src/mito2/src/compaction/compactor.rs | 4 +-- src/mito2/src/flush.rs | 35 ++++++++----------- src/mito2/src/read.rs | 49 ++++++++++++++++++++++++--- src/mito2/src/test_util/sst_util.rs | 4 +-- 7 files changed, 77 insertions(+), 36 deletions(-) diff --git a/src/cmd/src/datanode/objbench.rs b/src/cmd/src/datanode/objbench.rs index d8793bee2f..a298430c83 100644 --- a/src/cmd/src/datanode/objbench.rs +++ b/src/cmd/src/datanode/objbench.rs @@ -244,10 +244,12 @@ impl ObjbenchCommand { ..Default::default() }; + let source = + FlatSource::new_stream(region_meta.schema.arrow_schema().clone(), reader_stream); let write_req = SstWriteRequest { op_type: OperationType::Flush, metadata: region_meta, - source: FlatSource::Stream(reader_stream), + source, cache_manager, storage: None, max_sequence: None, diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index e2483ed4e4..6b02a92811 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -728,16 +728,19 @@ mod tests { let metadata = Arc::new(sst_region_metadata()); // Creates a source that can return an error to abort the writer. - let source = FlatSource::Iter(Box::new( + let record_batch = new_record_batch_by_range(&["a", "d"], 0, 60); + let schema = record_batch.schema(); + let iter = Box::new( [ - Ok(new_record_batch_by_range(&["a", "d"], 0, 60)), + Ok(record_batch), InvalidBatchSnafu { reason: "Abort the writer", } .fail(), ] .into_iter(), - )); + ); + let source = FlatSource::new_iter(schema, iter); // Write to local cache and upload sst to mock remote store let write_request = SstWriteRequest { diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 296d9ce2b1..a9e2d0668e 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -59,7 +59,7 @@ use crate::error::{ RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu, TimeoutSnafu, }; use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT}; -use crate::read::BoxedRecordBatchStream; +use crate::read::FlatSource; use crate::read::flat_projection::FlatProjectionMapper; use crate::read::scan_region::{PredicateGroup, ScanInput}; use crate::read::seq_scan::SeqScan; @@ -993,12 +993,16 @@ struct CompactionSstReaderBuilder<'a> { impl CompactionSstReaderBuilder<'_> { /// Builds [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction. - async fn build_flat_sst_reader(self) -> Result { + async fn build_flat_sst_reader(self) -> Result { let scan_input = self.build_scan_input()?.with_compaction(true); + let schema = scan_input.mapper.output_schema(); + let schema = schema.arrow_schema(); + SeqScan::new(scan_input) .build_flat_reader_for_compaction() .await + .map(|stream| FlatSource::new_stream(schema.clone(), stream)) } fn build_scan_input(self) -> Result { diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 59a8a10077..e5dae0af0c 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -43,7 +43,6 @@ use crate::error::{ }; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; -use crate::read::FlatSource; use crate::region::options::RegionOptions; use crate::region::version::VersionRef; use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState}; @@ -362,8 +361,7 @@ impl SstMerger for DefaultSstMerger { time_range: output.output_time_range, merge_mode, }; - let reader = builder.build_flat_sst_reader().await?; - let source = FlatSource::Stream(reader); + let source = builder.build_flat_sst_reader().await?; let mut metrics = Metrics::new(WriteType::Compaction); let region_metadata = compaction_region.region_metadata.clone(); let sst_infos = compaction_region diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index c5ad2276eb..c863d15d08 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -761,7 +761,7 @@ fn memtable_flat_sources( ); flat_sources .sources - .push((FlatSource::Iter(iter), max_sequence)); + .push((FlatSource::new_iter(schema, iter), max_sequence)); }; } else { let min_flush_rows = *ENCODE_ROW_THRESHOLD; @@ -824,9 +824,10 @@ fn memtable_flat_sources( std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)), )?; - flat_sources - .sources - .push((FlatSource::Iter(maybe_dedup), max_sequence)); + flat_sources.sources.push(( + FlatSource::new_iter(schema.clone(), maybe_dedup), + max_sequence, + )); last_iter_rows = 0; current_ranges.clear(); } @@ -857,7 +858,7 @@ fn memtable_flat_sources( flat_sources .sources - .push((FlatSource::Iter(maybe_dedup), max_sequence)); + .push((FlatSource::new_iter(schema, maybe_dedup), max_sequence)); } } @@ -1530,14 +1531,10 @@ mod tests { // Consume the iterator and count rows let mut total_rows = 0usize; for (source, _sequence) in flat_sources.sources { - match source { - crate::read::FlatSource::Iter(iter) => { - for rb in iter { - total_rows += rb.unwrap().num_rows(); - } - } - crate::read::FlatSource::Stream(_) => unreachable!(), - } + total_rows += source + .take_iter() + .map(|x| x.unwrap().num_rows()) + .sum::(); } assert_eq!(1, total_rows, "dedup should keep a single row"); } @@ -1560,14 +1557,10 @@ mod tests { let mut total_rows = 0usize; for (source, _sequence) in flat_sources.sources { - match source { - crate::read::FlatSource::Iter(iter) => { - for rb in iter { - total_rows += rb.unwrap().num_rows(); - } - } - crate::read::FlatSource::Stream(_) => unreachable!(), - } + total_rows += source + .take_iter() + .map(|x| x.unwrap().num_rows()) + .sum::(); } assert_eq!(2, total_rows, "append_mode should preserve duplicates"); } diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index aaeaa9e62e..5a71bd1a4e 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -42,6 +42,7 @@ use std::sync::Arc; use std::time::Duration; use api::v1::OpType; +use arrow_schema::SchemaRef; use async_trait::async_trait; use common_time::Timestamp; use datafusion_common::arrow::array::UInt8Array; @@ -1105,19 +1106,59 @@ impl Source { } /// Async [RecordBatch] reader and iterator wrapper for flat format. -pub enum FlatSource { +pub struct FlatSource { + schema: SchemaRef, + inner: FlatSourceInner, +} + +impl FlatSource { + /// Create a [FlatSource] from a [BoxedRecordBatchIterator] and its schema. + pub fn new_iter(schema: SchemaRef, iter: BoxedRecordBatchIterator) -> Self { + Self { + schema, + inner: FlatSourceInner::Iter(iter), + } + } + + /// Create a [FlatSource] from a [BoxedRecordBatchStream] and its schema. + pub fn new_stream(schema: SchemaRef, stream: BoxedRecordBatchStream) -> Self { + Self { + schema, + inner: FlatSourceInner::Stream(stream), + } + } + + #[expect(unused)] + fn schema(&self) -> &SchemaRef { + &self.schema + } + + pub async fn next_batch(&mut self) -> Result> { + self.inner.next_batch().await + } + + #[cfg(test)] + pub(crate) fn take_iter(self) -> BoxedRecordBatchIterator { + match self.inner { + FlatSourceInner::Iter(iter) => iter, + FlatSourceInner::Stream(_) => unreachable!(), + } + } +} + +enum FlatSourceInner { /// Source from a [BoxedRecordBatchIterator]. Iter(BoxedRecordBatchIterator), /// Source from a [BoxedRecordBatchStream]. Stream(BoxedRecordBatchStream), } -impl FlatSource { +impl FlatSourceInner { /// Returns next [RecordBatch] from this data source. pub async fn next_batch(&mut self) -> Result> { match self { - FlatSource::Iter(iter) => iter.next().transpose(), - FlatSource::Stream(stream) => stream.try_next().await, + Self::Iter(iter) => iter.next().transpose(), + Self::Stream(stream) => stream.try_next().await, } } } diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index 4e759f50cd..56cf8d8441 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -306,8 +306,8 @@ pub fn new_record_batch_with_custom_sequence( } /// Creates a FlatSource from flat format RecordBatches. -pub fn new_flat_source_from_record_batches(batches: Vec) -> FlatSource { - FlatSource::Iter(Box::new(batches.into_iter().map(Ok))) +pub(crate) fn new_flat_source_from_record_batches(batches: Vec) -> FlatSource { + FlatSource::new_iter(batches[0].schema(), Box::new(batches.into_iter().map(Ok))) } /// Creates a new region metadata for testing SSTs with binary datatype.