feat: add incremental read context and scan boundaries (#7848)

* feat: add incremental read context and scan boundaries

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* docs: explain field

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-26 15:00:22 +08:00
committed by GitHub
parent 8058ce7cf2
commit d7bc5ad16b
10 changed files with 637 additions and 14 deletions

View File

@@ -37,6 +37,8 @@ fn test_query_context() -> QueryContext {
timezone: "UTC".to_string(),
extensions: HashMap::new(),
channel: 0,
snapshot_seqs: HashMap::new(),
sst_min_sequences: HashMap::new(),
}
}
@@ -251,6 +253,10 @@ fn test_create_flow_data_new_format_serialization() {
catalog: "new_catalog".to_string(),
schema: "new_schema".to_string(),
timezone: "America/New_York".to_string(),
extensions: HashMap::new(),
channel: 0,
snapshot_seqs: HashMap::new(),
sst_min_sequences: HashMap::new(),
};
let data = CreateFlowData {
@@ -272,6 +278,9 @@ fn test_create_flow_data_new_format_serialization() {
assert_eq!(deserialized.flow_context.catalog, "new_catalog");
assert_eq!(deserialized.flow_context.schema, "new_schema");
assert_eq!(deserialized.flow_context.timezone, "America/New_York");
assert_eq!(deserialized.flow_context.channel, 0);
assert_eq!(deserialized.flow_context.snapshot_seqs, HashMap::new());
assert_eq!(deserialized.flow_context.sst_min_sequences, HashMap::new());
}
#[test]
@@ -286,6 +295,8 @@ fn test_flow_query_context_conversion_from_query_context() {
]
.into(),
channel: 99,
snapshot_seqs: HashMap::from([(1, 10)]),
sst_min_sequences: HashMap::from([(1, 8)]),
};
let flow_context: FlowQueryContext = query_context.into();
@@ -293,6 +304,9 @@ fn test_flow_query_context_conversion_from_query_context() {
assert_eq!(flow_context.catalog, "prod_catalog");
assert_eq!(flow_context.schema, "public");
assert_eq!(flow_context.timezone, "America/Los_Angeles");
assert_eq!(flow_context.channel, 99);
assert_eq!(flow_context.snapshot_seqs, HashMap::from([(1, 10)]));
assert_eq!(flow_context.sst_min_sequences, HashMap::from([(1, 8)]));
}
#[test]
@@ -301,6 +315,10 @@ fn test_flow_info_conversion_with_flow_context() {
catalog: "info_catalog".to_string(),
schema: "info_schema".to_string(),
timezone: "Europe/Berlin".to_string(),
extensions: HashMap::new(),
channel: 0,
snapshot_seqs: HashMap::new(),
sst_min_sequences: HashMap::new(),
};
let data = CreateFlowData {
@@ -349,6 +367,10 @@ fn test_mixed_serialization_format_support() {
catalog: "test".to_string(),
schema: "test".to_string(),
timezone: "UTC".to_string(),
extensions: HashMap::new(),
channel: 0,
snapshot_seqs: HashMap::new(),
sst_min_sequences: HashMap::new(),
};
assert_eq!(ctx_from_new, expected_new);
}

View File

@@ -1432,6 +1432,12 @@ pub struct QueryContext {
pub timezone: String,
pub extensions: HashMap<String, String>,
pub channel: u8,
/// Maps region id -> snapshot upper bound sequence for that region.
#[serde(default)]
pub snapshot_seqs: HashMap<u64, u64>,
/// Maps region id -> minimal SST sequence allowed for that region.
#[serde(default)]
pub sst_min_sequences: HashMap<u64, u64>,
}
impl QueryContext {
@@ -1459,6 +1465,14 @@ impl QueryContext {
pub fn channel(&self) -> u8 {
self.channel
}
pub fn snapshot_seqs(&self) -> &HashMap<u64, u64> {
&self.snapshot_seqs
}
pub fn sst_min_sequences(&self) -> &HashMap<u64, u64> {
&self.sst_min_sequences
}
}
/// Lightweight query context for flow operations containing only essential fields.
@@ -1466,12 +1480,24 @@ impl QueryContext {
/// for flow creation and execution.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct FlowQueryContext {
/// Current catalog name - needed for flow metadata and recovery
/// Current catalog name used for flow metadata and execution.
pub catalog: String,
/// Current schema name - needed for table resolution during flow execution
/// Current schema name used for table resolution during flow execution.
pub schema: String,
/// Timezone for timestamp operations in the flow
/// Timezone used for timestamp evaluation in the flow.
pub timezone: String,
/// Query extensions carried into flow execution.
#[serde(default)]
pub extensions: HashMap<String, String>,
/// Request channel propagated from the original query context.
#[serde(default)]
pub channel: u8,
/// Per-region snapshot upper bounds bound during query planning/execution.
#[serde(default)]
pub snapshot_seqs: HashMap<u64, u64>,
/// Per-region lower SST scan bounds carried with the flow context.
#[serde(default)]
pub sst_min_sequences: HashMap<u64, u64>,
}
impl<'de> Deserialize<'de> for FlowQueryContext {
@@ -1492,6 +1518,14 @@ impl<'de> Deserialize<'de> for FlowQueryContext {
catalog: String,
schema: String,
timezone: String,
#[serde(default)]
extensions: HashMap<String, String>,
#[serde(default)]
channel: u8,
#[serde(default)]
snapshot_seqs: HashMap<u64, u64>,
#[serde(default)]
sst_min_sequences: HashMap<u64, u64>,
}
match ContextCompat::deserialize(deserializer)? {
@@ -1499,6 +1533,10 @@ impl<'de> Deserialize<'de> for FlowQueryContext {
catalog: helper.catalog,
schema: helper.schema,
timezone: helper.timezone,
extensions: helper.extensions,
channel: helper.channel,
snapshot_seqs: helper.snapshot_seqs,
sst_min_sequences: helper.sst_min_sequences,
}),
ContextCompat::Full(full_ctx) => Ok(full_ctx.into()),
}
@@ -1507,12 +1545,19 @@ impl<'de> Deserialize<'de> for FlowQueryContext {
impl From<PbQueryContext> for QueryContext {
fn from(pb_ctx: PbQueryContext) -> Self {
let (snapshot_seqs, sst_min_sequences) = pb_ctx
.snapshot_seqs
.map(|seqs| (seqs.snapshot_seqs, seqs.sst_min_sequences))
.unwrap_or_default();
Self {
current_catalog: pb_ctx.current_catalog,
current_schema: pb_ctx.current_schema,
timezone: pb_ctx.timezone,
extensions: pb_ctx.extensions,
channel: pb_ctx.channel as u8,
snapshot_seqs,
sst_min_sequences,
}
}
}
@@ -1525,6 +1570,8 @@ impl From<QueryContext> for PbQueryContext {
timezone,
extensions,
channel,
snapshot_seqs,
sst_min_sequences,
}: QueryContext,
) -> Self {
PbQueryContext {
@@ -1533,7 +1580,12 @@ impl From<QueryContext> for PbQueryContext {
timezone,
extensions,
channel: channel as u32,
snapshot_seqs: None,
snapshot_seqs: (!snapshot_seqs.is_empty() || !sst_min_sequences.is_empty()).then_some(
api::v1::SnapshotSequences {
snapshot_seqs,
sst_min_sequences,
},
),
explain: None,
}
}
@@ -1545,6 +1597,10 @@ impl From<QueryContext> for FlowQueryContext {
catalog: ctx.current_catalog,
schema: ctx.current_schema,
timezone: ctx.timezone,
extensions: ctx.extensions,
channel: ctx.channel,
snapshot_seqs: ctx.snapshot_seqs,
sst_min_sequences: ctx.sst_min_sequences,
}
}
}
@@ -1555,8 +1611,10 @@ impl From<FlowQueryContext> for QueryContext {
current_catalog: flow_ctx.catalog,
current_schema: flow_ctx.schema,
timezone: flow_ctx.timezone,
extensions: HashMap::new(),
channel: 0, // Use default channel for flows
extensions: flow_ctx.extensions,
channel: flow_ctx.channel,
snapshot_seqs: flow_ctx.snapshot_seqs,
sst_min_sequences: flow_ctx.sst_min_sequences,
}
}
}
@@ -1720,6 +1778,8 @@ mod tests {
timezone: "UTC".to_string(),
extensions,
channel: 5,
snapshot_seqs: HashMap::from([(10, 100)]),
sst_min_sequences: HashMap::from([(10, 90)]),
};
let flow_ctx: FlowQueryContext = query_ctx.into();
@@ -1727,6 +1787,9 @@ mod tests {
assert_eq!(flow_ctx.catalog, "test_catalog");
assert_eq!(flow_ctx.schema, "test_schema");
assert_eq!(flow_ctx.timezone, "UTC");
assert_eq!(flow_ctx.channel, 5);
assert_eq!(flow_ctx.snapshot_seqs, HashMap::from([(10, 100)]));
assert_eq!(flow_ctx.sst_min_sequences, HashMap::from([(10, 90)]));
}
#[test]
@@ -1735,6 +1798,10 @@ mod tests {
catalog: "prod_catalog".to_string(),
schema: "public".to_string(),
timezone: "America/New_York".to_string(),
extensions: HashMap::from([("k".to_string(), "v".to_string())]),
channel: 7,
snapshot_seqs: HashMap::from([(11, 111)]),
sst_min_sequences: HashMap::from([(11, 101)]),
};
let query_ctx: QueryContext = flow_ctx.clone().into();
@@ -1742,8 +1809,13 @@ mod tests {
assert_eq!(query_ctx.current_catalog, "prod_catalog");
assert_eq!(query_ctx.current_schema, "public");
assert_eq!(query_ctx.timezone, "America/New_York");
assert!(query_ctx.extensions.is_empty());
assert_eq!(query_ctx.channel, 0);
assert_eq!(
query_ctx.extensions,
HashMap::from([("k".to_string(), "v".to_string())])
);
assert_eq!(query_ctx.channel, 7);
assert_eq!(query_ctx.snapshot_seqs, HashMap::from([(11, 111)]));
assert_eq!(query_ctx.sst_min_sequences, HashMap::from([(11, 101)]));
// Test roundtrip conversion
let flow_ctx_roundtrip: FlowQueryContext = query_ctx.into();
@@ -1756,6 +1828,10 @@ mod tests {
catalog: "test_catalog".to_string(),
schema: "test_schema".to_string(),
timezone: "UTC".to_string(),
extensions: HashMap::new(),
channel: 0,
snapshot_seqs: HashMap::new(),
sst_min_sequences: HashMap::new(),
};
let serialized = serde_json::to_string(&flow_ctx).unwrap();
@@ -1776,6 +1852,10 @@ mod tests {
catalog: "pb_catalog".to_string(),
schema: "pb_schema".to_string(),
timezone: "Asia/Tokyo".to_string(),
extensions: HashMap::from([("x".to_string(), "y".to_string())]),
channel: 6,
snapshot_seqs: HashMap::from([(3, 30)]),
sst_min_sequences: HashMap::from([(3, 21)]),
};
let pb_ctx: PbQueryContext = flow_ctx.into();
@@ -1783,9 +1863,44 @@ mod tests {
assert_eq!(pb_ctx.current_catalog, "pb_catalog");
assert_eq!(pb_ctx.current_schema, "pb_schema");
assert_eq!(pb_ctx.timezone, "Asia/Tokyo");
assert!(pb_ctx.extensions.is_empty());
assert_eq!(pb_ctx.channel, 0);
assert!(pb_ctx.snapshot_seqs.is_none());
assert_eq!(
pb_ctx.extensions,
HashMap::from([("x".to_string(), "y".to_string())])
);
assert_eq!(pb_ctx.channel, 6);
assert_eq!(
pb_ctx.snapshot_seqs,
Some(api::v1::SnapshotSequences {
snapshot_seqs: HashMap::from([(3, 30)]),
sst_min_sequences: HashMap::from([(3, 21)]),
})
);
assert!(pb_ctx.explain.is_none());
}
#[test]
fn test_pb_query_context_roundtrip_with_snapshot_sequences() {
let pb = PbQueryContext {
current_catalog: "c1".to_string(),
current_schema: "s1".to_string(),
timezone: "UTC".to_string(),
extensions: HashMap::from([("flow.return_region_seq".to_string(), "true".to_string())]),
channel: 3,
snapshot_seqs: Some(api::v1::SnapshotSequences {
snapshot_seqs: HashMap::from([(1, 100)]),
sst_min_sequences: HashMap::from([(1, 90)]),
}),
explain: None,
};
let query_ctx: QueryContext = pb.clone().into();
let pb_roundtrip: PbQueryContext = query_ctx.into();
assert_eq!(pb_roundtrip.current_catalog, pb.current_catalog);
assert_eq!(pb_roundtrip.current_schema, pb.current_schema);
assert_eq!(pb_roundtrip.timezone, pb.timezone);
assert_eq!(pb_roundtrip.extensions, pb.extensions);
assert_eq!(pb_roundtrip.channel, pb.channel);
assert_eq!(pb_roundtrip.snapshot_seqs, pb.snapshot_seqs);
}
}

View File

@@ -94,7 +94,7 @@ impl RegionEngine for FileRegionEngine {
let stream = self.handle_query(region_id, request).await?;
let metadata = self.get_metadata(region_id).await?;
// We don't support enabling append mode for file engine.
let scanner = Box::new(SinglePartitionScanner::new(stream, false, metadata));
let scanner = Box::new(SinglePartitionScanner::new(stream, false, metadata, None));
Ok(scanner)
}

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::{Arc, RwLock};
use common_time::Timezone;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::ResultExt;
@@ -27,6 +29,8 @@ pub fn to_meta_query_context(
timezone: query_context.timezone().to_string(),
extensions: query_context.extensions(),
channel: query_context.channel() as u8,
snapshot_seqs: query_context.snapshots(),
sst_min_sequences: query_context.sst_min_sequences(),
}
}
@@ -43,5 +47,41 @@ pub fn try_to_session_query_context(
)
.extensions(value.extensions)
.channel((value.channel as u32).into())
.snapshot_seqs(Arc::new(RwLock::new(value.snapshot_seqs)))
.sst_min_sequences(Arc::new(RwLock::new(value.sst_min_sequences)))
.build())
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use common_time::Timezone;
use session::context::QueryContextBuilder;
use super::{to_meta_query_context, try_to_session_query_context};
#[test]
fn test_query_context_meta_roundtrip_with_sequences() {
let session_ctx = Arc::new(
QueryContextBuilder::default()
.current_catalog("c1".to_string())
.current_schema("s1".to_string())
.timezone(Timezone::from_tz_string("UTC").unwrap())
.set_extension("flow.return_region_seq".to_string(), "true".to_string())
.snapshot_seqs(Arc::new(RwLock::new(HashMap::from([(10, 100)]))))
.sst_min_sequences(Arc::new(RwLock::new(HashMap::from([(10, 90)]))))
.build(),
);
let meta_ctx = to_meta_query_context(session_ctx);
let roundtrip = try_to_session_query_context(meta_ctx).unwrap();
assert_eq!(roundtrip.current_catalog(), "c1");
assert_eq!(roundtrip.current_schema(), "s1");
assert_eq!(roundtrip.snapshots(), HashMap::from([(10, 100)]));
assert_eq!(roundtrip.sst_min_sequences(), HashMap::from([(10, 90)]));
assert_eq!(roundtrip.extension("flow.return_region_seq"), Some("true"));
}
}

View File

@@ -368,6 +368,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid query context extension: {}", reason))]
InvalidQueryContextExtension {
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(transparent)]
Datatypes {
source: datatypes::error::Error,
@@ -399,7 +406,8 @@ impl ErrorExt for Error {
| ColumnSchemaNoDefault { .. }
| CteColumnSchemaMismatch { .. }
| ConvertValue { .. }
| TryIntoDuration { .. } => StatusCode::InvalidArguments,
| TryIntoDuration { .. }
| InvalidQueryContextExtension { .. } => StatusCode::InvalidArguments,
BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable,

View File

@@ -12,8 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_base::memory_limit::MemoryLimit;
use serde::{Deserialize, Serialize};
use store_api::storage::RegionId;
use table::metadata::TableId;
use crate::error::{Error, InvalidQueryContextExtensionSnafu, Result};
pub const FLOW_INCREMENTAL_AFTER_SEQS: &str = "flow.incremental_after_seqs";
pub const FLOW_INCREMENTAL_MODE: &str = "flow.incremental_mode";
pub const FLOW_RETURN_REGION_SEQ: &str = "flow.return_region_seq";
pub const FLOW_SINK_TABLE_ID: &str = "flow.sink_table_id";
pub const FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY: &str = "memtable_only";
/// Query engine config
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -39,3 +52,352 @@ impl Default for QueryOptions {
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowIncrementalMode {
MemtableOnly,
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct FlowQueryExtensions {
/// Maps region id -> lower exclusive sequence bound for incremental reads.
pub incremental_after_seqs: Option<HashMap<u64, u64>>,
/// Incremental read mode requested by the caller.
pub incremental_mode: Option<FlowIncrementalMode>,
/// Whether the caller expects per-region watermark metadata in terminal metrics.
pub return_region_seq: bool,
/// Optional sink table id used to distinguish source scans from sink reads.
pub sink_table_id: Option<TableId>,
}
impl FlowQueryExtensions {
pub fn from_extensions(extensions: &HashMap<String, String>) -> Result<Self> {
let incremental_mode = extensions
.get(FLOW_INCREMENTAL_MODE)
.map(|value| match value.as_str() {
v if v.eq_ignore_ascii_case(FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY) => {
Ok(FlowIncrementalMode::MemtableOnly)
}
_ => Err(invalid_query_context_extension(format!(
"Invalid value for {}: {}",
FLOW_INCREMENTAL_MODE, value
))),
})
.transpose()?;
let incremental_after_seqs = extensions
.get(FLOW_INCREMENTAL_AFTER_SEQS)
.map(|value| parse_incremental_after_seqs(value.as_str()))
.transpose()?;
let return_region_seq = extensions
.get(FLOW_RETURN_REGION_SEQ)
.map(|value| parse_bool(value.as_str()))
.transpose()?
.unwrap_or(false);
let sink_table_id = extensions
.get(FLOW_SINK_TABLE_ID)
.map(|value| {
value.parse::<TableId>().map_err(|_| {
invalid_query_context_extension(format!(
"Invalid value for {}: {}",
FLOW_SINK_TABLE_ID, value
))
})
})
.transpose()?;
if matches!(incremental_mode, Some(FlowIncrementalMode::MemtableOnly)) {
let after_seqs = incremental_after_seqs.as_ref().ok_or_else(|| {
invalid_query_context_extension(format!(
"{} is required when {}={}.",
FLOW_INCREMENTAL_AFTER_SEQS,
FLOW_INCREMENTAL_MODE,
FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY
))
})?;
if after_seqs.is_empty() {
return Err(invalid_query_context_extension(format!(
"{} must not be empty when {}={}.",
FLOW_INCREMENTAL_AFTER_SEQS,
FLOW_INCREMENTAL_MODE,
FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY
)));
}
}
Ok(Self {
incremental_after_seqs,
incremental_mode,
return_region_seq,
sink_table_id,
})
}
pub fn validate_for_scan(&self, source_region_id: RegionId) -> Result<bool> {
if self.sink_table_id.is_some() && self.sink_table_id == Some(source_region_id.table_id()) {
return Ok(false);
}
if matches!(
self.incremental_mode,
Some(FlowIncrementalMode::MemtableOnly)
) {
let after_seqs = self.incremental_after_seqs.as_ref().ok_or_else(|| {
invalid_query_context_extension(format!(
"{} is required when {}=memtable_only.",
FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE
))
})?;
if !after_seqs.contains_key(&source_region_id.as_u64()) {
return Err(invalid_query_context_extension(format!(
"Missing region {} in {} when {}=memtable_only.",
source_region_id, FLOW_INCREMENTAL_AFTER_SEQS, FLOW_INCREMENTAL_MODE
)));
}
}
Ok(self.incremental_after_seqs.is_some())
}
pub fn should_collect_region_watermark(&self) -> bool {
self.return_region_seq || self.incremental_after_seqs.is_some()
}
}
fn parse_incremental_after_seqs(value: &str) -> Result<HashMap<u64, u64>> {
let raw = serde_json::from_str::<HashMap<String, serde_json::Value>>(value).map_err(|e| {
invalid_query_context_extension(format!(
"Invalid JSON for {}: {} ({})",
FLOW_INCREMENTAL_AFTER_SEQS, value, e
))
})?;
raw.into_iter()
.map(|(region_id, raw_seq)| {
let region_id = region_id.parse::<u64>().map_err(|_| {
invalid_query_context_extension(format!(
"Invalid region id in {}: {}",
FLOW_INCREMENTAL_AFTER_SEQS, region_id
))
})?;
let seq = match raw_seq {
serde_json::Value::Number(num) => num.as_u64().ok_or_else(|| {
invalid_query_context_extension(format!(
"Invalid sequence value in {} for region {}: {}",
FLOW_INCREMENTAL_AFTER_SEQS, region_id, num
))
})?,
serde_json::Value::String(s) => s.parse::<u64>().map_err(|_| {
invalid_query_context_extension(format!(
"Invalid sequence string in {} for region {}: {}",
FLOW_INCREMENTAL_AFTER_SEQS, region_id, s
))
})?,
_ => {
return Err(invalid_query_context_extension(format!(
"Invalid sequence value type in {} for region {}",
FLOW_INCREMENTAL_AFTER_SEQS, region_id
)));
}
};
Ok((region_id, seq))
})
.collect()
}
fn parse_bool(value: &str) -> Result<bool> {
match value {
v if v.eq_ignore_ascii_case("true") => Ok(true),
v if v.eq_ignore_ascii_case("false") => Ok(false),
_ => Err(invalid_query_context_extension(format!(
"Invalid value for {}: {}",
FLOW_RETURN_REGION_SEQ, value
))),
}
}
fn invalid_query_context_extension(reason: String) -> Error {
InvalidQueryContextExtensionSnafu { reason }.build()
}
#[cfg(test)]
mod flow_extension_tests {
use super::*;
#[test]
fn test_parse_flow_extensions_default() {
let exts = HashMap::new();
let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap();
assert_eq!(parsed.incremental_mode, None);
assert_eq!(parsed.incremental_after_seqs, None);
assert!(!parsed.return_region_seq);
assert_eq!(parsed.sink_table_id, None);
}
#[test]
fn test_parse_flow_extensions_memtable_only_success() {
let exts = HashMap::from([
(
FLOW_INCREMENTAL_MODE.to_string(),
FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(),
),
(
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
r#"{"1":10,"2":20}"#.to_string(),
),
(FLOW_RETURN_REGION_SEQ.to_string(), "true".to_string()),
(FLOW_SINK_TABLE_ID.to_string(), "1024".to_string()),
]);
let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap();
assert_eq!(
parsed.incremental_mode,
Some(FlowIncrementalMode::MemtableOnly)
);
assert_eq!(
parsed.incremental_after_seqs.unwrap(),
HashMap::from([(1, 10), (2, 20)])
);
assert!(parsed.return_region_seq);
assert_eq!(parsed.sink_table_id, Some(1024));
}
#[test]
fn test_parse_flow_extensions_mode_requires_after_seqs() {
let exts = HashMap::from([(
FLOW_INCREMENTAL_MODE.to_string(),
FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(),
)]);
let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err();
assert!(format!("{err}").contains(FLOW_INCREMENTAL_AFTER_SEQS));
}
#[test]
fn test_parse_flow_extensions_invalid_mode() {
let exts = HashMap::from([(FLOW_INCREMENTAL_MODE.to_string(), "foo".to_string())]);
let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err();
assert!(format!("{err}").contains(FLOW_INCREMENTAL_MODE));
}
#[test]
fn test_parse_flow_extensions_invalid_after_seqs_json() {
let exts = HashMap::from([
(
FLOW_INCREMENTAL_MODE.to_string(),
FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(),
),
(
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
"not-json".to_string(),
),
]);
let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err();
assert!(format!("{err}").contains(FLOW_INCREMENTAL_AFTER_SEQS));
}
#[test]
fn test_parse_flow_extensions_after_seqs_string_values() {
let exts = HashMap::from([(
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
r#"{"1":"10","2":"20"}"#.to_string(),
)]);
let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap();
assert_eq!(
parsed.incremental_after_seqs.unwrap(),
HashMap::from([(1, 10), (2, 20)])
);
}
#[test]
fn test_parse_flow_extensions_after_seqs_invalid_value_type() {
let exts = HashMap::from([(
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
r#"{"1":true}"#.to_string(),
)]);
let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err();
assert!(format!("{err}").contains(FLOW_INCREMENTAL_AFTER_SEQS));
}
#[test]
fn test_parse_flow_extensions_invalid_sink_table_id() {
let exts = HashMap::from([(FLOW_SINK_TABLE_ID.to_string(), "x".to_string())]);
let err = FlowQueryExtensions::from_extensions(&exts).unwrap_err();
assert!(format!("{err}").contains(FLOW_SINK_TABLE_ID));
}
#[test]
fn test_validate_for_scan_missing_source_region() {
let source_region_id = RegionId::new(100, 2);
let existing_region_id = RegionId::new(100, 1);
let exts = HashMap::from([
(
FLOW_INCREMENTAL_MODE.to_string(),
FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(),
),
(
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
format!(r#"{{"{}":10}}"#, existing_region_id.as_u64()),
),
]);
let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap();
let err = parsed.validate_for_scan(source_region_id).unwrap_err();
assert!(format!("{err}").contains("Missing region"));
}
#[test]
fn test_validate_for_scan_sink_table_excluded() {
let source_region_id = RegionId::new(1024, 1);
let exts = HashMap::from([
(
FLOW_INCREMENTAL_MODE.to_string(),
FLOW_INCREMENTAL_MODE_MEMTABLE_ONLY.to_string(),
),
(
FLOW_INCREMENTAL_AFTER_SEQS.to_string(),
format!(r#"{{"{}":10}}"#, source_region_id.as_u64()),
),
(FLOW_SINK_TABLE_ID.to_string(), "1024".to_string()),
]);
let parsed = FlowQueryExtensions::from_extensions(&exts).unwrap();
let apply_incremental = parsed.validate_for_scan(source_region_id).unwrap();
assert!(!apply_incremental);
}
#[test]
fn test_should_collect_region_watermark_defaults_false() {
let parsed = FlowQueryExtensions::default();
assert!(!parsed.should_collect_region_watermark());
}
#[test]
fn test_should_collect_region_watermark_true_for_return_region_seq() {
let parsed = FlowQueryExtensions {
return_region_seq: true,
..Default::default()
};
assert!(parsed.should_collect_region_watermark());
}
#[test]
fn test_should_collect_region_watermark_true_for_incremental_query() {
let parsed = FlowQueryExtensions {
incremental_after_seqs: Some(HashMap::from([(1, 10)])),
..Default::default()
};
assert!(parsed.should_collect_region_watermark());
}
}

View File

@@ -433,10 +433,21 @@ impl QueryContext {
self.snapshot_seqs.read().unwrap().clone()
}
pub fn sst_min_sequences(&self) -> HashMap<u64, u64> {
self.sst_min_sequences.read().unwrap().clone()
}
pub fn get_snapshot(&self, region_id: u64) -> Option<u64> {
self.snapshot_seqs.read().unwrap().get(&region_id).cloned()
}
pub fn set_snapshot(&self, region_id: u64, sequence: u64) {
self.snapshot_seqs
.write()
.unwrap()
.insert(region_id, sequence);
}
/// Returns `true` if the session can cast strings to numbers in MySQL style.
pub fn auto_string_to_numeric(&self) -> bool {
matches!(self.channel, Channel::Mysql)
@@ -669,6 +680,8 @@ impl ConfigurationVariables {
#[cfg(test)]
mod test {
use std::collections::HashMap;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use super::*;
@@ -704,4 +717,30 @@ mod test {
let context = QueryContext::with(DEFAULT_CATALOG_NAME, "test");
assert_eq!("test", context.get_db_string());
}
#[test]
fn test_api_query_context_roundtrip_with_sequences() {
let api_ctx = api::v1::QueryContext {
current_catalog: "c1".to_string(),
current_schema: "s1".to_string(),
timezone: "UTC".to_string(),
extensions: HashMap::from([("flow.return_region_seq".to_string(), "true".to_string())]),
channel: Channel::Grpc as u32,
snapshot_seqs: Some(api::v1::SnapshotSequences {
snapshot_seqs: HashMap::from([(1, 100)]),
sst_min_sequences: HashMap::from([(1, 90)]),
}),
explain: None,
};
let session_ctx: QueryContext = api_ctx.clone().into();
let roundtrip_api: api::v1::QueryContext = session_ctx.into();
assert_eq!(roundtrip_api.current_catalog, api_ctx.current_catalog);
assert_eq!(roundtrip_api.current_schema, api_ctx.current_schema);
assert_eq!(roundtrip_api.timezone, api_ctx.timezone);
assert_eq!(roundtrip_api.extensions, api_ctx.extensions);
assert_eq!(roundtrip_api.channel, api_ctx.channel);
assert_eq!(roundtrip_api.snapshot_seqs, api_ctx.snapshot_seqs);
}
}

View File

@@ -462,6 +462,10 @@ pub trait RegionScanner: Debug + DisplayAs + Send {
/// Sets whether the scanner is reading a logical region.
fn set_logical_region(&mut self, logical_region: bool);
fn snapshot_sequence(&self) -> Option<SequenceNumber> {
None
}
}
pub type RegionScannerRef = Box<dyn RegionScanner>;
@@ -945,6 +949,7 @@ pub struct SinglePartitionScanner {
schema: SchemaRef,
properties: ScannerProperties,
metadata: RegionMetadataRef,
snapshot_sequence: Option<SequenceNumber>,
}
impl SinglePartitionScanner {
@@ -953,6 +958,7 @@ impl SinglePartitionScanner {
stream: SendableRecordBatchStream,
append_mode: bool,
metadata: RegionMetadataRef,
snapshot_sequence: Option<SequenceNumber>,
) -> Self {
let schema = stream.schema();
Self {
@@ -960,6 +966,7 @@ impl SinglePartitionScanner {
schema,
properties: ScannerProperties::default().with_append_mode(append_mode),
metadata,
snapshot_sequence,
}
}
}
@@ -1019,6 +1026,10 @@ impl RegionScanner for SinglePartitionScanner {
fn set_logical_region(&mut self, logical_region: bool) {
self.properties.set_logical_region(logical_region);
}
fn snapshot_sequence(&self) -> Option<SequenceNumber> {
self.snapshot_sequence
}
}
impl DisplayAs for SinglePartitionScanner {

View File

@@ -112,6 +112,8 @@ pub struct ScanRequest {
/// Optional constraint on the sequence number of the rows to read.
/// If set, only rows with a sequence number **lesser or equal** to this value
/// will be returned.
/// This is the effective memtable upper bound used by the scan, whether provided
/// explicitly or bound on scan open.
pub memtable_max_sequence: Option<SequenceNumber>,
/// Optional constraint on the minimal sequence number in the memtable.
/// If set, only the memtables that contain sequences **greater than** this value will be scanned
@@ -119,6 +121,8 @@ pub struct ScanRequest {
/// Optional constraint on the minimal sequence number in the SST files.
/// If set, only the SST files that contain sequences greater than this value will be scanned.
pub sst_min_sequence: Option<SequenceNumber>,
/// Whether to bind the effective snapshot upper bound when opening the scan.
pub snapshot_on_scan: bool,
/// Optional hint for the distribution of time-series data.
pub distribution: Option<TimeSeriesDistribution>,
/// Optional hint for KNN vector search. When set, the scan should use
@@ -195,6 +199,14 @@ impl Display for ScanRequest {
sst_min_sequence
)?;
}
if self.snapshot_on_scan {
write!(
f,
"{}snapshot_on_scan: {}",
delimiter.as_str(),
self.snapshot_on_scan
)?;
}
if let Some(distribution) = &self.distribution {
write!(f, "{}distribution: {}", delimiter.as_str(), distribution)?;
}
@@ -278,5 +290,14 @@ mod tests {
request.to_string(),
"ScanRequest { force_flat_format: true }"
);
let request = ScanRequest {
snapshot_on_scan: true,
..Default::default()
};
assert_eq!(
request.to_string(),
"ScanRequest { snapshot_on_scan: true }"
);
}
}

View File

@@ -599,7 +599,12 @@ mod test {
.primary_key(vec![1]);
let region_metadata = Arc::new(builder.build().unwrap());
let scanner = Box::new(SinglePartitionScanner::new(stream, false, region_metadata));
let scanner = Box::new(SinglePartitionScanner::new(
stream,
false,
region_metadata,
None,
));
let plan = RegionScanExec::new(scanner, ScanRequest::default(), None).unwrap();
let actual: SchemaRef = Arc::new(
plan.properties