refactor: store the schema of flat source

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-05-09 19:12:10 +08:00
parent 7a285c2890
commit bafe4576e8
7 changed files with 77 additions and 36 deletions

View File

@@ -244,10 +244,12 @@ impl ObjbenchCommand {
..Default::default()
};
let source =
FlatSource::new_stream(region_meta.schema.arrow_schema().clone(), reader_stream);
let write_req = SstWriteRequest {
op_type: OperationType::Flush,
metadata: region_meta,
source: FlatSource::Stream(reader_stream),
source,
cache_manager,
storage: None,
max_sequence: None,

View File

@@ -728,16 +728,19 @@ mod tests {
let metadata = Arc::new(sst_region_metadata());
// Creates a source that can return an error to abort the writer.
let source = FlatSource::Iter(Box::new(
let record_batch = new_record_batch_by_range(&["a", "d"], 0, 60);
let schema = record_batch.schema();
let iter = Box::new(
[
Ok(new_record_batch_by_range(&["a", "d"], 0, 60)),
Ok(record_batch),
InvalidBatchSnafu {
reason: "Abort the writer",
}
.fail(),
]
.into_iter(),
));
);
let source = FlatSource::new_iter(schema, iter);
// Write to local cache and upload sst to mock remote store
let write_request = SstWriteRequest {

View File

@@ -59,7 +59,7 @@ use crate::error::{
RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::BoxedRecordBatchStream;
use crate::read::FlatSource;
use crate::read::flat_projection::FlatProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
@@ -993,12 +993,16 @@ struct CompactionSstReaderBuilder<'a> {
impl CompactionSstReaderBuilder<'_> {
/// Builds a [FlatSource] that reads all SST files and yields batches in flat format for compaction.
///
/// The returned source wraps the stream produced by
/// `SeqScan::build_flat_reader_for_compaction` together with the Arrow schema
/// of the scan mapper's output.
async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
async fn build_flat_sst_reader(self) -> Result<FlatSource> {
let scan_input = self.build_scan_input()?.with_compaction(true);
// Read the output schema first: `scan_input` is moved into `SeqScan::new` below.
let schema = scan_input.mapper.output_schema();
let schema = schema.arrow_schema();
SeqScan::new(scan_input)
.build_flat_reader_for_compaction()
.await
// Attach the schema captured above to the stream on success; errors pass through.
.map(|stream| FlatSource::new_stream(schema.clone(), stream))
}
fn build_scan_input(self) -> Result<ScanInput> {

View File

@@ -43,7 +43,6 @@ use crate::error::{
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::read::FlatSource;
use crate::region::options::RegionOptions;
use crate::region::version::VersionRef;
use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
@@ -362,8 +361,7 @@ impl SstMerger for DefaultSstMerger {
time_range: output.output_time_range,
merge_mode,
};
let reader = builder.build_flat_sst_reader().await?;
let source = FlatSource::Stream(reader);
let source = builder.build_flat_sst_reader().await?;
let mut metrics = Metrics::new(WriteType::Compaction);
let region_metadata = compaction_region.region_metadata.clone();
let sst_infos = compaction_region

View File

@@ -761,7 +761,7 @@ fn memtable_flat_sources(
);
flat_sources
.sources
.push((FlatSource::Iter(iter), max_sequence));
.push((FlatSource::new_iter(schema, iter), max_sequence));
};
} else {
let min_flush_rows = *ENCODE_ROW_THRESHOLD;
@@ -824,9 +824,10 @@ fn memtable_flat_sources(
std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
)?;
flat_sources
.sources
.push((FlatSource::Iter(maybe_dedup), max_sequence));
flat_sources.sources.push((
FlatSource::new_iter(schema.clone(), maybe_dedup),
max_sequence,
));
last_iter_rows = 0;
current_ranges.clear();
}
@@ -857,7 +858,7 @@ fn memtable_flat_sources(
flat_sources
.sources
.push((FlatSource::Iter(maybe_dedup), max_sequence));
.push((FlatSource::new_iter(schema, maybe_dedup), max_sequence));
}
}
@@ -1530,14 +1531,10 @@ mod tests {
// Consume the iterator and count rows
let mut total_rows = 0usize;
for (source, _sequence) in flat_sources.sources {
match source {
crate::read::FlatSource::Iter(iter) => {
for rb in iter {
total_rows += rb.unwrap().num_rows();
}
}
crate::read::FlatSource::Stream(_) => unreachable!(),
}
total_rows += source
.take_iter()
.map(|x| x.unwrap().num_rows())
.sum::<usize>();
}
assert_eq!(1, total_rows, "dedup should keep a single row");
}
@@ -1560,14 +1557,10 @@ mod tests {
let mut total_rows = 0usize;
for (source, _sequence) in flat_sources.sources {
match source {
crate::read::FlatSource::Iter(iter) => {
for rb in iter {
total_rows += rb.unwrap().num_rows();
}
}
crate::read::FlatSource::Stream(_) => unreachable!(),
}
total_rows += source
.take_iter()
.map(|x| x.unwrap().num_rows())
.sum::<usize>();
}
assert_eq!(2, total_rows, "append_mode should preserve duplicates");
}

View File

@@ -42,6 +42,7 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::OpType;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use common_time::Timestamp;
use datafusion_common::arrow::array::UInt8Array;
@@ -1105,19 +1106,59 @@ impl Source {
}
/// Async [RecordBatch] reader and iterator wrapper for flat format.
///
/// Pairs the wrapped iterator or stream with the schema passed to the constructor.
pub enum FlatSource {
pub struct FlatSource {
/// Schema supplied by the caller when the source is created.
schema: SchemaRef,
/// The wrapped iterator or stream that yields the batches.
inner: FlatSourceInner,
}
impl FlatSource {
    /// Wraps a [BoxedRecordBatchIterator] together with the schema supplied by the caller.
    ///
    /// The schema is stored as given; it is not compared with the batches here.
    pub fn new_iter(schema: SchemaRef, iter: BoxedRecordBatchIterator) -> Self {
        let inner = FlatSourceInner::Iter(iter);
        Self { schema, inner }
    }

    /// Wraps a [BoxedRecordBatchStream] together with the schema supplied by the caller.
    ///
    /// The schema is stored as given; it is not compared with the batches here.
    pub fn new_stream(schema: SchemaRef, stream: BoxedRecordBatchStream) -> Self {
        let inner = FlatSourceInner::Stream(stream);
        Self { schema, inner }
    }

    /// Returns the schema that was passed to the constructor.
    #[expect(unused)]
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Returns the next [RecordBatch] by delegating to the wrapped iterator or stream.
    pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        let inner = &mut self.inner;
        inner.next_batch().await
    }

    /// Consumes the source and hands back the wrapped iterator (test-only helper).
    ///
    /// # Panics
    ///
    /// Panics if this source wraps a stream rather than an iterator.
    #[cfg(test)]
    pub(crate) fn take_iter(self) -> BoxedRecordBatchIterator {
        let FlatSourceInner::Iter(iter) = self.inner else {
            unreachable!()
        };
        iter
    }
}
/// Data provider wrapped by a [FlatSource]: either a synchronous iterator or an async stream.
enum FlatSourceInner {
/// Source from a [BoxedRecordBatchIterator].
Iter(BoxedRecordBatchIterator),
/// Source from a [BoxedRecordBatchStream].
Stream(BoxedRecordBatchStream),
}
impl FlatSource {
impl FlatSourceInner {
/// Returns next [RecordBatch] from this data source.
///
/// For the iterator variant, exhaustion (`None`) is reported as `Ok(None)`.
pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
match self {
FlatSource::Iter(iter) => iter.next().transpose(),
FlatSource::Stream(stream) => stream.try_next().await,
// `transpose` turns the iterator's `Option<Result<_>>` into `Result<Option<_>>`.
Self::Iter(iter) => iter.next().transpose(),
// NOTE(review): `try_next` is presumably `futures::TryStreamExt::try_next`,
// which resolves to `Ok(None)` at end of stream — confirm against the imports.
Self::Stream(stream) => stream.try_next().await,
}
}
}

View File

@@ -306,8 +306,8 @@ pub fn new_record_batch_with_custom_sequence(
}
/// Creates a FlatSource from flat format RecordBatches.
///
/// The schema of the source is taken from the first batch.
///
/// # Panics
///
/// Panics if `batches` is empty, because the schema is read from `batches[0]`.
pub fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource {
FlatSource::Iter(Box::new(batches.into_iter().map(Ok)))
pub(crate) fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource {
FlatSource::new_iter(batches[0].schema(), Box::new(batches.into_iter().map(Ok)))
}
/// Creates a new region metadata for testing SSTs with binary datatype.