refactor: make scanner creation async (#6349)

* refactor: make scanner creation async

Signed-off-by: luofucong <luofc@foxmail.com>

* resolve PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
Authored by LFC, committed via GitHub
2025-06-20 14:44:49 +08:00
parent e78c3e1eaa
commit e072726ea8
22 changed files with 273 additions and 204 deletions
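The core of the change: scanner construction now happens behind an await, because ScanRegion::scanner and the per-mode constructors (seq_scan, unordered_scan, series_scan) became async. A minimal sketch of the new call shape, assuming crate-internal access to a MitoEngine and an existing region (the tests below follow the same pattern):

let scanner = engine
    .scanner(region_id, ScanRequest::default())
    .await?;
let stream = scanner.scan().await?;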

View File

@@ -212,9 +212,9 @@ mod tests {
use crate::compaction::window::{file_time_bucket_span, WindowedCompactionPicker};
use crate::region::options::RegionOptions;
use crate::sst::file::{FileId, FileMeta, Level};
use crate::sst::file_purger::NoopFilePurger;
use crate::sst::version::SstVersion;
use crate::test_util::memtable_util::metadata_for_test;
use crate::test_util::NoopFilePurger;
fn build_version(
files: &[(FileId, i64, i64, Level)],

View File

@@ -98,9 +98,12 @@ use crate::error::{
SerdeJsonSnafu,
};
use crate::manifest::action::RegionEdit;
use crate::memtable::MemtableStats;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::region::MitoRegionRef;
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::sst::file::FileMeta;
use crate::wal::entry_distributor::{
build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
};
@@ -153,17 +156,13 @@ impl MitoEngine {
/// Returns the region disk/memory statistic.
pub fn get_region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
self.inner
.workers
.get_region(region_id)
self.find_region(region_id)
.map(|region| region.region_statistic())
}
/// Returns primary key encoding of the region.
pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option<PrimaryKeyEncoding> {
self.inner
.workers
.get_region(region_id)
self.find_region(region_id)
.map(|r| r.primary_key_encoding())
}
@@ -178,14 +177,15 @@ impl MitoEngine {
request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.scanner(region_id, request)
.await
.map_err(BoxedError::new)?
.scan()
.await
}
/// Returns a scanner to scan for `request`.
fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
self.scan_region(region_id, request)?.scanner()
async fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
self.scan_region(region_id, request)?.scanner().await
}
/// Scans a region.
@@ -225,7 +225,11 @@ impl MitoEngine {
#[cfg(test)]
pub(crate) fn get_region(&self, id: RegionId) -> Option<crate::region::MitoRegionRef> {
self.inner.workers.get_region(id)
self.find_region(id)
}
fn find_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
self.inner.workers.get_region(region_id)
}
fn encode_manifest_info_to_extensions(
@@ -245,6 +249,34 @@ impl MitoEngine {
);
Ok(())
}
/// Finds the memtable and SST stats of the current version by `region_id`.
/// The stats must be collected from the same version snapshot to ensure data consistency.
pub fn find_memtable_and_sst_stats(
&self,
region_id: RegionId,
) -> Result<(Vec<MemtableStats>, Vec<FileMeta>)> {
let region = self
.find_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;
let version = region.version();
let memtable_stats = version
.memtables
.list_memtables()
.iter()
.map(|x| x.stats())
.collect::<Vec<_>>();
let sst_stats = version
.ssts
.levels()
.iter()
.flat_map(|level| level.files().map(|x| x.meta_ref()))
.cloned()
.collect::<Vec<_>>();
Ok((memtable_stats, sst_stats))
}
}
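A hedged sketch of the new stats accessor, assuming the region exists and the caller returns Result; both vectors come from one version snapshot, which is the consistency guarantee the doc comment describes:

let (memtable_stats, sst_metas) = engine.find_memtable_and_sst_stats(region_id)?;
println!(
    "{} memtables, {} SST files",
    memtable_stats.len(),
    sst_metas.len()
);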
/// Checks whether the region edit is valid. Only adding files to the region is considered valid now.
@@ -336,15 +368,18 @@ impl EngineInner {
self.workers.stop().await
}
fn find_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
self.workers
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })
}
/// Get metadata of a region.
///
/// Returns error if the region doesn't exist.
fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
// Reading a region doesn't need to go through the region worker thread.
let region = self
.workers
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;
let region = self.find_region(region_id)?;
Ok(region.metadata())
}
@@ -451,23 +486,15 @@ impl EngineInner {
fn get_last_seq_num(&self, region_id: RegionId) -> Result<Option<SequenceNumber>> {
// Reading a region doesn't need to go through the region worker thread.
let region = self
.workers
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;
let version_ctrl = &region.version_control;
let seq = Some(version_ctrl.committed_sequence());
Ok(seq)
let region = self.find_region(region_id)?;
Ok(Some(region.find_committed_sequence()))
}
/// Handles the scan `request` and returns a [ScanRegion].
fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
let query_start = Instant::now();
// Reading a region doesn't need to go through the region worker thread.
let region = self
.workers
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;
let region = self.find_region(region_id)?;
let version = region.version();
// Get cache.
let cache_manager = self.workers.cache_manager();
@@ -489,11 +516,7 @@ impl EngineInner {
/// Converts the [`RegionRole`].
fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> {
let region = self
.workers
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;
let region = self.find_region(region_id)?;
region.set_role(role);
Ok(())
}
@@ -609,6 +632,7 @@ impl RegionEngine for MitoEngine {
self.scan_region(region_id, request)
.map_err(BoxedError::new)?
.region_scanner()
.await
.map_err(BoxedError::new)
}

View File

@@ -42,7 +42,7 @@ use crate::test_util::{
async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expected: &str) {
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).await.unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
@@ -459,7 +459,7 @@ async fn test_alter_on_flushing() {
.unwrap();
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).await.unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
@@ -815,7 +815,7 @@ async fn test_write_stall_on_altering() {
| | 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+-------+---------+---------------------+";
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).await.unwrap();
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());

View File

@@ -81,7 +81,7 @@ async fn test_append_mode_write_query() {
let scan = engine
.scan_region(region_id, ScanRequest::default())
.unwrap();
let seq_scan = scan.seq_scan().unwrap();
let seq_scan = scan.seq_scan().await.unwrap();
let stream = seq_scan.build_stream().unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());
@@ -174,7 +174,10 @@ async fn test_append_mode_compaction() {
| b | 1.0 | 1970-01-01T00:00:01 |
+-------+---------+---------------------+";
// Scans in parallel.
let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
let mut scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(1, scanner.num_files());
assert_eq!(1, scanner.num_memtables());
scanner.set_target_partitions(2);

View File

@@ -618,7 +618,7 @@ async fn test_engine_with_write_cache() {
flush_region(&engine, region_id, None).await;
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).await.unwrap();
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();

View File

@@ -173,7 +173,10 @@ async fn test_compaction_region() {
compact(&engine, region_id).await;
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
// Input:
// [0..9]
// [10...19]
@@ -244,7 +247,10 @@ async fn test_infer_compaction_time_window() {
compact(&engine, region_id).await;
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(
1,
scanner.num_files(),
@@ -288,7 +294,10 @@ async fn test_infer_compaction_time_window() {
// this flush should update part_duration in TimePartitions.
flush(&engine, region_id).await;
compact(&engine, region_id).await;
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(
2,
scanner.num_files(),
@@ -317,7 +326,10 @@ async fn test_infer_compaction_time_window() {
)
.await;
flush(&engine, region_id).await;
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(
4,
scanner.num_files(),
@@ -365,7 +377,10 @@ async fn test_compaction_overlapping_files() {
compact(&engine, region_id).await;
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(
1,
scanner.num_files(),
@@ -423,7 +438,10 @@ async fn test_compaction_region_with_overlapping() {
delete_and_flush(&engine, region_id, &column_schemas, 0..3600).await; // window 3600
compact(&engine, region_id).await;
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
let stream = scanner.scan().await.unwrap();
let vec = collect_stream_ts(stream).await;
assert_eq!((3600..10800).map(|i| { i * 1000 }).collect::<Vec<_>>(), vec);
@@ -469,7 +487,10 @@ async fn test_compaction_region_with_overlapping_delete_all() {
delete_and_flush(&engine, region_id, &column_schemas, 0..10800).await; // window 10800
compact(&engine, region_id).await;
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(
2,
scanner.num_files(),
@@ -550,7 +571,10 @@ async fn test_readonly_during_compaction() {
.unwrap();
notify.notified().await;
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(
2,
scanner.num_files(),
@@ -612,7 +636,10 @@ async fn test_compaction_update_time_window() {
.compaction_time_window,
Some(Duration::from_secs(3600))
);
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(0, scanner.num_memtables());
// We keep all 3 files because there are not enough files to merge
assert_eq!(
@@ -631,7 +658,10 @@ async fn test_compaction_update_time_window() {
rows: build_rows_for_key("a", 3600, 4000, 0),
};
put_rows(&engine, region_id, rows).await;
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(1, scanner.num_memtables());
let stream = scanner.scan().await.unwrap();
let vec = collect_stream_ts(stream).await;
@@ -643,7 +673,10 @@ async fn test_compaction_update_time_window() {
rows: build_rows_for_key("a", 2400, 3600, 0),
};
put_rows(&engine, region_id, rows).await;
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(2, scanner.num_memtables());
let stream = scanner.scan().await.unwrap();
let vec = collect_stream_ts(stream).await;

View File

@@ -91,11 +91,12 @@ async fn test_scan_without_filtering_deleted() {
assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));
// Tries to use seq scan to test it under append mode.
let scan = engine
let mut scan = engine
.scan_region(region_id, ScanRequest::default())
.unwrap();
scan.set_filter_deleted(false);
let seq_scan = scan.scan_without_filter_deleted().unwrap();
let seq_scan = scan.seq_scan().await.unwrap();
let stream = seq_scan.build_stream().unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();

View File

@@ -73,7 +73,7 @@ async fn test_manual_flush() {
flush_region(&engine, region_id, None).await;
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).await.unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
@@ -142,7 +142,7 @@ async fn test_flush_engine() {
listener.wait().await;
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).await.unwrap();
assert_eq!(1, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
@@ -217,7 +217,7 @@ async fn test_write_stall() {
put_rows(&engine, region_id, rows).await;
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).await.unwrap();
assert_eq!(1, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
@@ -267,7 +267,7 @@ async fn test_flush_empty() {
flush_region(&engine, region_id, None).await;
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).await.unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(0, scanner.num_files());
let stream = scanner.scan().await.unwrap();
@@ -450,7 +450,7 @@ async fn test_auto_flush_engine() {
.unwrap();
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).await.unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
@@ -530,7 +530,7 @@ async fn test_flush_workers() {
// Scans region 1.
let request = ScanRequest::default();
let scanner = engine.scanner(region_id1, request).unwrap();
let scanner = engine.scanner(region_id1, request).await.unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();

View File

@@ -188,7 +188,10 @@ async fn test_merge_mode_compaction() {
| a | | 13.0 | 1970-01-01T00:00:03 |
+-------+---------+---------+---------------------+";
// Scans in parallel.
let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
let mut scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(2, scanner.num_files());
assert_eq!(1, scanner.num_memtables());
scanner.set_target_partitions(2);

View File

@@ -56,7 +56,7 @@ async fn scan_in_parallel(
.unwrap();
let request = ScanRequest::default();
let mut scanner = engine.scanner(region_id, request).unwrap();
let mut scanner = engine.scanner(region_id, request).await.unwrap();
scanner.set_target_partitions(parallelism);
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();

View File

@@ -95,6 +95,7 @@ async fn test_last_row(append_mode: bool) {
..Default::default()
},
)
.await
.unwrap();
assert_eq!(3, scanner.num_files());
assert_eq!(1, scanner.num_memtables());

View File

@@ -57,7 +57,7 @@ async fn test_scan_with_min_sst_sequence() {
sst_min_sequence: file_min_sequence,
..Default::default()
};
let scanner = engine.scanner(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).await.unwrap();
assert_eq!(scanner.num_files(), expected_files);
let stream = scanner.scan().await.unwrap();
@@ -191,7 +191,7 @@ async fn test_series_scan() {
distribution: Some(TimeSeriesDistribution::PerSeries),
..Default::default()
};
let scanner = engine.scanner(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).await.unwrap();
let Scanner::Series(mut scanner) = scanner else {
panic!("Scanner should be series scan");
};

View File

@@ -60,7 +60,7 @@ async fn scan_check(
num_files: usize,
) {
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).await.unwrap();
assert_eq!(num_memtable, scanner.num_memtables());
assert_eq!(num_files, scanner.num_files());
let stream = scanner.scan().await.unwrap();

View File

@@ -189,7 +189,7 @@ async fn test_engine_truncate_after_flush() {
.unwrap();
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request.clone()).unwrap();
let scanner = engine.scanner(region_id, request.clone()).await.unwrap();
assert_eq!(1, scanner.num_files());
// Truncate the region.
@@ -206,7 +206,7 @@ async fn test_engine_truncate_after_flush() {
put_rows(&engine, region_id, rows).await;
// Scan the region.
let scanner = engine.scanner(region_id, request).unwrap();
let scanner = engine.scanner(region_id, request).await.unwrap();
assert_eq!(0, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
@@ -352,7 +352,7 @@ async fn test_engine_truncate_during_flush() {
let truncated_sequence = version_data.version.flushed_sequence;
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request.clone()).unwrap();
let scanner = engine.scanner(region_id, request.clone()).await.unwrap();
assert_eq!(0, scanner.num_files());
assert_eq!(Some(entry_id), truncated_entry_id);
assert_eq!(sequence, truncated_sequence);

View File

@@ -1064,6 +1064,7 @@ impl ErrorExt for Error {
| NoCheckpoint { .. }
| NoManifests { .. }
| InstallManifestTo { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
ObjectStoreNotFound { .. }
| InvalidScanIndex { .. }

View File

@@ -46,12 +46,12 @@ pub(crate) struct SourceIndex {
/// Index to access a row group.
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) struct RowGroupIndex {
pub struct RowGroupIndex {
/// Index to the memtable/file.
pub(crate) index: usize,
/// Row group index in the file.
/// Negative index indicates all row groups.
pub(crate) row_group_index: i64,
pub row_group_index: i64,
}
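RowGroupIndex and its row_group_index field are now public (index stays pub(crate)). A crate-internal sketch of the negative-index convention documented above:

// Selects only row group 2 of the third source.
let one_group = RowGroupIndex { index: 2, row_group_index: 2 };
// A negative row_group_index selects all row groups of that source.
let all_groups = RowGroupIndex { index: 2, row_group_index: -1 };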
/// Meta data of a partition range.
@@ -366,7 +366,7 @@ fn maybe_split_ranges_for_seq_scan(ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
/// Builder to create file ranges.
#[derive(Default)]
pub(crate) struct FileRangeBuilder {
pub struct FileRangeBuilder {
/// Context for the file.
/// None indicates nothing to read.
context: Option<FileRangeContextRef>,
@@ -385,7 +385,7 @@ impl FileRangeBuilder {
/// Builds file ranges to read.
/// Negative `row_group_index` indicates all row groups.
pub(crate) fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) {
pub fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) {
let Some(context) = self.context.clone() else {
return;
};
@@ -485,7 +485,8 @@ impl RangeBuilderList {
match builder_opt {
Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges),
None => {
let builder = input.prune_file(file_index, reader_metrics).await?;
let file = &input.files[file_index];
let builder = input.prune_file(file, reader_metrics).await?;
builder.build_ranges(index.row_group_index, &mut ranges);
self.set_file_builder(file_index, Arc::new(builder));
}

View File

@@ -32,7 +32,7 @@ use datafusion_expr::Expr;
use smallvec::SmallVec;
use store_api::metadata::RegionMetadata;
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
use table::predicate::{build_time_range_predicate, Predicate};
use tokio::sync::{mpsc, Semaphore};
use tokio_stream::wrappers::ReceiverStream;
@@ -195,6 +195,9 @@ pub(crate) struct ScanRegion {
ignore_bloom_filter: bool,
/// Start time of the scan task.
start_time: Option<Instant>,
/// Whether to filter out deleted rows.
/// Usually true for normal reads, and false for compaction scans.
filter_deleted: bool,
}
impl ScanRegion {
@@ -215,6 +218,7 @@ impl ScanRegion {
ignore_fulltext_index: false,
ignore_bloom_filter: false,
start_time: None,
filter_deleted: true,
}
}
@@ -255,55 +259,58 @@ impl ScanRegion {
self
}
#[cfg(test)]
pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
self.filter_deleted = filter_deleted;
}
/// Returns a [Scanner] to scan the region.
pub(crate) fn scanner(self) -> Result<Scanner> {
pub(crate) async fn scanner(self) -> Result<Scanner> {
if self.use_series_scan() {
self.series_scan().map(Scanner::Series)
self.series_scan().await.map(Scanner::Series)
} else if self.use_unordered_scan() {
// If the table is append-only and there is no series row selector, we use unordered scan for queries.
// We still use seq scan for compaction.
self.unordered_scan().map(Scanner::Unordered)
self.unordered_scan().await.map(Scanner::Unordered)
} else {
self.seq_scan().map(Scanner::Seq)
self.seq_scan().await.map(Scanner::Seq)
}
}
/// Returns a [RegionScanner] to scan the region.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) fn region_scanner(self) -> Result<RegionScannerRef> {
pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
if self.use_series_scan() {
self.series_scan().map(|scanner| Box::new(scanner) as _)
self.series_scan()
.await
.map(|scanner| Box::new(scanner) as _)
} else if self.use_unordered_scan() {
self.unordered_scan().map(|scanner| Box::new(scanner) as _)
self.unordered_scan()
.await
.map(|scanner| Box::new(scanner) as _)
} else {
self.seq_scan().map(|scanner| Box::new(scanner) as _)
self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
}
}
/// Scan sequentially.
pub(crate) fn seq_scan(self) -> Result<SeqScan> {
let input = self.scan_input(true)?;
pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
let input = self.scan_input().await?;
Ok(SeqScan::new(input, false))
}
/// Unordered scan.
pub(crate) fn unordered_scan(self) -> Result<UnorderedScan> {
let input = self.scan_input(true)?;
pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
let input = self.scan_input().await?;
Ok(UnorderedScan::new(input))
}
/// Scans by series.
pub(crate) fn series_scan(self) -> Result<SeriesScan> {
let input = self.scan_input(true)?;
pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
let input = self.scan_input().await?;
Ok(SeriesScan::new(input))
}
#[cfg(test)]
pub(crate) fn scan_without_filter_deleted(self) -> Result<SeqScan> {
let input = self.scan_input(false)?;
Ok(SeqScan::new(input, false))
}
/// Returns true if the region can use unordered scan for current request.
fn use_unordered_scan(&self) -> bool {
// We use unordered scan when:
@@ -324,7 +331,7 @@ impl ScanRegion {
}
/// Creates a scan input.
fn scan_input(mut self, filter_deleted: bool) -> Result<ScanInput> {
async fn scan_input(mut self) -> Result<ScanInput> {
let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
let time_range = self.build_time_range_predicate();
@@ -368,9 +375,10 @@ impl ScanRegion {
})
.collect();
let region_id = self.region_id();
debug!(
"Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
self.version.metadata.region_id,
region_id,
self.request,
time_range,
memtables.len(),
@@ -415,13 +423,17 @@ impl ScanRegion {
.with_parallel_scan_channel_size(self.parallel_scan_channel_size)
.with_start_time(self.start_time)
.with_append_mode(self.version.options.append_mode)
.with_filter_deleted(filter_deleted)
.with_filter_deleted(self.filter_deleted)
.with_merge_mode(self.version.options.merge_mode())
.with_series_row_selector(self.request.series_row_selector)
.with_distribution(self.request.distribution);
Ok(input)
}
fn region_id(&self) -> RegionId {
self.version.metadata.region_id
}
/// Build time range predicate from filters.
fn build_time_range_predicate(&self) -> TimestampRange {
let time_index = self.version.metadata.time_index_column();
@@ -538,7 +550,7 @@ impl ScanRegion {
let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
FulltextIndexApplierBuilder::new(
self.access_layer.region_dir().to_string(),
self.version.metadata.region_id,
self.region_id(),
self.access_layer.object_store().clone(),
self.access_layer.puffin_manager_factory().clone(),
self.version.metadata.as_ref(),
@@ -566,7 +578,7 @@ fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
}
/// Common input for different scanners.
pub(crate) struct ScanInput {
pub struct ScanInput {
/// Region SST access layer.
access_layer: AccessLayerRef,
/// Maps projected Batches to RecordBatches.
@@ -792,12 +804,11 @@ impl ScanInput {
}
/// Prunes a file to scan and returns the builder to build readers.
pub(crate) async fn prune_file(
pub async fn prune_file(
&self,
file_index: usize,
file: &FileHandle,
reader_metrics: &mut ReaderMetrics,
) -> Result<FileRangeBuilder> {
let file = &self.files[file_index];
let res = self
.access_layer
.read_sst(file.clone())
@@ -898,9 +909,9 @@ impl ScanInput {
/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
pub(crate) struct StreamContext {
pub struct StreamContext {
/// Input memtables and files.
pub(crate) input: ScanInput,
pub input: ScanInput,
/// Metadata for partition ranges.
pub(crate) ranges: Vec<RangeMeta>,
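With scan_without_filter_deleted gone (removed above), a compaction-style read that keeps deleted rows goes through the new setter plus the async constructor. A test-only sketch, since set_filter_deleted is #[cfg(test)]:

let mut scan = engine.scan_region(region_id, ScanRequest::default())?;
scan.set_filter_deleted(false);
let seq_scan = scan.seq_scan().await?;
let stream = seq_scan.build_stream()?;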

View File

@@ -33,7 +33,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
};
use store_api::storage::RegionId;
use store_api::storage::{RegionId, SequenceNumber};
use crate::access_layer::AccessLayerRef;
use crate::error::{
@@ -94,7 +94,7 @@ pub enum RegionRoleState {
/// - Only the region worker thread this region belongs to can modify the metadata.
/// - Multiple reader threads are allowed to read a specific `version` of a region.
#[derive(Debug)]
pub(crate) struct MitoRegion {
pub struct MitoRegion {
/// Id of this region.
///
/// Accessing region id from the version control is inconvenient so
@@ -135,7 +135,7 @@ pub(crate) struct MitoRegion {
stats: ManifestStats,
}
pub(crate) type MitoRegionRef = Arc<MitoRegion>;
pub type MitoRegionRef = Arc<MitoRegion>;
impl MitoRegion {
/// Stop background managers for this region.
@@ -220,8 +220,16 @@ impl MitoRegion {
)
}
pub fn region_id(&self) -> RegionId {
self.region_id
}
pub fn find_committed_sequence(&self) -> SequenceNumber {
self.version_control.committed_sequence()
}
/// Returns whether the region is readonly.
pub(crate) fn is_follower(&self) -> bool {
pub fn is_follower(&self) -> bool {
self.manifest_ctx.state.load() == RegionRoleState::Follower
}
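A brief sketch of the newly public MitoRegion accessors, assuming a MitoRegionRef obtained from the engine:

fn describe(region: &MitoRegionRef) {
    // Cheap reads that do not go through the region worker thread.
    let id = region.region_id();
    let seq = region.find_committed_sequence();
    println!("region {id}: committed sequence {seq}, follower: {}", region.is_follower());
}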

View File

@@ -38,6 +38,16 @@ pub trait FilePurger: Send + Sync + fmt::Debug {
pub type FilePurgerRef = Arc<dyn FilePurger>;
/// A no-op file purger; useful when reading SST files that live outside of this region.
#[derive(Debug)]
pub struct NoopFilePurger;
impl FilePurger for NoopFilePurger {
fn send_request(&self, _: PurgeRequest) {
// noop
}
}
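A sketch of plugging the relocated purger in, e.g. when building file handles for SSTs that must never be deleted by this region:

use std::sync::Arc;
use crate::sst::file_purger::{FilePurgerRef, NoopFilePurger};

let purger: FilePurgerRef = Arc::new(NoopFilePurger);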
/// Purger that purges files for the current region.
pub struct LocalFilePurger {
scheduler: SchedulerRef,

View File

@@ -823,7 +823,7 @@ impl ReaderFilterMetrics {
/// Parquet reader metrics.
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderMetrics {
pub struct ReaderMetrics {
/// Filtered row groups and rows metrics.
pub(crate) filter_metrics: ReaderFilterMetrics,
/// Duration to build the parquet reader.

View File

@@ -55,6 +55,7 @@ use rskafka::client::partition::{Compression, UnknownTopicHandling};
use rskafka::client::{Client, ClientBuilder};
use rskafka::record::Record;
use rstest_reuse::template;
use store_api::logstore::LogStore;
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::{
@@ -71,21 +72,14 @@ use crate::error::Result;
use crate::flush::{WriteBufferManager, WriteBufferManagerRef};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::read::{Batch, BatchBuilder, BatchReader};
use crate::sst::file_purger::{FilePurger, FilePurgerRef, PurgeRequest};
use crate::sst::file_purger::{FilePurger, FilePurgerRef, NoopFilePurger, PurgeRequest};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::worker::WorkerGroup;
#[derive(Debug)]
pub(crate) struct NoopFilePurger;
impl FilePurger for NoopFilePurger {
fn send_request(&self, _request: PurgeRequest) {}
}
pub(crate) fn new_noop_file_purger() -> FilePurgerRef {
Arc::new(NoopFilePurger {})
Arc::new(NoopFilePurger)
}
pub(crate) fn raft_engine_log_store_factory() -> Option<LogStoreFactory> {
@@ -281,6 +275,31 @@ impl TestEnv {
self.object_store_manager.clone()
}
async fn new_mito_engine(&self, config: MitoConfig) -> MitoEngine {
async fn create<S: LogStore>(
zelf: &TestEnv,
config: MitoConfig,
log_store: Arc<S>,
) -> MitoEngine {
let data_home = zelf.data_home().display().to_string();
MitoEngine::new(
&data_home,
config,
log_store,
zelf.object_store_manager.as_ref().unwrap().clone(),
zelf.schema_metadata_manager.clone(),
Plugins::new(),
)
.await
.unwrap()
}
match self.log_store.as_ref().unwrap().clone() {
LogStoreImpl::RaftEngine(log_store) => create(self, config, log_store).await,
LogStoreImpl::Kafka(log_store) => create(self, config, log_store).await,
}
}
/// Creates a new engine with specific config under this env.
pub async fn create_engine(&mut self, config: MitoConfig) -> MitoEngine {
let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;
@@ -288,58 +307,13 @@ impl TestEnv {
let object_store_manager = Arc::new(object_store_manager);
self.log_store = Some(log_store.clone());
self.object_store_manager = Some(object_store_manager.clone());
let data_home = self.data_home().display().to_string();
match log_store {
LogStoreImpl::RaftEngine(log_store) => MitoEngine::new(
&data_home,
config,
log_store,
object_store_manager,
self.schema_metadata_manager.clone(),
Plugins::new(),
)
.await
.unwrap(),
LogStoreImpl::Kafka(log_store) => MitoEngine::new(
&data_home,
config,
log_store,
object_store_manager,
self.schema_metadata_manager.clone(),
Plugins::new(),
)
.await
.unwrap(),
}
self.new_mito_engine(config).await
}
/// Creates a new engine with specific config and existing logstore and object store manager.
pub async fn create_follower_engine(&mut self, config: MitoConfig) -> MitoEngine {
let object_store_manager = self.object_store_manager.as_ref().unwrap().clone();
let data_home = self.data_home().display().to_string();
match self.log_store.as_ref().unwrap().clone() {
LogStoreImpl::RaftEngine(log_store) => MitoEngine::new(
&data_home,
config,
log_store,
object_store_manager,
self.schema_metadata_manager.clone(),
Plugins::new(),
)
.await
.unwrap(),
LogStoreImpl::Kafka(log_store) => MitoEngine::new(
&data_home,
config,
log_store,
object_store_manager,
self.schema_metadata_manager.clone(),
Plugins::new(),
)
.await
.unwrap(),
}
self.new_mito_engine(config).await
}
/// Creates a new engine with specific config and manager/listener/purge_scheduler under this env.
@@ -487,54 +461,12 @@ impl TestEnv {
/// Reopen the engine.
pub async fn reopen_engine(&mut self, engine: MitoEngine, config: MitoConfig) -> MitoEngine {
engine.stop().await.unwrap();
match self.log_store.as_ref().unwrap().clone() {
LogStoreImpl::RaftEngine(log_store) => MitoEngine::new(
&self.data_home().display().to_string(),
config,
log_store,
self.object_store_manager.clone().unwrap(),
self.schema_metadata_manager.clone(),
Plugins::new(),
)
.await
.unwrap(),
LogStoreImpl::Kafka(log_store) => MitoEngine::new(
&self.data_home().display().to_string(),
config,
log_store,
self.object_store_manager.clone().unwrap(),
self.schema_metadata_manager.clone(),
Plugins::new(),
)
.await
.unwrap(),
}
self.new_mito_engine(config).await
}
/// Open the engine.
pub async fn open_engine(&mut self, config: MitoConfig) -> MitoEngine {
match self.log_store.as_ref().unwrap().clone() {
LogStoreImpl::RaftEngine(log_store) => MitoEngine::new(
&self.data_home().display().to_string(),
config,
log_store,
self.object_store_manager.clone().unwrap(),
self.schema_metadata_manager.clone(),
Plugins::new(),
)
.await
.unwrap(),
LogStoreImpl::Kafka(log_store) => MitoEngine::new(
&self.data_home().display().to_string(),
config,
log_store,
self.object_store_manager.clone().unwrap(),
self.schema_metadata_manager.clone(),
Plugins::new(),
)
.await
.unwrap(),
}
self.new_mito_engine(config).await
}
/// Only initializes the object store manager, returns the default object store.

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Display, Formatter};
use common_recordbatch::OrderOption;
use datafusion_expr::expr::Expr;
use strum::Display;
@@ -62,3 +64,42 @@ pub struct ScanRequest {
/// Optional hint for the distribution of time-series data.
pub distribution: Option<TimeSeriesDistribution>,
}
impl Display for ScanRequest {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Collect the fields that are present first, so the separators stay
        // consistent no matter which optional fields are set.
        let mut parts = Vec::new();
        if let Some(projection) = &self.projection {
            parts.push(format!("projection: {:?}", projection));
        }
        if !self.filters.is_empty() {
            parts.push(format!(
                "filters: [{}]",
                self.filters
                    .iter()
                    .map(|f| f.to_string())
                    .collect::<Vec<_>>()
                    .join(", ")
            ));
        }
        if let Some(output_ordering) = &self.output_ordering {
            parts.push(format!("output_ordering: {:?}", output_ordering));
        }
        if let Some(limit) = &self.limit {
            parts.push(format!("limit: {}", limit));
        }
        if let Some(series_row_selector) = &self.series_row_selector {
            parts.push(format!("series_row_selector: {}", series_row_selector));
        }
        if let Some(sequence) = &self.sequence {
            parts.push(format!("sequence: {}", sequence));
        }
        if let Some(sst_min_sequence) = &self.sst_min_sequence {
            parts.push(format!("sst_min_sequence: {}", sst_min_sequence));
        }
        if let Some(distribution) = &self.distribution {
            parts.push(format!("distribution: {}", distribution));
        }
        write!(f, "ScanRequest {{{}}}", parts.join(", "))
    }
}
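A quick sketch of the resulting output, assuming Default is derived for ScanRequest as the tests above suggest:

let req = ScanRequest {
    limit: Some(10),
    ..Default::default()
};
assert_eq!(req.to_string(), "ScanRequest {limit: 10}");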