From 7f94f56e32f10d2d4db6385b1c8aa0d36481ee7f Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 24 Mar 2026 12:03:58 +0800 Subject: [PATCH] chore: per review Signed-off-by: discord9 --- src/common/meta/src/rpc/ddl.rs | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index cf96676d26..21997c7b05 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -1478,15 +1478,22 @@ impl QueryContext { /// for flow creation and execution. #[derive(Debug, Clone, Serialize, PartialEq)] pub struct FlowQueryContext { + /// Current catalog name used for flow metadata and execution. pub catalog: String, + /// Current schema name used for table resolution during flow execution. pub schema: String, + /// Timezone used for timestamp evaluation in the flow. pub timezone: String, + /// Query extensions carried into flow execution. #[serde(default)] pub extensions: HashMap, + /// Request channel propagated from the original query context. #[serde(default)] pub channel: u8, + /// Per-region snapshot upper bounds bound during query planning/execution. #[serde(default)] pub snapshot_seqs: HashMap, + /// Per-region lower SST scan bounds carried with the flow context. #[serde(default)] pub sst_min_sequences: HashMap, } @@ -1536,21 +1543,19 @@ impl<'de> Deserialize<'de> for FlowQueryContext { impl From for QueryContext { fn from(pb_ctx: PbQueryContext) -> Self { - let snapshot_sequences = pb_ctx.snapshot_seqs; + let (snapshot_seqs, sst_min_sequences) = pb_ctx + .snapshot_seqs + .map(|seqs| (seqs.snapshot_seqs, seqs.sst_min_sequences)) + .unwrap_or_default(); + Self { current_catalog: pb_ctx.current_catalog, current_schema: pb_ctx.current_schema, timezone: pb_ctx.timezone, extensions: pb_ctx.extensions, channel: pb_ctx.channel as u8, - snapshot_seqs: snapshot_sequences - .as_ref() - .map(|x| x.snapshot_seqs.clone()) - .unwrap_or_default(), - sst_min_sequences: snapshot_sequences - .as_ref() - .map(|x| x.sst_min_sequences.clone()) - .unwrap_or_default(), + snapshot_seqs, + sst_min_sequences, } } } @@ -1573,10 +1578,12 @@ impl From for PbQueryContext { timezone, extensions, channel: channel as u32, - snapshot_seqs: Some(api::v1::SnapshotSequences { - snapshot_seqs, - sst_min_sequences, - }), + snapshot_seqs: (!snapshot_seqs.is_empty() || !sst_min_sequences.is_empty()).then_some( + api::v1::SnapshotSequences { + snapshot_seqs, + sst_min_sequences, + }, + ), explain: None, } }