mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-26 09:50:40 +00:00
@@ -129,8 +129,8 @@ use crate::cache::{CacheManagerRef, CacheStrategy};
|
||||
use crate::config::MitoConfig;
|
||||
use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
|
||||
use crate::error::{
|
||||
InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
|
||||
SerdeJsonSnafu, SerializeColumnMetadataSnafu,
|
||||
IncrementalQueryStaleSnafu, InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu,
|
||||
RegionNotFoundSnafu, Result, SerdeJsonSnafu, SerializeColumnMetadataSnafu,
|
||||
};
|
||||
#[cfg(feature = "enterprise")]
|
||||
use crate::extension::BoxedExtensionRangeProviderFactory;
|
||||
@@ -1017,6 +1017,19 @@ impl EngineInner {
|
||||
// Reading a region doesn't need to go through the region worker thread.
|
||||
let region = self.find_region(region_id)?;
|
||||
let version = region.version();
|
||||
|
||||
if let Some(given_seq) = request.memtable_min_sequence {
|
||||
let min_readable_seq = version.flushed_sequence;
|
||||
ensure!(
|
||||
given_seq >= min_readable_seq,
|
||||
IncrementalQueryStaleSnafu {
|
||||
region_id,
|
||||
given_seq,
|
||||
min_readable_seq,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// Get cache.
|
||||
let cache_manager = self.workers.cache_manager();
|
||||
|
||||
|
||||
@@ -23,10 +23,96 @@ use store_api::region_request::RegionRequest;
|
||||
use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution};
|
||||
|
||||
use crate::config::MitoConfig;
|
||||
use crate::error::Error;
|
||||
use crate::read::scan_region::Scanner;
|
||||
use crate::test_util;
|
||||
use crate::test_util::{CreateRequestBuilder, TestEnv};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_incremental_query_stale_error() {
|
||||
let mut env = TestEnv::with_prefix("test_incremental_query_stale_error").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
let column_schemas = test_util::rows_schema(&request);
|
||||
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let rows = Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: test_util::build_rows(0, 3),
|
||||
};
|
||||
test_util::put_rows(&engine, region_id, rows).await;
|
||||
test_util::flush_region(&engine, region_id, None).await;
|
||||
|
||||
let err = engine
|
||||
.scanner(
|
||||
region_id,
|
||||
ScanRequest {
|
||||
memtable_min_sequence: Some(0),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await
|
||||
.err()
|
||||
.expect("expect stale incremental error");
|
||||
|
||||
let min_readable_seq = match &err {
|
||||
Error::IncrementalQueryStale {
|
||||
region_id: err_region_id,
|
||||
given_seq,
|
||||
min_readable_seq,
|
||||
..
|
||||
} => {
|
||||
assert_eq!(*err_region_id, region_id);
|
||||
assert_eq!(*given_seq, 0);
|
||||
assert!(*min_readable_seq > 0);
|
||||
*min_readable_seq
|
||||
}
|
||||
_ => panic!("unexpected err: {err}"),
|
||||
};
|
||||
assert_eq!(StatusCode::InvalidArguments, err.status_code());
|
||||
let err_msg = err.to_string();
|
||||
assert!(err_msg.contains("STALE_CURSOR"));
|
||||
assert!(err_msg.contains(®ion_id.to_string()));
|
||||
assert!(err_msg.contains("given_seq: 0"));
|
||||
assert!(err_msg.contains(&format!("min_readable_seq: {min_readable_seq}")));
|
||||
|
||||
let incremental_rows = Rows {
|
||||
schema: column_schemas,
|
||||
rows: test_util::build_rows(3, 5),
|
||||
};
|
||||
test_util::put_rows(&engine, region_id, incremental_rows).await;
|
||||
|
||||
let scanner = engine
|
||||
.scanner(
|
||||
region_id,
|
||||
ScanRequest {
|
||||
memtable_min_sequence: Some(min_readable_seq),
|
||||
sst_min_sequence: Some(u64::MAX),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let stream = scanner.scan().await.unwrap();
|
||||
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
assert_eq!(
|
||||
batches.pretty_print().unwrap(),
|
||||
"\
|
||||
+-------+---------+---------------------+
|
||||
| tag_0 | field_0 | ts |
|
||||
+-------+---------+---------------------+
|
||||
| 3 | 3.0 | 1970-01-01T00:00:03 |
|
||||
| 4 | 4.0 | 1970-01-01T00:00:04 |
|
||||
+-------+---------+---------------------+"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_scan_with_min_sst_sequence() {
|
||||
test_scan_with_min_sst_sequence_with_format(false).await;
|
||||
|
||||
@@ -228,6 +228,20 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"STALE_CURSOR: incremental query stale, region: {}, given_seq: {}, min_readable_seq: {}, retry_hint: FALLBACK_FULL_RECOMPUTE",
|
||||
region_id,
|
||||
given_seq,
|
||||
min_readable_seq
|
||||
))]
|
||||
IncrementalQueryStale {
|
||||
region_id: RegionId,
|
||||
given_seq: u64,
|
||||
min_readable_seq: u64,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Old manifest missing for region {}", region_id))]
|
||||
MissingOldManifest {
|
||||
region_id: RegionId,
|
||||
@@ -1286,6 +1300,7 @@ impl ErrorExt for Error {
|
||||
| InvalidScanIndex { .. }
|
||||
| InvalidMeta { .. }
|
||||
| InvalidRequest { .. }
|
||||
| IncrementalQueryStale { .. }
|
||||
| PartitionExprVersionMismatch { .. }
|
||||
| FillDefault { .. }
|
||||
| ConvertColumnDataType { .. }
|
||||
|
||||
Reference in New Issue
Block a user