Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-04-02 09:33:08 +08:00
parent 7ad1489842
commit 5bd3985b8b
2 changed files with 5 additions and 5 deletions

View File

@@ -60,7 +60,7 @@ use crate::error::{
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::BoxedRecordBatchStream;
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::scan_region::{PredicateGroup, ScanInput, concretize_json2_types};
use crate::read::seq_scan::SeqScan;
use crate::region::options::{MergeMode, RegionOptions};
use crate::region::version::VersionControlRef;
@@ -840,14 +840,14 @@ struct CompactionSstReaderBuilder<'a> {
impl CompactionSstReaderBuilder<'_> {
/// Builds [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction.
async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
let scan_input = self.build_scan_input()?.with_compaction(true);
let scan_input = self.build_scan_input().await?.with_compaction(true);
SeqScan::new(scan_input)
.build_flat_reader_for_compaction()
.await
}
fn build_scan_input(self) -> Result<ScanInput> {
async fn build_scan_input(self) -> Result<ScanInput> {
let mapper = ProjectionMapper::all(&self.metadata, true)?;
let mut scan_input = ScanInput::new(self.sst_layer, mapper)
.with_files(self.inputs.to_vec())
@@ -867,7 +867,7 @@ impl CompactionSstReaderBuilder<'_> {
scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
}
Ok(scan_input)
concretize_json2_types(scan_input).await
}
}

View File

@@ -806,7 +806,7 @@ impl ScanRegion {
}
}
async fn concretize_json2_types(input: ScanInput) -> Result<ScanInput> {
pub(crate) async fn concretize_json2_types(input: ScanInput) -> Result<ScanInput> {
let Some(output_schema) = input.mapper.as_flat().map(|x| x.output_schema()) else {
return Ok(input);
};