From 1c1963999a20ddc124efd044472c1dff0e05d3cc Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 18 Mar 2026 18:27:11 +0800 Subject: [PATCH] fix: make snapshot/inc query works Signed-off-by: discord9 --- src/datanode/src/region_server.rs | 314 ++++++++++++++++++++++++--- src/mito2/src/engine/scan_test.rs | 50 +++++ src/mito2/src/region.rs | 4 + src/session/src/context.rs | 7 + tests-integration/src/grpc/flight.rs | 46 ++-- 5 files changed, 372 insertions(+), 49 deletions(-) diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 662ec43b9d..e8c11d617e 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -19,7 +19,7 @@ use std::fmt::Debug; use std::ops::Deref; use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use std::task::{Context, Poll}; use std::time::Duration; @@ -71,9 +71,9 @@ use store_api::metric_engine_consts::{ FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, }; use store_api::region_engine::{ - RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, RemapManifestsRequest, - RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState, - SyncRegionFromRequest, + RegionEngine, RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, + RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse, + SettableRegionRoleState, SyncRegionFromRequest, }; use store_api::region_request::{ AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest, @@ -224,7 +224,6 @@ impl RegionServer { self.inner.handle_request(region_id, request).await } - /// Returns a table provider for the region. Will set snapshot sequence if available in the context. 
async fn table_provider( &self, region_id: RegionId, @@ -241,9 +240,21 @@ impl RegionServer { RegionNotReadySnafu { region_id } ); + let engine = status.into_engine(); + if let Some(ctx) = &ctx + && should_return_region_seq(ctx) + && ctx.get_snapshot(region_id.as_u64()).is_none() + { + let sampled_sequence = engine + .get_committed_sequence(region_id) + .await + .with_context(|_| HandleRegionRequestSnafu { region_id })?; + bind_snapshot_bound_region_seq(ctx, region_id, sampled_sequence); + } + self.inner .table_provider_factory - .create(region_id, status.into_engine(), ctx) + .create(region_id, engine, ctx) .await .context(ExecuteLogicalPlanSnafu) } @@ -261,20 +272,6 @@ impl RegionServer { }; let region_id = RegionId::from_u64(request.region_id); - let should_return_region_seq = should_return_region_seq(&query_ctx); - let region_latest_seq = if should_return_region_seq { - let engine = self - .find_engine(region_id)? - .context(RegionNotFoundSnafu { region_id })?; - Some( - engine - .get_committed_sequence(region_id) - .await - .with_context(|_| HandleRegionRequestSnafu { region_id })?, - ) - } else { - None - }; let catalog_list = Arc::new(NameAwareCatalogList::new( self.clone(), region_id, @@ -305,13 +302,23 @@ impl RegionServer { region_id, plan, }, - query_ctx, + query_ctx.clone(), ) .await?; + let region_latest_seq = if should_return_region_seq(&query_ctx) { + query_ctx.get_snapshot(region_id.as_u64()) + } else { + None + }; + if let Some(seq) = region_latest_seq { - Ok(Box::pin(RegionWatermarkStream::new(stream, region_id, seq)) - as SendableRecordBatchStream) + Ok(Box::pin(RegionWatermarkStream::new( + stream, + region_id, + seq, + self.find_engine(region_id)?, + )) as SendableRecordBatchStream) } else { Ok(stream) } @@ -782,30 +789,95 @@ fn should_return_region_seq(query_ctx: &QueryContext) -> bool { || query_ctx.extension(FLOW_INCREMENTAL_AFTER_SEQS).is_some() } +fn bind_snapshot_bound_region_seq( + query_ctx: &QueryContext, + region_id: RegionId, + 
sampled_sequence: u64, +) -> u64 { + if let Some(snapshot_seq) = query_ctx.get_snapshot(region_id.as_u64()) { + snapshot_seq + } else { + query_ctx.set_snapshot(region_id.as_u64(), sampled_sequence); + sampled_sequence + } +} + +/// Returns whether `snapshot_sequence` is a valid correctness watermark for this region, +/// i.e. whether this query can safely treat it as the read upper bound. +/// +/// For now this proof only covers memtable-side sequence information; SST-side proof +/// can be added later if finer sequence metadata becomes available. +fn can_prove_region_watermark_for_engine( + engine: &dyn RegionEngine, + region_id: RegionId, + snapshot_sequence: u64, +) -> bool { + if let Some(mito_engine) = engine.as_any().downcast_ref::<MitoEngine>() + && let Some(region) = mito_engine.find_region(region_id) + { + return snapshot_sequence >= region.flushed_sequence(); + } + true +} + struct RegionWatermarkStream { stream: SendableRecordBatchStream, - region_latest_sequence: (u64, u64), + region_id: u64, + snapshot_seq: u64, + proof_engine: Option<RegionEngineRef>, finished: AtomicBool, + proof_state: Mutex<Option<bool>>, } impl RegionWatermarkStream { - fn new(stream: SendableRecordBatchStream, region_id: RegionId, latest_sequence: u64) -> Self { + fn new( + stream: SendableRecordBatchStream, + region_id: RegionId, + latest_sequence: u64, + proof_engine: Option<RegionEngineRef>, + ) -> Self { Self { stream, - region_latest_sequence: (region_id.as_u64(), latest_sequence), + region_id: region_id.as_u64(), + snapshot_seq: latest_sequence, + proof_engine, finished: AtomicBool::new(false), + proof_state: Mutex::new(None), } } + fn compute_proof(&self) -> bool { + self.proof_engine + .as_ref() + .map(|engine| { + can_prove_region_watermark_for_engine( + engine.as_ref(), + RegionId::from_u64(self.region_id), + self.snapshot_seq, + ) + }) + .unwrap_or(false) + } + + fn proof_passed(&self) -> bool { + self.proof_state.lock().unwrap().unwrap_or(false) + } + fn merged_metrics(&self, mut metrics: RecordBatchMetrics) -> 
RecordBatchMetrics { + // TODO(discord9): Move correctness-watermark proof into the mito scan + // path. The current stream-end proof is conservatively safe, but still + // allows false negatives if a late flush happens after the scan snapshot. + if !self.proof_passed() { + return metrics; + } let region_latest_sequences = metrics.region_latest_sequences.get_or_insert_with(Vec::new); if let Some((_, seq)) = region_latest_sequences .iter_mut() - .find(|(region_id, _)| *region_id == self.region_latest_sequence.0) + .find(|(region_id, _)| *region_id == self.region_id) { - *seq = self.region_latest_sequence.1; + *seq = self.snapshot_seq; } else { - region_latest_sequences.push(self.region_latest_sequence); + region_latest_sequences.push((self.region_id, self.snapshot_seq)); } metrics } @@ -840,6 +912,7 @@ impl Stream for RegionWatermarkStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { match Pin::new(&mut self.stream).poll_next(cx) { Poll::Ready(None) => { + *self.proof_state.lock().unwrap() = Some(self.compute_proof()); self.finished.store(true, Ordering::Relaxed); Poll::Ready(None) } @@ -851,16 +924,21 @@ impl Stream for RegionWatermarkStream { #[cfg(test)] mod watermark_tests { use std::collections::HashMap; - use std::sync::Arc; + use std::sync::{Arc, RwLock}; + use api::v1::Rows; use common_recordbatch::{RecordBatch, RecordBatches}; use datatypes::prelude::{ConcreteDataType, VectorRef}; use datatypes::schema::{ColumnSchema, Schema}; use datatypes::vectors::Int32Vector; use futures_util::StreamExt; + use mito2::config::MitoConfig; + use mito2::test_util::{self, CreateRequestBuilder, TestEnv}; use session::context::QueryContextBuilder; + use store_api::region_request::RegionRequest; use super::*; + use crate::tests::MockRegionEngine; #[test] fn test_should_return_region_seq() { @@ -884,6 +962,98 @@ mod watermark_tests { assert!(!should_return_region_seq(&ctx)); } + #[test] + fn 
test_full_query_and_incremental_queries_require_region_seq() { + let ctx = QueryContextBuilder::default() + .extensions(HashMap::from([( + FLOW_INCREMENTAL_AFTER_SEQS.to_string(), + r#"{"1":10}"#.to_string(), + )])) + .build(); + assert!(should_return_region_seq(&ctx)); + + let ctx = QueryContextBuilder::default() + .extensions(HashMap::from([( + FLOW_RETURN_REGION_SEQ.to_string(), + "true".to_string(), + )])) + .build(); + assert!(should_return_region_seq(&ctx)); + + let ctx = QueryContextBuilder::default().build(); + assert!(!should_return_region_seq(&ctx)); + } + + #[test] + fn test_bind_snapshot_bound_region_seq_reuses_existing_snapshot() { + let region_id = RegionId::new(42, 7); + let ctx = QueryContextBuilder::default() + .snapshot_seqs(Arc::new(RwLock::new(HashMap::from([( + region_id.as_u64(), + 42_u64, + )])))) + .build(); + + let seq = bind_snapshot_bound_region_seq(&ctx, region_id, 99); + + assert_eq!(seq, 42); + assert_eq!(ctx.get_snapshot(region_id.as_u64()), Some(42)); + } + + #[test] + fn test_bind_snapshot_bound_region_seq_sets_sampled_sequence() { + let region_id = RegionId::new(42, 7); + let ctx = QueryContextBuilder::default().build(); + + let seq = bind_snapshot_bound_region_seq(&ctx, region_id, 99); + + assert_eq!(seq, 99); + assert_eq!(ctx.get_snapshot(region_id.as_u64()), Some(99)); + } + + #[test] + fn test_can_prove_region_watermark_for_non_mito_engine_defaults_true() { + let engine = MockRegionEngine::new(MITO_ENGINE_NAME).0; + assert!(can_prove_region_watermark_for_engine( + engine.as_ref(), + RegionId::new(1, 1), + 42, + )); + } + + #[tokio::test] + async fn test_can_prove_region_watermark_for_mito_engine_when_snapshot_ge_flushed() { + let mut env = TestEnv::with_prefix( + "test_can_prove_region_watermark_for_mito_engine_when_snapshot_ge_flushed", + ) + .await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let column_schemas = 
test_util::rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas, + rows: test_util::build_rows(0, 3), + }; + test_util::put_rows(&engine, region_id, rows).await; + test_util::flush_region(&engine, region_id, None).await; + + let region = engine.find_region(region_id).unwrap(); + let snapshot_seq = region.flushed_sequence(); + + assert!(can_prove_region_watermark_for_engine( + &engine, + region_id, + snapshot_seq, + )); + } + #[tokio::test] async fn test_region_watermark_stream_only_sets_terminal_metrics() { let schema = Arc::new(Schema::new(vec![ColumnSchema::new( @@ -898,7 +1068,8 @@ mod watermark_tests { .as_stream(); let region_id = RegionId::new(42, 7); - let wrapped = RegionWatermarkStream::new(stream, region_id, 99); + let proof_engine = Some(MockRegionEngine::new(MITO_ENGINE_NAME).0 as RegionEngineRef); + let wrapped = RegionWatermarkStream::new(stream, region_id, 99, proof_engine); let mut pinned = Box::pin(wrapped); assert!(pinned.as_ref().get_ref().metrics().is_none()); @@ -910,6 +1081,87 @@ mod watermark_tests { Some(vec![(region_id.as_u64(), 99)]) ); } + + #[tokio::test] + async fn test_region_watermark_stream_without_proof_engine_omits_watermark() { + let schema = Arc::new(Schema::new(vec![ColumnSchema::new( + "v", + ConcreteDataType::int32_datatype(), + false, + )])); + let values: VectorRef = Arc::new(Int32Vector::from_slice([1, 2])); + let batch = RecordBatch::new(schema.clone(), vec![values]).unwrap(); + let stream = RecordBatches::try_new(schema, vec![batch]) + .unwrap() + .as_stream(); + + let region_id = RegionId::new(42, 7); + let wrapped = RegionWatermarkStream::new(stream, region_id, 99, None); + let mut pinned = Box::pin(wrapped); + + while pinned.next().await.is_some() {} + + let metrics = pinned.as_ref().get_ref().metrics().unwrap(); + assert!(metrics.region_latest_sequences.is_none()); + } + + #[tokio::test] + async fn 
test_region_watermark_stream_omits_watermark_after_late_flush() { + let mut env = + TestEnv::with_prefix("test_region_watermark_stream_omits_watermark_after_late_flush") + .await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let column_schemas = test_util::rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let initial_rows = Rows { + schema: column_schemas.clone(), + rows: test_util::build_rows(0, 3), + }; + test_util::put_rows(&engine, region_id, initial_rows).await; + test_util::flush_region(&engine, region_id, None).await; + + let snapshot_seq = engine.find_region(region_id).unwrap().flushed_sequence(); + + let schema = Arc::new(Schema::new(vec![ColumnSchema::new( + "v", + ConcreteDataType::int32_datatype(), + false, + )])); + let values: VectorRef = Arc::new(Int32Vector::from_slice([1, 2])); + let batch = RecordBatch::new(schema.clone(), vec![values]).unwrap(); + let stream = RecordBatches::try_new(schema, vec![batch]) + .unwrap() + .as_stream(); + + let wrapped = RegionWatermarkStream::new( + stream, + region_id, + snapshot_seq, + Some(Arc::new(engine.clone()) as RegionEngineRef), + ); + let mut pinned = Box::pin(wrapped); + + assert!(pinned.next().await.is_some()); + + let late_rows = Rows { + schema: column_schemas, + rows: test_util::build_rows(3, 5), + }; + test_util::put_rows(&engine, region_id, late_rows).await; + test_util::flush_region(&engine, region_id, None).await; + + assert!(pinned.next().await.is_none()); + + let metrics = pinned.as_ref().get_ref().metrics().unwrap(); + assert!(metrics.region_latest_sequences.is_none()); + } } #[async_trait] diff --git a/src/mito2/src/engine/scan_test.rs b/src/mito2/src/engine/scan_test.rs index 57594a28f1..f488d1d4c4 100644 --- a/src/mito2/src/engine/scan_test.rs +++ b/src/mito2/src/engine/scan_test.rs @@ -119,6 +119,56 @@ async fn 
test_scan_with_min_sst_sequence() { test_scan_with_min_sst_sequence_with_format(true).await; } +#[tokio::test] +async fn test_full_snapshot_upper_bound_does_not_constrain_sst_rows() { + let mut env = + TestEnv::with_prefix("test_full_snapshot_upper_bound_does_not_constrain_sst_rows").await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let column_schemas = test_util::rows_schema(&request); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let first_rows = Rows { + schema: column_schemas.clone(), + rows: test_util::build_rows(0, 3), + }; + test_util::put_rows(&engine, region_id, first_rows).await; + test_util::flush_region(&engine, region_id, None).await; + + let snapshot_upper_bound = engine.get_committed_sequence(region_id).await.unwrap(); + + let second_rows = Rows { + schema: column_schemas, + rows: test_util::build_rows(3, 5), + }; + test_util::put_rows(&engine, region_id, second_rows).await; + test_util::flush_region(&engine, region_id, None).await; + + let scanner = engine + .scanner( + region_id, + ScanRequest { + memtable_max_sequence: Some(snapshot_upper_bound), + ..Default::default() + }, + ) + .await + .unwrap(); + + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let pretty = batches.pretty_print().unwrap(); + + assert!(pretty.contains("1970-01-01T00:00:03")); + assert!(pretty.contains("1970-01-01T00:00:04")); +} + async fn test_scan_with_min_sst_sequence_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("test_scan_with_min_sst_sequence").await; let engine = env diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index de8927c4de..8b7c31b0ac 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -318,6 +318,10 @@ impl MitoRegion { self.version_control.committed_sequence() } + pub fn 
flushed_sequence(&self) -> SequenceNumber { + self.version_control.current().version.flushed_sequence + } + /// Returns whether the region is readonly. pub fn is_follower(&self) -> bool { self.manifest_ctx.state.load() == RegionRoleState::Follower diff --git a/src/session/src/context.rs b/src/session/src/context.rs index 56a38c9d72..5f16ea8b5a 100644 --- a/src/session/src/context.rs +++ b/src/session/src/context.rs @@ -441,6 +441,13 @@ impl QueryContext { self.snapshot_seqs.read().unwrap().get(&region_id).cloned() } + pub fn set_snapshot(&self, region_id: u64, sequence: u64) { + self.snapshot_seqs + .write() + .unwrap() + .insert(region_id, sequence); + } + /// Returns `true` if the session can cast strings to numbers in MySQL style. pub fn auto_string_to_numeric(&self) -> bool { matches!(self.channel, Channel::Mysql) diff --git a/tests-integration/src/grpc/flight.rs b/tests-integration/src/grpc/flight.rs index 952ff8a5c9..3a9e2b7330 100644 --- a/tests-integration/src/grpc/flight.rs +++ b/tests-integration/src/grpc/flight.rs @@ -39,6 +39,7 @@ mod test { use servers::grpc::greptime_handler::GreptimeRequestHandler; use servers::grpc::{FlightCompression, GrpcServerConfig}; use servers::server::Server; + use store_api::storage::RegionId; use crate::cluster::GreptimeDbClusterBuilder; use crate::grpc::query_and_expect; @@ -154,10 +155,13 @@ mod test { while let Some(batch) = stream.next().await { batch.unwrap(); } - assert!( - terminal_metrics - .region_watermark_map() - .is_some_and(|m| !m.is_empty()) + let regions = db.list_all_regions().await; + assert_eq!(regions.len(), 1); + let (region_id, region) = regions.into_iter().next().unwrap(); + let expected_watermark = (region_id.as_u64(), region.find_committed_sequence()); + assert_eq!( + terminal_metrics.region_watermark_map(), + Some(std::collections::HashMap::from([expected_watermark])) ); let output = client @@ -175,17 +179,13 @@ mod test { } assert_eq!(row_count, 9); - let RecordBatchMetrics { - 
region_latest_sequences, - .. - } = stream.metrics().expect("expected terminal metrics"); - let region_latest_sequences = - region_latest_sequences.expect("expected region watermark metrics"); - assert_eq!(region_latest_sequences.len(), 1); - assert!(region_latest_sequences[0].0 > 0); - assert!(region_latest_sequences[0].1 > 0); + let metrics = stream.metrics().expect("expected terminal metrics"); + let region_latest_sequences = metrics + .region_latest_sequences + .expect("expected region watermark metrics"); + assert_eq!(region_latest_sequences, vec![expected_watermark]); - let previous_watermark = region_latest_sequences[0]; + let previous_watermark = expected_watermark; create_table_named(&client, "bar").await; let result = client @@ -222,7 +222,10 @@ mod test { panic!("expected affected rows output"); }; assert_eq!(affected_rows, 9); - assert!(result.region_watermark_map().is_some_and(|m| !m.is_empty())); + assert_eq!( + result.region_watermark_map(), + Some(std::collections::HashMap::from([previous_watermark])) + ); let incremental_batches = create_record_batches(10); test_put_record_batches(&client, incremental_batches).await; @@ -283,9 +286,16 @@ mod test { .expect("expected terminal incremental metrics"); let region_latest_sequences = region_latest_sequences.expect("expected incremental region watermark metrics"); - assert_eq!(region_latest_sequences.len(), 1); - assert_eq!(region_latest_sequences[0].0, previous_watermark.0); - assert!(region_latest_sequences[0].1 > previous_watermark.1); + let regions = db.list_all_regions().await; + let region = regions + .get(&RegionId::from_u64(previous_watermark.0)) + .expect("expected source region to exist"); + let expected_incremental_watermark = + (previous_watermark.0, region.find_committed_sequence()); + assert_eq!( + region_latest_sequences, + vec![expected_incremental_watermark] + ); client.sql("admin flush_table('foo')").await.unwrap();