fix: make snapshot/inc query work

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-18 18:27:11 +08:00
parent 8376150c81
commit 1c1963999a
5 changed files with 372 additions and 49 deletions

View File

@@ -19,7 +19,7 @@ use std::fmt::Debug;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex, RwLock};
use std::task::{Context, Poll};
use std::time::Duration;
@@ -71,9 +71,9 @@ use store_api::metric_engine_consts::{
FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
};
use store_api::region_engine::{
RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, RemapManifestsRequest,
RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
SyncRegionFromRequest,
RegionEngine, RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic,
RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
SettableRegionRoleState, SyncRegionFromRequest,
};
use store_api::region_request::{
AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest,
@@ -224,7 +224,6 @@ impl RegionServer {
self.inner.handle_request(region_id, request).await
}
/// Returns a table provider for the region. Will set snapshot sequence if available in the context.
async fn table_provider(
&self,
region_id: RegionId,
@@ -241,9 +240,21 @@ impl RegionServer {
RegionNotReadySnafu { region_id }
);
let engine = status.into_engine();
if let Some(ctx) = &ctx
&& should_return_region_seq(ctx)
&& ctx.get_snapshot(region_id.as_u64()).is_none()
{
let sampled_sequence = engine
.get_committed_sequence(region_id)
.await
.with_context(|_| HandleRegionRequestSnafu { region_id })?;
bind_snapshot_bound_region_seq(ctx, region_id, sampled_sequence);
}
self.inner
.table_provider_factory
.create(region_id, status.into_engine(), ctx)
.create(region_id, engine, ctx)
.await
.context(ExecuteLogicalPlanSnafu)
}
@@ -261,20 +272,6 @@ impl RegionServer {
};
let region_id = RegionId::from_u64(request.region_id);
let should_return_region_seq = should_return_region_seq(&query_ctx);
let region_latest_seq = if should_return_region_seq {
let engine = self
.find_engine(region_id)?
.context(RegionNotFoundSnafu { region_id })?;
Some(
engine
.get_committed_sequence(region_id)
.await
.with_context(|_| HandleRegionRequestSnafu { region_id })?,
)
} else {
None
};
let catalog_list = Arc::new(NameAwareCatalogList::new(
self.clone(),
region_id,
@@ -305,13 +302,23 @@ impl RegionServer {
region_id,
plan,
},
query_ctx,
query_ctx.clone(),
)
.await?;
let region_latest_seq = if should_return_region_seq(&query_ctx) {
query_ctx.get_snapshot(region_id.as_u64())
} else {
None
};
if let Some(seq) = region_latest_seq {
Ok(Box::pin(RegionWatermarkStream::new(stream, region_id, seq))
as SendableRecordBatchStream)
Ok(Box::pin(RegionWatermarkStream::new(
stream,
region_id,
seq,
self.find_engine(region_id)?,
)) as SendableRecordBatchStream)
} else {
Ok(stream)
}
@@ -782,30 +789,95 @@ fn should_return_region_seq(query_ctx: &QueryContext) -> bool {
|| query_ctx.extension(FLOW_INCREMENTAL_AFTER_SEQS).is_some()
}
/// Resolves the snapshot sequence bound for `region_id` on `query_ctx`.
///
/// A sequence already bound on the context wins and is returned unchanged;
/// otherwise `sampled_sequence` is recorded on the context and returned.
///
/// NOTE(review): the lookup and the insert are two separate lock
/// acquisitions on the context's snapshot map, so two concurrent callers
/// could each sample and bind a different sequence — confirm callers
/// serialize per region if that matters.
fn bind_snapshot_bound_region_seq(
    query_ctx: &QueryContext,
    region_id: RegionId,
    sampled_sequence: u64,
) -> u64 {
    let key = region_id.as_u64();
    match query_ctx.get_snapshot(key) {
        Some(existing) => existing,
        None => {
            query_ctx.set_snapshot(key, sampled_sequence);
            sampled_sequence
        }
    }
}
/// Returns whether `snapshot_sequence` can be trusted as the read upper
/// bound (correctness watermark) for this region.
///
/// Only memtable-side sequence information is checked for now; an SST-side
/// proof can be added later if finer sequence metadata becomes available.
/// Engines other than mito (and mito regions that cannot be found) are
/// conservatively accepted.
fn can_prove_region_watermark_for_engine(
    engine: &dyn RegionEngine,
    region_id: RegionId,
    snapshot_sequence: u64,
) -> bool {
    let Some(mito_engine) = engine.as_any().downcast_ref::<MitoEngine>() else {
        // Non-mito engines expose no flushed-sequence metadata to check against.
        return true;
    };
    match mito_engine.find_region(region_id) {
        // The snapshot proves the watermark only if it covers everything
        // already flushed out of the memtables.
        Some(region) => snapshot_sequence >= region.flushed_sequence(),
        None => true,
    }
}
struct RegionWatermarkStream {
stream: SendableRecordBatchStream,
region_latest_sequence: (u64, u64),
region_id: u64,
snapshot_seq: u64,
proof_engine: Option<RegionEngineRef>,
finished: AtomicBool,
proof_state: Mutex<Option<bool>>,
}
impl RegionWatermarkStream {
fn new(stream: SendableRecordBatchStream, region_id: RegionId, latest_sequence: u64) -> Self {
fn new(
stream: SendableRecordBatchStream,
region_id: RegionId,
latest_sequence: u64,
proof_engine: Option<RegionEngineRef>,
) -> Self {
Self {
stream,
region_latest_sequence: (region_id.as_u64(), latest_sequence),
region_id: region_id.as_u64(),
snapshot_seq: latest_sequence,
proof_engine,
finished: AtomicBool::new(false),
proof_state: Mutex::new(None),
}
}
fn compute_proof(&self) -> bool {
self.proof_engine
.as_ref()
.map(|engine| {
can_prove_region_watermark_for_engine(
engine.as_ref(),
RegionId::from_u64(self.region_id),
self.snapshot_seq,
)
})
.unwrap_or(false)
}
fn proof_passed(&self) -> bool {
self.proof_state.lock().unwrap().unwrap_or(false)
}
fn merged_metrics(&self, mut metrics: RecordBatchMetrics) -> RecordBatchMetrics {
// TODO(discord9): Move correctness-watermark proof into the mito scan
// path. The current stream-end proof is conservatively safe, but still
// allows false negatives if a late flush happens after the scan snapshot.
if !self.proof_passed() {
return metrics;
}
let region_latest_sequences = metrics.region_latest_sequences.get_or_insert_with(Vec::new);
if let Some((_, seq)) = region_latest_sequences
.iter_mut()
.find(|(region_id, _)| *region_id == self.region_latest_sequence.0)
.find(|(region_id, _)| *region_id == self.region_id)
{
*seq = self.region_latest_sequence.1;
*seq = self.snapshot_seq;
} else {
region_latest_sequences.push(self.region_latest_sequence);
region_latest_sequences.push((self.region_id, self.snapshot_seq));
}
metrics
}
@@ -840,6 +912,7 @@ impl Stream for RegionWatermarkStream {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(None) => {
*self.proof_state.lock().unwrap() = Some(self.compute_proof());
self.finished.store(true, Ordering::Relaxed);
Poll::Ready(None)
}
@@ -851,16 +924,21 @@ impl Stream for RegionWatermarkStream {
#[cfg(test)]
mod watermark_tests {
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use api::v1::Rows;
use common_recordbatch::{RecordBatch, RecordBatches};
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::Int32Vector;
use futures_util::StreamExt;
use mito2::config::MitoConfig;
use mito2::test_util::{self, CreateRequestBuilder, TestEnv};
use session::context::QueryContextBuilder;
use store_api::region_request::RegionRequest;
use super::*;
use crate::tests::MockRegionEngine;
#[test]
fn test_should_return_region_seq() {
@@ -884,6 +962,98 @@ mod watermark_tests {
assert!(!should_return_region_seq(&ctx));
}
#[test]
// Both query-context extensions must opt a query into region-sequence
// reporting: FLOW_INCREMENTAL_AFTER_SEQS (incremental query) and
// FLOW_RETURN_REGION_SEQ (full query). A plain context must not.
fn test_full_query_and_incremental_queries_require_region_seq() {
let ctx = QueryContextBuilder::default()
.extensions(HashMap::from([(
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
r#"{"1":10}"#.to_string(),
)]))
.build();
// Incremental-query extension present => sequence reporting enabled.
assert!(should_return_region_seq(&ctx));
let ctx = QueryContextBuilder::default()
.extensions(HashMap::from([(
FLOW_RETURN_REGION_SEQ.to_string(),
"true".to_string(),
)]))
.build();
// Full-query flag present => sequence reporting enabled.
assert!(should_return_region_seq(&ctx));
let ctx = QueryContextBuilder::default().build();
// Neither extension => no sequence reporting.
assert!(!should_return_region_seq(&ctx));
}
#[test]
// When the context already holds a snapshot sequence for the region, the
// bind helper must return that value and ignore the freshly sampled one.
fn test_bind_snapshot_bound_region_seq_reuses_existing_snapshot() {
let region_id = RegionId::new(42, 7);
// Pre-seed the context with sequence 42 for this region.
let ctx = QueryContextBuilder::default()
.snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(
region_id.as_u64(),
42_u64,
)]))))
.build();
// Sampled value 99 must NOT replace the existing snapshot.
let seq = bind_snapshot_bound_region_seq(&ctx, region_id, 99);
assert_eq!(seq, 42);
assert_eq!(ctx.get_snapshot(region_id.as_u64()), Some(42));
}
#[test]
// With no snapshot on the context, the bind helper must record the sampled
// sequence on the context and return it.
fn test_bind_snapshot_bound_region_seq_sets_sampled_sequence() {
let region_id = RegionId::new(42, 7);
let ctx = QueryContextBuilder::default().build();
let seq = bind_snapshot_bound_region_seq(&ctx, region_id, 99);
assert_eq!(seq, 99);
// The sampled sequence must now be visible as the region's snapshot.
assert_eq!(ctx.get_snapshot(region_id.as_u64()), Some(99));
}
#[test]
// An engine that is not a MitoEngine has no flushed-sequence metadata, so
// the watermark proof must default to accepting the snapshot.
fn test_can_prove_region_watermark_for_non_mito_engine_defaults_true() {
let engine = MockRegionEngine::new(MITO_ENGINE_NAME).0;
assert!(can_prove_region_watermark_for_engine(
engine.as_ref(),
RegionId::new(1, 1),
42,
));
}
#[tokio::test]
// End-to-end on a real mito engine: after a flush, a snapshot taken at the
// region's own flushed sequence must satisfy the watermark proof
// (boundary case: snapshot == flushed sequence).
async fn test_can_prove_region_watermark_for_mito_engine_when_snapshot_ge_flushed() {
let mut env = TestEnv::with_prefix(
"test_can_prove_region_watermark_for_mito_engine_when_snapshot_ge_flushed",
)
.await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = test_util::rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let rows = Rows {
schema: column_schemas,
rows: test_util::build_rows(0, 3),
};
// Write then flush so the region has a non-trivial flushed sequence.
test_util::put_rows(&engine, region_id, rows).await;
test_util::flush_region(&engine, region_id, None).await;
let region = engine.find_region(region_id).unwrap();
let snapshot_seq = region.flushed_sequence();
assert!(can_prove_region_watermark_for_engine(
&engine,
region_id,
snapshot_seq,
));
}
#[tokio::test]
async fn test_region_watermark_stream_only_sets_terminal_metrics() {
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
@@ -898,7 +1068,8 @@ mod watermark_tests {
.as_stream();
let region_id = RegionId::new(42, 7);
let wrapped = RegionWatermarkStream::new(stream, region_id, 99);
let proof_engine = Some(MockRegionEngine::new(MITO_ENGINE_NAME).0 as RegionEngineRef);
let wrapped = RegionWatermarkStream::new(stream, region_id, 99, proof_engine);
let mut pinned = Box::pin(wrapped);
assert!(pinned.as_ref().get_ref().metrics().is_none());
@@ -910,6 +1081,87 @@ mod watermark_tests {
Some(vec![(region_id.as_u64(), 99)])
);
}
#[tokio::test]
// Without a proof engine the stream cannot validate its watermark, so the
// terminal metrics must omit region_latest_sequences entirely.
async fn test_region_watermark_stream_without_proof_engine_omits_watermark() {
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"v",
ConcreteDataType::int32_datatype(),
false,
)]));
let values: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
let batch = RecordBatch::new(schema.clone(), vec![values]).unwrap();
let stream = RecordBatches::try_new(schema, vec![batch])
.unwrap()
.as_stream();
let region_id = RegionId::new(42, 7);
// proof_engine = None: the proof can never pass.
let wrapped = RegionWatermarkStream::new(stream, region_id, 99, None);
let mut pinned = Box::pin(wrapped);
// Drain the stream so terminal metrics are produced.
while pinned.next().await.is_some() {}
let metrics = pinned.as_ref().get_ref().metrics().unwrap();
assert!(metrics.region_latest_sequences.is_none());
}
#[tokio::test]
// A flush that lands AFTER the snapshot was taken raises the region's
// flushed sequence above the snapshot, so the stream-end proof must fail
// and the watermark must be omitted from the terminal metrics.
async fn test_region_watermark_stream_omits_watermark_after_late_flush() {
let mut env =
TestEnv::with_prefix("test_region_watermark_stream_omits_watermark_after_late_flush")
.await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = test_util::rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let initial_rows = Rows {
schema: column_schemas.clone(),
rows: test_util::build_rows(0, 3),
};
// First write + flush establishes the snapshot sequence.
test_util::put_rows(&engine, region_id, initial_rows).await;
test_util::flush_region(&engine, region_id, None).await;
let snapshot_seq = engine.find_region(region_id).unwrap().flushed_sequence();
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"v",
ConcreteDataType::int32_datatype(),
false,
)]));
let values: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
let batch = RecordBatch::new(schema.clone(), vec![values]).unwrap();
let stream = RecordBatches::try_new(schema, vec![batch])
.unwrap()
.as_stream();
let wrapped = RegionWatermarkStream::new(
stream,
region_id,
snapshot_seq,
Some(Arc::new(engine.clone()) as RegionEngineRef),
);
let mut pinned = Box::pin(wrapped);
// Consume the first batch while the proof would still hold.
assert!(pinned.next().await.is_some());
let late_rows = Rows {
schema: column_schemas,
rows: test_util::build_rows(3, 5),
};
// Late write + flush: flushed sequence now exceeds snapshot_seq.
test_util::put_rows(&engine, region_id, late_rows).await;
test_util::flush_region(&engine, region_id, None).await;
// Stream end triggers the proof, which must now fail.
assert!(pinned.next().await.is_none());
let metrics = pinned.as_ref().get_ref().metrics().unwrap();
assert!(metrics.region_latest_sequences.is_none());
}
}
#[async_trait]

View File

@@ -119,6 +119,56 @@ async fn test_scan_with_min_sst_sequence() {
test_scan_with_min_sst_sequence_with_format(true).await;
}
#[tokio::test]
// `memtable_max_sequence` is a memtable-side bound only: rows already
// flushed to SSTs (even with sequences above the bound) must still be
// returned by the scan.
async fn test_full_snapshot_upper_bound_does_not_constrain_sst_rows() {
let mut env =
TestEnv::with_prefix("test_full_snapshot_upper_bound_does_not_constrain_sst_rows").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = test_util::rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let first_rows = Rows {
schema: column_schemas.clone(),
rows: test_util::build_rows(0, 3),
};
// First batch flushed; the snapshot bound is taken here.
test_util::put_rows(&engine, region_id, first_rows).await;
test_util::flush_region(&engine, region_id, None).await;
let snapshot_upper_bound = engine.get_committed_sequence(region_id).await.unwrap();
let second_rows = Rows {
schema: column_schemas,
rows: test_util::build_rows(3, 5),
};
// Second batch is written and flushed to SSTs AFTER the bound was sampled.
test_util::put_rows(&engine, region_id, second_rows).await;
test_util::flush_region(&engine, region_id, None).await;
let scanner = engine
.scanner(
region_id,
ScanRequest {
memtable_max_sequence: Some(snapshot_upper_bound),
..Default::default()
},
)
.await
.unwrap();
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let pretty = batches.pretty_print().unwrap();
// Rows 3 and 4 live in SSTs and must survive the memtable bound.
assert!(pretty.contains("1970-01-01T00:00:03"));
assert!(pretty.contains("1970-01-01T00:00:04"));
}
async fn test_scan_with_min_sst_sequence_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("test_scan_with_min_sst_sequence").await;
let engine = env

View File

@@ -318,6 +318,10 @@ impl MitoRegion {
self.version_control.committed_sequence()
}
/// Returns the flushed sequence recorded in the region's current version,
/// i.e. the sequence number up to which data has left the memtables.
pub fn flushed_sequence(&self) -> SequenceNumber {
self.version_control.current().version.flushed_sequence
}
/// Returns whether the region is readonly.
pub fn is_follower(&self) -> bool {
self.manifest_ctx.state.load() == RegionRoleState::Follower

View File

@@ -441,6 +441,13 @@ impl QueryContext {
self.snapshot_seqs.read().unwrap().get(&region_id).cloned()
}
/// Binds `sequence` as the snapshot sequence for `region_id`, overwriting
/// any value previously stored for that region on this context.
pub fn set_snapshot(&self, region_id: u64, sequence: u64) {
self.snapshot_seqs
.write()
.unwrap()
.insert(region_id, sequence);
}
/// Returns `true` if the session can cast strings to numbers in MySQL style.
pub fn auto_string_to_numeric(&self) -> bool {
matches!(self.channel, Channel::Mysql)

View File

@@ -39,6 +39,7 @@ mod test {
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{FlightCompression, GrpcServerConfig};
use servers::server::Server;
use store_api::storage::RegionId;
use crate::cluster::GreptimeDbClusterBuilder;
use crate::grpc::query_and_expect;
@@ -154,10 +155,13 @@ mod test {
while let Some(batch) = stream.next().await {
batch.unwrap();
}
assert!(
terminal_metrics
.region_watermark_map()
.is_some_and(|m| !m.is_empty())
let regions = db.list_all_regions().await;
assert_eq!(regions.len(), 1);
let (region_id, region) = regions.into_iter().next().unwrap();
let expected_watermark = (region_id.as_u64(), region.find_committed_sequence());
assert_eq!(
terminal_metrics.region_watermark_map(),
Some(std::collections::HashMap::from([expected_watermark]))
);
let output = client
@@ -175,17 +179,13 @@ mod test {
}
assert_eq!(row_count, 9);
let RecordBatchMetrics {
region_latest_sequences,
..
} = stream.metrics().expect("expected terminal metrics");
let region_latest_sequences =
region_latest_sequences.expect("expected region watermark metrics");
assert_eq!(region_latest_sequences.len(), 1);
assert!(region_latest_sequences[0].0 > 0);
assert!(region_latest_sequences[0].1 > 0);
let metrics = stream.metrics().expect("expected terminal metrics");
let region_latest_sequences = metrics
.region_latest_sequences
.expect("expected region watermark metrics");
assert_eq!(region_latest_sequences, vec![expected_watermark]);
let previous_watermark = region_latest_sequences[0];
let previous_watermark = expected_watermark;
create_table_named(&client, "bar").await;
let result = client
@@ -222,7 +222,10 @@ mod test {
panic!("expected affected rows output");
};
assert_eq!(affected_rows, 9);
assert!(result.region_watermark_map().is_some_and(|m| !m.is_empty()));
assert_eq!(
result.region_watermark_map(),
Some(std::collections::HashMap::from([previous_watermark]))
);
let incremental_batches = create_record_batches(10);
test_put_record_batches(&client, incremental_batches).await;
@@ -283,9 +286,16 @@ mod test {
.expect("expected terminal incremental metrics");
let region_latest_sequences =
region_latest_sequences.expect("expected incremental region watermark metrics");
assert_eq!(region_latest_sequences.len(), 1);
assert_eq!(region_latest_sequences[0].0, previous_watermark.0);
assert!(region_latest_sequences[0].1 > previous_watermark.1);
let regions = db.list_all_regions().await;
let region = regions
.get(&RegionId::from_u64(previous_watermark.0))
.expect("expected source region to exist");
let expected_incremental_watermark =
(previous_watermark.0, region.find_committed_sequence());
assert_eq!(
region_latest_sequences,
vec![expected_incremental_watermark]
);
client.sql("admin flush_table('foo')").await.unwrap();