diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 1af79daff6..6f0a2c8c6f 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -129,8 +129,8 @@ use crate::cache::{CacheManagerRef, CacheStrategy}; use crate::config::MitoConfig; use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin}; use crate::error::{ - InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result, - SerdeJsonSnafu, SerializeColumnMetadataSnafu, + IncrementalQueryStaleSnafu, InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, + RegionNotFoundSnafu, Result, SerdeJsonSnafu, SerializeColumnMetadataSnafu, }; #[cfg(feature = "enterprise")] use crate::extension::BoxedExtensionRangeProviderFactory; @@ -1017,6 +1017,19 @@ impl EngineInner { // Reading a region doesn't need to go through the region worker thread. let region = self.find_region(region_id)?; let version = region.version(); + + if let Some(given_seq) = request.memtable_min_sequence { + let min_readable_seq = version.flushed_sequence; + ensure!( + given_seq >= min_readable_seq, + IncrementalQueryStaleSnafu { + region_id, + given_seq, + min_readable_seq, + } + ); + } + // Get cache. let cache_manager = self.workers.cache_manager(); diff --git a/src/mito2/src/engine/scan_test.rs b/src/mito2/src/engine/scan_test.rs index 46f4cc6cf2..6adaadbc59 100644 --- a/src/mito2/src/engine/scan_test.rs +++ b/src/mito2/src/engine/scan_test.rs @@ -23,10 +23,96 @@ use store_api::region_request::RegionRequest; use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution}; use crate::config::MitoConfig; +use crate::error::Error; use crate::read::scan_region::Scanner; use crate::test_util; use crate::test_util::{CreateRequestBuilder, TestEnv}; +#[tokio::test] +async fn test_incremental_query_stale_error() { + let mut env = TestEnv::with_prefix("test_incremental_query_stale_error").await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let column_schemas = test_util::rows_schema(&request); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas.clone(), + rows: test_util::build_rows(0, 3), + }; + test_util::put_rows(&engine, region_id, rows).await; + test_util::flush_region(&engine, region_id, None).await; + + let err = engine + .scanner( + region_id, + ScanRequest { + memtable_min_sequence: Some(0), + ..Default::default() + }, + ) + .await + .err() + .expect("expect stale incremental error"); + + let min_readable_seq = match &err { + Error::IncrementalQueryStale { + region_id: err_region_id, + given_seq, + min_readable_seq, + .. + } => { + assert_eq!(*err_region_id, region_id); + assert_eq!(*given_seq, 0); + assert!(*min_readable_seq > 0); + *min_readable_seq + } + _ => panic!("unexpected err: {err}"), + }; + assert_eq!(StatusCode::InvalidArguments, err.status_code()); + let err_msg = err.to_string(); + assert!(err_msg.contains("STALE_CURSOR")); + assert!(err_msg.contains(®ion_id.to_string())); + assert!(err_msg.contains("given_seq: 0")); + assert!(err_msg.contains(&format!("min_readable_seq: {min_readable_seq}"))); + + let incremental_rows = Rows { + schema: column_schemas, + rows: test_util::build_rows(3, 5), + }; + test_util::put_rows(&engine, region_id, incremental_rows).await; + + let scanner = engine + .scanner( + region_id, + ScanRequest { + memtable_min_sequence: Some(min_readable_seq), + sst_min_sequence: Some(u64::MAX), + ..Default::default() + }, + ) + .await + .unwrap(); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!( + batches.pretty_print().unwrap(), + "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 3 | 3.0 | 1970-01-01T00:00:03 | +| 4 | 4.0 | 1970-01-01T00:00:04 | ++-------+---------+---------------------+" + ); +} + #[tokio::test] async fn test_scan_with_min_sst_sequence() { test_scan_with_min_sst_sequence_with_format(false).await; diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 923d8a2713..7bb63c864a 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -228,6 +228,20 @@ pub enum Error { location: Location, }, + #[snafu(display( + "STALE_CURSOR: incremental query stale, region: {}, given_seq: {}, min_readable_seq: {}, retry_hint: FALLBACK_FULL_RECOMPUTE", + region_id, + given_seq, + min_readable_seq + ))] + IncrementalQueryStale { + region_id: RegionId, + given_seq: u64, + min_readable_seq: u64, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Old manifest missing for region {}", region_id))] MissingOldManifest { region_id: RegionId, @@ -1286,6 +1300,7 @@ impl ErrorExt for Error { | InvalidScanIndex { .. } | InvalidMeta { .. } | InvalidRequest { .. } + | IncrementalQueryStale { .. } | PartitionExprVersionMismatch { .. } | FillDefault { .. } | ConvertColumnDataType { .. }