chore: per review

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-24 12:03:58 +08:00
parent a4df75158f
commit 7f94f56e32

View File

@@ -1478,15 +1478,22 @@ impl QueryContext {
/// for flow creation and execution.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct FlowQueryContext {
/// Current catalog name used for flow metadata and execution.
pub catalog: String,
/// Current schema name used for table resolution during flow execution.
pub schema: String,
/// Timezone used for timestamp evaluation in the flow.
pub timezone: String,
/// Query extensions carried into flow execution.
#[serde(default)]
pub extensions: HashMap<String, String>,
/// Request channel propagated from the original query context.
#[serde(default)]
pub channel: u8,
/// Per-region snapshot upper bounds bound during query planning/execution.
#[serde(default)]
pub snapshot_seqs: HashMap<u64, u64>,
/// Per-region lower SST scan bounds carried with the flow context.
#[serde(default)]
pub sst_min_sequences: HashMap<u64, u64>,
}
@@ -1536,21 +1543,19 @@ impl<'de> Deserialize<'de> for FlowQueryContext {
impl From<PbQueryContext> for QueryContext {
fn from(pb_ctx: PbQueryContext) -> Self {
let snapshot_sequences = pb_ctx.snapshot_seqs;
let (snapshot_seqs, sst_min_sequences) = pb_ctx
.snapshot_seqs
.map(|seqs| (seqs.snapshot_seqs, seqs.sst_min_sequences))
.unwrap_or_default();
Self {
current_catalog: pb_ctx.current_catalog,
current_schema: pb_ctx.current_schema,
timezone: pb_ctx.timezone,
extensions: pb_ctx.extensions,
channel: pb_ctx.channel as u8,
snapshot_seqs: snapshot_sequences
.as_ref()
.map(|x| x.snapshot_seqs.clone())
.unwrap_or_default(),
sst_min_sequences: snapshot_sequences
.as_ref()
.map(|x| x.sst_min_sequences.clone())
.unwrap_or_default(),
snapshot_seqs,
sst_min_sequences,
}
}
}
@@ -1573,10 +1578,12 @@ impl From<QueryContext> for PbQueryContext {
timezone,
extensions,
channel: channel as u32,
snapshot_seqs: Some(api::v1::SnapshotSequences {
snapshot_seqs,
sst_min_sequences,
}),
snapshot_seqs: (!snapshot_seqs.is_empty() || !sst_min_sequences.is_empty()).then_some(
api::v1::SnapshotSequences {
snapshot_seqs,
sst_min_sequences,
},
),
explain: None,
}
}