diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs index 06212cb6d5..f901300dd9 100644 --- a/src/mito2/src/compaction/window.rs +++ b/src/mito2/src/compaction/window.rs @@ -212,9 +212,9 @@ mod tests { use crate::compaction::window::{file_time_bucket_span, WindowedCompactionPicker}; use crate::region::options::RegionOptions; use crate::sst::file::{FileId, FileMeta, Level}; + use crate::sst::file_purger::NoopFilePurger; use crate::sst::version::SstVersion; use crate::test_util::memtable_util::metadata_for_test; - use crate::test_util::NoopFilePurger; fn build_version( files: &[(FileId, i64, i64, Level)], diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 6a09780ea0..e41a8b9c62 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -98,9 +98,12 @@ use crate::error::{ SerdeJsonSnafu, }; use crate::manifest::action::RegionEdit; +use crate::memtable::MemtableStats; use crate::metrics::HANDLE_REQUEST_ELAPSED; use crate::read::scan_region::{ScanRegion, Scanner}; +use crate::region::MitoRegionRef; use crate::request::{RegionEditRequest, WorkerRequest}; +use crate::sst::file::FileMeta; use crate::wal::entry_distributor::{ build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, }; @@ -153,17 +156,13 @@ impl MitoEngine { /// Returns the region disk/memory statistic. pub fn get_region_statistic(&self, region_id: RegionId) -> Option { - self.inner - .workers - .get_region(region_id) + self.find_region(region_id) .map(|region| region.region_statistic()) } /// Returns primary key encoding of the region. pub fn get_primary_key_encoding(&self, region_id: RegionId) -> Option { - self.inner - .workers - .get_region(region_id) + self.find_region(region_id) .map(|r| r.primary_key_encoding()) } @@ -178,14 +177,15 @@ impl MitoEngine { request: ScanRequest, ) -> Result { self.scanner(region_id, request) + .await .map_err(BoxedError::new)? .scan() .await } /// Returns a scanner to scan for `request`. 
- fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result { - self.scan_region(region_id, request)?.scanner() + async fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result { + self.scan_region(region_id, request)?.scanner().await } /// Scans a region. @@ -225,7 +225,11 @@ impl MitoEngine { #[cfg(test)] pub(crate) fn get_region(&self, id: RegionId) -> Option { - self.inner.workers.get_region(id) + self.find_region(id) + } + + fn find_region(&self, region_id: RegionId) -> Option { + self.inner.workers.get_region(region_id) } fn encode_manifest_info_to_extensions( @@ -245,6 +249,34 @@ impl MitoEngine { ); Ok(()) } + + /// Find the current version's memtables and SSTs stats by region_id. + /// The stats must be collected in one place one time to ensure data consistency. + pub fn find_memtable_and_sst_stats( + &self, + region_id: RegionId, + ) -> Result<(Vec, Vec)> { + let region = self + .find_region(region_id) + .context(RegionNotFoundSnafu { region_id })?; + + let version = region.version(); + let memtable_stats = version + .memtables + .list_memtables() + .iter() + .map(|x| x.stats()) + .collect::>(); + + let sst_stats = version + .ssts + .levels() + .iter() + .flat_map(|level| level.files().map(|x| x.meta_ref())) + .cloned() + .collect::>(); + Ok((memtable_stats, sst_stats)) + } } /// Check whether the region edit is valid. Only adding files to region is considered valid now. @@ -336,15 +368,18 @@ impl EngineInner { self.workers.stop().await } + fn find_region(&self, region_id: RegionId) -> Result { + self.workers + .get_region(region_id) + .context(RegionNotFoundSnafu { region_id }) + } + /// Get metadata of a region. /// /// Returns error if the region doesn't exist. fn get_metadata(&self, region_id: RegionId) -> Result { // Reading a region doesn't need to go through the region worker thread. 
- let region = self - .workers - .get_region(region_id) - .context(RegionNotFoundSnafu { region_id })?; + let region = self.find_region(region_id)?; Ok(region.metadata()) } @@ -451,23 +486,15 @@ impl EngineInner { fn get_last_seq_num(&self, region_id: RegionId) -> Result> { // Reading a region doesn't need to go through the region worker thread. - let region = self - .workers - .get_region(region_id) - .context(RegionNotFoundSnafu { region_id })?; - let version_ctrl = ®ion.version_control; - let seq = Some(version_ctrl.committed_sequence()); - Ok(seq) + let region = self.find_region(region_id)?; + Ok(Some(region.find_committed_sequence())) } /// Handles the scan `request` and returns a [ScanRegion]. fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result { let query_start = Instant::now(); // Reading a region doesn't need to go through the region worker thread. - let region = self - .workers - .get_region(region_id) - .context(RegionNotFoundSnafu { region_id })?; + let region = self.find_region(region_id)?; let version = region.version(); // Get cache. let cache_manager = self.workers.cache_manager(); @@ -489,11 +516,7 @@ impl EngineInner { /// Converts the [`RegionRole`]. fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<()> { - let region = self - .workers - .get_region(region_id) - .context(RegionNotFoundSnafu { region_id })?; - + let region = self.find_region(region_id)?; region.set_role(role); Ok(()) } @@ -609,6 +632,7 @@ impl RegionEngine for MitoEngine { self.scan_region(region_id, request) .map_err(BoxedError::new)? 
.region_scanner() + .await .map_err(BoxedError::new) } diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index 6dae283061..e14dd83c40 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -42,7 +42,7 @@ use crate::test_util::{ async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expected: &str) { let request = ScanRequest::default(); - let scanner = engine.scanner(region_id, request).unwrap(); + let scanner = engine.scanner(region_id, request).await.unwrap(); assert_eq!(0, scanner.num_memtables()); assert_eq!(1, scanner.num_files()); let stream = scanner.scan().await.unwrap(); @@ -459,7 +459,7 @@ async fn test_alter_on_flushing() { .unwrap(); let request = ScanRequest::default(); - let scanner = engine.scanner(region_id, request).unwrap(); + let scanner = engine.scanner(region_id, request).await.unwrap(); assert_eq!(0, scanner.num_memtables()); assert_eq!(1, scanner.num_files()); let stream = scanner.scan().await.unwrap(); @@ -815,7 +815,7 @@ async fn test_write_stall_on_altering() { | | 2 | 2.0 | 1970-01-01T00:00:02 | +-------+-------+---------+---------------------+"; let request = ScanRequest::default(); - let scanner = engine.scanner(region_id, request).unwrap(); + let scanner = engine.scanner(region_id, request).await.unwrap(); let stream = scanner.scan().await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); assert_eq!(expected, batches.pretty_print().unwrap()); diff --git a/src/mito2/src/engine/append_mode_test.rs b/src/mito2/src/engine/append_mode_test.rs index 7d5c355c53..1374edd822 100644 --- a/src/mito2/src/engine/append_mode_test.rs +++ b/src/mito2/src/engine/append_mode_test.rs @@ -81,7 +81,7 @@ async fn test_append_mode_write_query() { let scan = engine .scan_region(region_id, ScanRequest::default()) .unwrap(); - let seq_scan = scan.seq_scan().unwrap(); + let seq_scan = scan.seq_scan().await.unwrap(); let stream = 
seq_scan.build_stream().unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); assert_eq!(expected, batches.pretty_print().unwrap()); @@ -174,7 +174,10 @@ async fn test_append_mode_compaction() { | b | 1.0 | 1970-01-01T00:00:01 | +-------+---------+---------------------+"; // Scans in parallel. - let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + let mut scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); assert_eq!(1, scanner.num_files()); assert_eq!(1, scanner.num_memtables()); scanner.set_target_partitions(2); diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 3aa9223c6e..dd4962a6b8 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -618,7 +618,7 @@ async fn test_engine_with_write_cache() { flush_region(&engine, region_id, None).await; let request = ScanRequest::default(); - let scanner = engine.scanner(region_id, request).unwrap(); + let scanner = engine.scanner(region_id, request).await.unwrap(); let stream = scanner.scan().await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index 03d7d12af5..c92e04c78b 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -173,7 +173,10 @@ async fn test_compaction_region() { compact(&engine, region_id).await; - let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + let scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); // Input: // [0..9] // [10...19] @@ -244,7 +247,10 @@ async fn test_infer_compaction_time_window() { compact(&engine, region_id).await; - let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + let scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); assert_eq!( 1, 
scanner.num_files(), @@ -288,7 +294,10 @@ async fn test_infer_compaction_time_window() { // this flush should update part_duration in TimePartitions. flush(&engine, region_id).await; compact(&engine, region_id).await; - let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + let scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); assert_eq!( 2, scanner.num_files(), @@ -317,7 +326,10 @@ async fn test_infer_compaction_time_window() { ) .await; flush(&engine, region_id).await; - let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + let scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); assert_eq!( 4, scanner.num_files(), @@ -365,7 +377,10 @@ async fn test_compaction_overlapping_files() { compact(&engine, region_id).await; - let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + let scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); assert_eq!( 1, scanner.num_files(), @@ -423,7 +438,10 @@ async fn test_compaction_region_with_overlapping() { delete_and_flush(&engine, region_id, &column_schemas, 0..3600).await; // window 3600 compact(&engine, region_id).await; - let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + let scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); let stream = scanner.scan().await.unwrap(); let vec = collect_stream_ts(stream).await; assert_eq!((3600..10800).map(|i| { i * 1000 }).collect::>(), vec); @@ -469,7 +487,10 @@ async fn test_compaction_region_with_overlapping_delete_all() { delete_and_flush(&engine, region_id, &column_schemas, 0..10800).await; // window 10800 compact(&engine, region_id).await; - let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + let scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); assert_eq!( 2, scanner.num_files(), @@ -550,7 +571,10 @@ async fn 
test_readonly_during_compaction() { .unwrap(); notify.notified().await; - let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + let scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); assert_eq!( 2, scanner.num_files(), @@ -612,7 +636,10 @@ async fn test_compaction_update_time_window() { .compaction_time_window, Some(Duration::from_secs(3600)) ); - let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + let scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); assert_eq!(0, scanner.num_memtables()); // We keep all 3 files because no enough file to merge assert_eq!( @@ -631,7 +658,10 @@ async fn test_compaction_update_time_window() { rows: build_rows_for_key("a", 3600, 4000, 0), }; put_rows(&engine, region_id, rows).await; - let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + let scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); assert_eq!(1, scanner.num_memtables()); let stream = scanner.scan().await.unwrap(); let vec = collect_stream_ts(stream).await; @@ -643,7 +673,10 @@ async fn test_compaction_update_time_window() { rows: build_rows_for_key("a", 2400, 3600, 0), }; put_rows(&engine, region_id, rows).await; - let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + let scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); assert_eq!(2, scanner.num_memtables()); let stream = scanner.scan().await.unwrap(); let vec = collect_stream_ts(stream).await; diff --git a/src/mito2/src/engine/filter_deleted_test.rs b/src/mito2/src/engine/filter_deleted_test.rs index 70a842cc06..d1e2328541 100644 --- a/src/mito2/src/engine/filter_deleted_test.rs +++ b/src/mito2/src/engine/filter_deleted_test.rs @@ -91,11 +91,12 @@ async fn test_scan_without_filtering_deleted() { assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"])); // Tries to use seq scan to 
test it under append mode. - let scan = engine + let mut scan = engine .scan_region(region_id, ScanRequest::default()) .unwrap(); + scan.set_filter_deleted(false); - let seq_scan = scan.scan_without_filter_deleted().unwrap(); + let seq_scan = scan.seq_scan().await.unwrap(); let stream = seq_scan.build_stream().unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index 1d836da733..03143118ac 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -73,7 +73,7 @@ async fn test_manual_flush() { flush_region(&engine, region_id, None).await; let request = ScanRequest::default(); - let scanner = engine.scanner(region_id, request).unwrap(); + let scanner = engine.scanner(region_id, request).await.unwrap(); assert_eq!(0, scanner.num_memtables()); assert_eq!(1, scanner.num_files()); let stream = scanner.scan().await.unwrap(); @@ -142,7 +142,7 @@ async fn test_flush_engine() { listener.wait().await; let request = ScanRequest::default(); - let scanner = engine.scanner(region_id, request).unwrap(); + let scanner = engine.scanner(region_id, request).await.unwrap(); assert_eq!(1, scanner.num_memtables()); assert_eq!(1, scanner.num_files()); let stream = scanner.scan().await.unwrap(); @@ -217,7 +217,7 @@ async fn test_write_stall() { put_rows(&engine, region_id, rows).await; let request = ScanRequest::default(); - let scanner = engine.scanner(region_id, request).unwrap(); + let scanner = engine.scanner(region_id, request).await.unwrap(); assert_eq!(1, scanner.num_memtables()); assert_eq!(1, scanner.num_files()); let stream = scanner.scan().await.unwrap(); @@ -267,7 +267,7 @@ async fn test_flush_empty() { flush_region(&engine, region_id, None).await; let request = ScanRequest::default(); - let scanner = engine.scanner(region_id, request).unwrap(); + let scanner = engine.scanner(region_id, request).await.unwrap(); assert_eq!(0, 
scanner.num_memtables()); assert_eq!(0, scanner.num_files()); let stream = scanner.scan().await.unwrap(); @@ -450,7 +450,7 @@ async fn test_auto_flush_engine() { .unwrap(); let request = ScanRequest::default(); - let scanner = engine.scanner(region_id, request).unwrap(); + let scanner = engine.scanner(region_id, request).await.unwrap(); assert_eq!(0, scanner.num_memtables()); assert_eq!(1, scanner.num_files()); let stream = scanner.scan().await.unwrap(); @@ -530,7 +530,7 @@ async fn test_flush_workers() { // Scans region 1. let request = ScanRequest::default(); - let scanner = engine.scanner(region_id1, request).unwrap(); + let scanner = engine.scanner(region_id1, request).await.unwrap(); assert_eq!(0, scanner.num_memtables()); assert_eq!(1, scanner.num_files()); let stream = scanner.scan().await.unwrap(); diff --git a/src/mito2/src/engine/merge_mode_test.rs b/src/mito2/src/engine/merge_mode_test.rs index 34e2d6425f..0bc0ee4ace 100644 --- a/src/mito2/src/engine/merge_mode_test.rs +++ b/src/mito2/src/engine/merge_mode_test.rs @@ -188,7 +188,10 @@ async fn test_merge_mode_compaction() { | a | | 13.0 | 1970-01-01T00:00:03 | +-------+---------+---------+---------------------+"; // Scans in parallel. 
- let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + let mut scanner = engine + .scanner(region_id, ScanRequest::default()) + .await + .unwrap(); assert_eq!(2, scanner.num_files()); assert_eq!(1, scanner.num_memtables()); scanner.set_target_partitions(2); diff --git a/src/mito2/src/engine/parallel_test.rs b/src/mito2/src/engine/parallel_test.rs index b386024b40..9c424d3172 100644 --- a/src/mito2/src/engine/parallel_test.rs +++ b/src/mito2/src/engine/parallel_test.rs @@ -56,7 +56,7 @@ async fn scan_in_parallel( .unwrap(); let request = ScanRequest::default(); - let mut scanner = engine.scanner(region_id, request).unwrap(); + let mut scanner = engine.scanner(region_id, request).await.unwrap(); scanner.set_target_partitions(parallelism); let stream = scanner.scan().await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/src/mito2/src/engine/row_selector_test.rs b/src/mito2/src/engine/row_selector_test.rs index 1a3299e7c2..dcff07d525 100644 --- a/src/mito2/src/engine/row_selector_test.rs +++ b/src/mito2/src/engine/row_selector_test.rs @@ -95,6 +95,7 @@ async fn test_last_row(append_mode: bool) { ..Default::default() }, ) + .await .unwrap(); assert_eq!(3, scanner.num_files()); assert_eq!(1, scanner.num_memtables()); diff --git a/src/mito2/src/engine/scan_test.rs b/src/mito2/src/engine/scan_test.rs index c361c1e4ab..aee5873bb2 100644 --- a/src/mito2/src/engine/scan_test.rs +++ b/src/mito2/src/engine/scan_test.rs @@ -57,7 +57,7 @@ async fn test_scan_with_min_sst_sequence() { sst_min_sequence: file_min_sequence, ..Default::default() }; - let scanner = engine.scanner(region_id, request).unwrap(); + let scanner = engine.scanner(region_id, request).await.unwrap(); assert_eq!(scanner.num_files(), expected_files); let stream = scanner.scan().await.unwrap(); @@ -191,7 +191,7 @@ async fn test_series_scan() { distribution: Some(TimeSeriesDistribution::PerSeries), ..Default::default() }; - let scanner = 
engine.scanner(region_id, request).unwrap(); + let scanner = engine.scanner(region_id, request).await.unwrap(); let Scanner::Series(mut scanner) = scanner else { panic!("Scanner should be series scan"); }; diff --git a/src/mito2/src/engine/sync_test.rs b/src/mito2/src/engine/sync_test.rs index c55d476bcf..842227670c 100644 --- a/src/mito2/src/engine/sync_test.rs +++ b/src/mito2/src/engine/sync_test.rs @@ -60,7 +60,7 @@ async fn scan_check( num_files: usize, ) { let request = ScanRequest::default(); - let scanner = engine.scanner(region_id, request).unwrap(); + let scanner = engine.scanner(region_id, request).await.unwrap(); assert_eq!(num_memtable, scanner.num_memtables()); assert_eq!(num_files, scanner.num_files()); let stream = scanner.scan().await.unwrap(); diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index 51fbf336a7..5278c0208a 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -189,7 +189,7 @@ async fn test_engine_truncate_after_flush() { .unwrap(); let request = ScanRequest::default(); - let scanner = engine.scanner(region_id, request.clone()).unwrap(); + let scanner = engine.scanner(region_id, request.clone()).await.unwrap(); assert_eq!(1, scanner.num_files()); // Truncate the region. @@ -206,7 +206,7 @@ async fn test_engine_truncate_after_flush() { put_rows(&engine, region_id, rows).await; // Scan the region. 
- let scanner = engine.scanner(region_id, request).unwrap(); + let scanner = engine.scanner(region_id, request).await.unwrap(); assert_eq!(0, scanner.num_files()); let stream = scanner.scan().await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); @@ -352,7 +352,7 @@ async fn test_engine_truncate_during_flush() { let truncated_sequence = version_data.version.flushed_sequence; let request = ScanRequest::default(); - let scanner = engine.scanner(region_id, request.clone()).unwrap(); + let scanner = engine.scanner(region_id, request.clone()).await.unwrap(); assert_eq!(0, scanner.num_files()); assert_eq!(Some(entry_id), truncated_entry_id); assert_eq!(sequence, truncated_sequence); diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 96bc6538c2..7915ee09d6 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -1064,6 +1064,7 @@ impl ErrorExt for Error { | NoCheckpoint { .. } | NoManifests { .. } | InstallManifestTo { .. } => StatusCode::Unexpected, + RegionNotFound { .. } => StatusCode::RegionNotFound, ObjectStoreNotFound { .. } | InvalidScanIndex { .. } diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs index 1b480f19a4..41edde66d8 100644 --- a/src/mito2/src/read/range.rs +++ b/src/mito2/src/read/range.rs @@ -46,12 +46,12 @@ pub(crate) struct SourceIndex { /// Index to access a row group. #[derive(Debug, Clone, Copy, PartialEq)] -pub(crate) struct RowGroupIndex { +pub struct RowGroupIndex { /// Index to the memtable/file. pub(crate) index: usize, /// Row group index in the file. /// Negative index indicates all row groups. - pub(crate) row_group_index: i64, + pub row_group_index: i64, } /// Meta data of a partition range. @@ -366,7 +366,7 @@ fn maybe_split_ranges_for_seq_scan(ranges: Vec) -> Vec { /// Builder to create file ranges. #[derive(Default)] -pub(crate) struct FileRangeBuilder { +pub struct FileRangeBuilder { /// Context for the file. /// None indicates nothing to read. 
context: Option, @@ -385,7 +385,7 @@ impl FileRangeBuilder { /// Builds file ranges to read. /// Negative `row_group_index` indicates all row groups. - pub(crate) fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) { + pub fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) { let Some(context) = self.context.clone() else { return; }; @@ -485,7 +485,8 @@ impl RangeBuilderList { match builder_opt { Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges), None => { - let builder = input.prune_file(file_index, reader_metrics).await?; + let file = &input.files[file_index]; + let builder = input.prune_file(file, reader_metrics).await?; builder.build_ranges(index.row_group_index, &mut ranges); self.set_file_builder(file_index, Arc::new(builder)); } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 47590c2b08..b03e3289b4 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -32,7 +32,7 @@ use datafusion_expr::Expr; use smallvec::SmallVec; use store_api::metadata::RegionMetadata; use store_api::region_engine::{PartitionRange, RegionScannerRef}; -use store_api::storage::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector}; +use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector}; use table::predicate::{build_time_range_predicate, Predicate}; use tokio::sync::{mpsc, Semaphore}; use tokio_stream::wrappers::ReceiverStream; @@ -195,6 +195,9 @@ pub(crate) struct ScanRegion { ignore_bloom_filter: bool, /// Start time of the scan task. start_time: Option, + /// Whether to filter out the deleted rows. + /// Usually true for normal read, and false for scan for compaction. 
+ filter_deleted: bool, } impl ScanRegion { @@ -215,6 +218,7 @@ impl ScanRegion { ignore_fulltext_index: false, ignore_bloom_filter: false, start_time: None, + filter_deleted: true, } } @@ -255,55 +259,58 @@ impl ScanRegion { self } + #[cfg(test)] + pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) { + self.filter_deleted = filter_deleted; + } + /// Returns a [Scanner] to scan the region. - pub(crate) fn scanner(self) -> Result { + pub(crate) async fn scanner(self) -> Result { if self.use_series_scan() { - self.series_scan().map(Scanner::Series) + self.series_scan().await.map(Scanner::Series) } else if self.use_unordered_scan() { // If table is append only and there is no series row selector, we use unordered scan in query. // We still use seq scan in compaction. - self.unordered_scan().map(Scanner::Unordered) + self.unordered_scan().await.map(Scanner::Unordered) } else { - self.seq_scan().map(Scanner::Seq) + self.seq_scan().await.map(Scanner::Seq) } } /// Returns a [RegionScanner] to scan the region. #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] - pub(crate) fn region_scanner(self) -> Result { + pub(crate) async fn region_scanner(self) -> Result { if self.use_series_scan() { - self.series_scan().map(|scanner| Box::new(scanner) as _) + self.series_scan() + .await + .map(|scanner| Box::new(scanner) as _) } else if self.use_unordered_scan() { - self.unordered_scan().map(|scanner| Box::new(scanner) as _) + self.unordered_scan() + .await + .map(|scanner| Box::new(scanner) as _) } else { - self.seq_scan().map(|scanner| Box::new(scanner) as _) + self.seq_scan().await.map(|scanner| Box::new(scanner) as _) } } /// Scan sequentially. - pub(crate) fn seq_scan(self) -> Result { - let input = self.scan_input(true)?; + pub(crate) async fn seq_scan(self) -> Result { + let input = self.scan_input().await?; Ok(SeqScan::new(input, false)) } /// Unordered scan. 
- pub(crate) fn unordered_scan(self) -> Result { - let input = self.scan_input(true)?; + pub(crate) async fn unordered_scan(self) -> Result { + let input = self.scan_input().await?; Ok(UnorderedScan::new(input)) } /// Scans by series. - pub(crate) fn series_scan(self) -> Result { - let input = self.scan_input(true)?; + pub(crate) async fn series_scan(self) -> Result { + let input = self.scan_input().await?; Ok(SeriesScan::new(input)) } - #[cfg(test)] - pub(crate) fn scan_without_filter_deleted(self) -> Result { - let input = self.scan_input(false)?; - Ok(SeqScan::new(input, false)) - } - /// Returns true if the region can use unordered scan for current request. fn use_unordered_scan(&self) -> bool { // We use unordered scan when: @@ -324,7 +331,7 @@ impl ScanRegion { } /// Creates a scan input. - fn scan_input(mut self, filter_deleted: bool) -> Result { + async fn scan_input(mut self) -> Result { let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new); let time_range = self.build_time_range_predicate(); @@ -368,9 +375,10 @@ impl ScanRegion { }) .collect(); + let region_id = self.region_id(); debug!( "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}", - self.version.metadata.region_id, + region_id, self.request, time_range, memtables.len(), @@ -415,13 +423,17 @@ impl ScanRegion { .with_parallel_scan_channel_size(self.parallel_scan_channel_size) .with_start_time(self.start_time) .with_append_mode(self.version.options.append_mode) - .with_filter_deleted(filter_deleted) + .with_filter_deleted(self.filter_deleted) .with_merge_mode(self.version.options.merge_mode()) .with_series_row_selector(self.request.series_row_selector) .with_distribution(self.request.distribution); Ok(input) } + fn region_id(&self) -> RegionId { + self.version.metadata.region_id + } + /// Build time range predicate from filters. 
fn build_time_range_predicate(&self) -> TimestampRange { let time_index = self.version.metadata.time_index_column(); @@ -538,7 +550,7 @@ impl ScanRegion { let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned(); FulltextIndexApplierBuilder::new( self.access_layer.region_dir().to_string(), - self.version.metadata.region_id, + self.region_id(), self.access_layer.object_store().clone(), self.access_layer.puffin_manager_factory().clone(), self.version.metadata.as_ref(), @@ -566,7 +578,7 @@ fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool { } /// Common input for different scanners. -pub(crate) struct ScanInput { +pub struct ScanInput { /// Region SST access layer. access_layer: AccessLayerRef, /// Maps projected Batches to RecordBatches. @@ -792,12 +804,11 @@ impl ScanInput { } /// Prunes a file to scan and returns the builder to build readers. - pub(crate) async fn prune_file( + pub async fn prune_file( &self, - file_index: usize, + file: &FileHandle, reader_metrics: &mut ReaderMetrics, ) -> Result { - let file = &self.files[file_index]; let res = self .access_layer .read_sst(file.clone()) @@ -898,9 +909,9 @@ impl ScanInput { /// Context shared by different streams from a scanner. /// It contains the input and ranges to scan. -pub(crate) struct StreamContext { +pub struct StreamContext { /// Input memtables and files. - pub(crate) input: ScanInput, + pub input: ScanInput, /// Metadata for partition ranges. 
pub(crate) ranges: Vec, diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 4ad3ea8e2d..08863b3b2f 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -33,7 +33,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState, }; -use store_api::storage::RegionId; +use store_api::storage::{RegionId, SequenceNumber}; use crate::access_layer::AccessLayerRef; use crate::error::{ @@ -94,7 +94,7 @@ pub enum RegionRoleState { /// - Only the region worker thread this region belongs to can modify the metadata. /// - Multiple reader threads are allowed to read a specific `version` of a region. #[derive(Debug)] -pub(crate) struct MitoRegion { +pub struct MitoRegion { /// Id of this region. /// /// Accessing region id from the version control is inconvenient so @@ -135,7 +135,7 @@ pub(crate) struct MitoRegion { stats: ManifestStats, } -pub(crate) type MitoRegionRef = Arc; +pub type MitoRegionRef = Arc; impl MitoRegion { /// Stop background managers for this region. @@ -220,8 +220,16 @@ impl MitoRegion { ) } + pub fn region_id(&self) -> RegionId { + self.region_id + } + + pub fn find_committed_sequence(&self) -> SequenceNumber { + self.version_control.committed_sequence() + } + /// Returns whether the region is readonly. - pub(crate) fn is_follower(&self) -> bool { + pub fn is_follower(&self) -> bool { self.manifest_ctx.state.load() == RegionRoleState::Follower } diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 10dcd7f51e..0083372b2c 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -38,6 +38,16 @@ pub trait FilePurger: Send + Sync + fmt::Debug { pub type FilePurgerRef = Arc; +/// A no-op file purger can be used in combination with reading SST files outside of this region. 
+#[derive(Debug)] +pub struct NoopFilePurger; + +impl FilePurger for NoopFilePurger { + fn send_request(&self, _: PurgeRequest) { + // noop + } +} + /// Purger that purges file for current region. pub struct LocalFilePurger { scheduler: SchedulerRef, diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 2878a87f3e..ec25b0415a 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -823,7 +823,7 @@ impl ReaderFilterMetrics { /// Parquet reader metrics. #[derive(Debug, Default, Clone)] -pub(crate) struct ReaderMetrics { +pub struct ReaderMetrics { /// Filtered row groups and rows metrics. pub(crate) filter_metrics: ReaderFilterMetrics, /// Duration to build the parquet reader. diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 7a8327c8de..0a84b7267b 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -55,6 +55,7 @@ use rskafka::client::partition::{Compression, UnknownTopicHandling}; use rskafka::client::{Client, ClientBuilder}; use rskafka::record::Record; use rstest_reuse::template; +use store_api::logstore::LogStore; use store_api::metadata::{ColumnMetadata, RegionMetadataRef}; use store_api::region_engine::{RegionEngine, RegionRole}; use store_api::region_request::{ @@ -71,21 +72,14 @@ use crate::error::Result; use crate::flush::{WriteBufferManager, WriteBufferManagerRef}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::read::{Batch, BatchBuilder, BatchReader}; -use crate::sst::file_purger::{FilePurger, FilePurgerRef, PurgeRequest}; +use crate::sst::file_purger::{FilePurger, FilePurgerRef, NoopFilePurger, PurgeRequest}; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::time_provider::{StdTimeProvider, TimeProviderRef}; use crate::worker::WorkerGroup; -#[derive(Debug)] -pub(crate) struct NoopFilePurger; - -impl FilePurger 
for NoopFilePurger { - fn send_request(&self, _request: PurgeRequest) {} -} - pub(crate) fn new_noop_file_purger() -> FilePurgerRef { - Arc::new(NoopFilePurger {}) + Arc::new(NoopFilePurger) } pub(crate) fn raft_engine_log_store_factory() -> Option { @@ -281,6 +275,31 @@ impl TestEnv { self.object_store_manager.clone() } + async fn new_mito_engine(&self, config: MitoConfig) -> MitoEngine { + async fn create( + zelf: &TestEnv, + config: MitoConfig, + log_store: Arc, + ) -> MitoEngine { + let data_home = zelf.data_home().display().to_string(); + MitoEngine::new( + &data_home, + config, + log_store, + zelf.object_store_manager.as_ref().unwrap().clone(), + zelf.schema_metadata_manager.clone(), + Plugins::new(), + ) + .await + .unwrap() + } + + match self.log_store.as_ref().unwrap().clone() { + LogStoreImpl::RaftEngine(log_store) => create(self, config, log_store).await, + LogStoreImpl::Kafka(log_store) => create(self, config, log_store).await, + } + } + /// Creates a new engine with specific config under this env. 
pub async fn create_engine(&mut self, config: MitoConfig) -> MitoEngine { let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await; @@ -288,58 +307,13 @@ impl TestEnv { let object_store_manager = Arc::new(object_store_manager); self.log_store = Some(log_store.clone()); self.object_store_manager = Some(object_store_manager.clone()); - let data_home = self.data_home().display().to_string(); - match log_store { - LogStoreImpl::RaftEngine(log_store) => MitoEngine::new( - &data_home, - config, - log_store, - object_store_manager, - self.schema_metadata_manager.clone(), - Plugins::new(), - ) - .await - .unwrap(), - LogStoreImpl::Kafka(log_store) => MitoEngine::new( - &data_home, - config, - log_store, - object_store_manager, - self.schema_metadata_manager.clone(), - Plugins::new(), - ) - .await - .unwrap(), - } + self.new_mito_engine(config).await } /// Creates a new engine with specific config and existing logstore and object store manager. pub async fn create_follower_engine(&mut self, config: MitoConfig) -> MitoEngine { - let object_store_manager = self.object_store_manager.as_ref().unwrap().clone(); - let data_home = self.data_home().display().to_string(); - match self.log_store.as_ref().unwrap().clone() { - LogStoreImpl::RaftEngine(log_store) => MitoEngine::new( - &data_home, - config, - log_store, - object_store_manager, - self.schema_metadata_manager.clone(), - Plugins::new(), - ) - .await - .unwrap(), - LogStoreImpl::Kafka(log_store) => MitoEngine::new( - &data_home, - config, - log_store, - object_store_manager, - self.schema_metadata_manager.clone(), - Plugins::new(), - ) - .await - .unwrap(), - } + self.new_mito_engine(config).await } /// Creates a new engine with specific config and manager/listener/purge_scheduler under this env. @@ -487,54 +461,12 @@ impl TestEnv { /// Reopen the engine. 
pub async fn reopen_engine(&mut self, engine: MitoEngine, config: MitoConfig) -> MitoEngine { engine.stop().await.unwrap(); - match self.log_store.as_ref().unwrap().clone() { - LogStoreImpl::RaftEngine(log_store) => MitoEngine::new( - &self.data_home().display().to_string(), - config, - log_store, - self.object_store_manager.clone().unwrap(), - self.schema_metadata_manager.clone(), - Plugins::new(), - ) - .await - .unwrap(), - LogStoreImpl::Kafka(log_store) => MitoEngine::new( - &self.data_home().display().to_string(), - config, - log_store, - self.object_store_manager.clone().unwrap(), - self.schema_metadata_manager.clone(), - Plugins::new(), - ) - .await - .unwrap(), - } + self.new_mito_engine(config).await } /// Open the engine. pub async fn open_engine(&mut self, config: MitoConfig) -> MitoEngine { - match self.log_store.as_ref().unwrap().clone() { - LogStoreImpl::RaftEngine(log_store) => MitoEngine::new( - &self.data_home().display().to_string(), - config, - log_store, - self.object_store_manager.clone().unwrap(), - self.schema_metadata_manager.clone(), - Plugins::new(), - ) - .await - .unwrap(), - LogStoreImpl::Kafka(log_store) => MitoEngine::new( - &self.data_home().display().to_string(), - config, - log_store, - self.object_store_manager.clone().unwrap(), - self.schema_metadata_manager.clone(), - Plugins::new(), - ) - .await - .unwrap(), - } + self.new_mito_engine(config).await } /// Only initializes the object store manager, returns the default object store. diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 3adff232a8..4e1292cc3d 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::fmt::{Display, Formatter};
+
 use common_recordbatch::OrderOption;
 use datafusion_expr::expr::Expr;
 use strum::Display;
@@ -62,3 +64,60 @@ pub struct ScanRequest {
     /// Optional hint for the distribution of time-series data.
     pub distribution: Option,
 }
+
+impl Display for ScanRequest {
+    /// Formats the request as `ScanRequest {name: value, ...}`, listing only the
+    /// fields that are set (a `Some` option, or a non-empty `filters` list).
+    ///
+    /// Fields are joined by a single `", "`; no separator is written before the
+    /// first field or after the last one, whichever subset of fields is present.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        /// Returns the separator to write before the next field: nothing for the
+        /// first field written, `", "` for every later one.
+        fn delimiter(first: &mut bool) -> &'static str {
+            if *first {
+                *first = false;
+                ""
+            } else {
+                ", "
+            }
+        }
+
+        let mut first = true;
+        write!(f, "ScanRequest {{")?;
+        if let Some(projection) = &self.projection {
+            write!(f, "{}projection: {:?}", delimiter(&mut first), projection)?;
+        }
+        if !self.filters.is_empty() {
+            write!(f, "{}filters: [", delimiter(&mut first))?;
+            // Write each filter straight into the formatter instead of collecting
+            // intermediate strings.
+            for (index, filter) in self.filters.iter().enumerate() {
+                if index > 0 {
+                    write!(f, ", ")?;
+                }
+                write!(f, "{}", filter)?;
+            }
+            write!(f, "]")?;
+        }
+        if let Some(output_ordering) = &self.output_ordering {
+            write!(f, "{}output_ordering: {:?}", delimiter(&mut first), output_ordering)?;
+        }
+        if let Some(limit) = &self.limit {
+            write!(f, "{}limit: {}", delimiter(&mut first), limit)?;
+        }
+        if let Some(series_row_selector) = &self.series_row_selector {
+            write!(f, "{}series_row_selector: {}", delimiter(&mut first), series_row_selector)?;
+        }
+        if let Some(sequence) = &self.sequence {
+            write!(f, "{}sequence: {}", delimiter(&mut first), sequence)?;
+        }
+        if let Some(sst_min_sequence) = &self.sst_min_sequence {
+            write!(f, "{}sst_min_sequence: {}", delimiter(&mut first), sst_min_sequence)?;
+        }
+        if let Some(distribution) = &self.distribution {
+            write!(f, "{}distribution: {}", delimiter(&mut first), distribution)?;
+        }
+        write!(f, "}}")
+    }
+}