From 18fbd3054a8cffa37128ecd350ccad4ce8652f58 Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 15 Apr 2026 15:06:09 +0800 Subject: [PATCH] feat: filter id&refactor to type Signed-off-by: discord9 --- src/query/src/dist_plan.rs | 2 + src/query/src/dist_plan/filter_id.rs | 267 ++++++++++++++++++ src/query/src/dist_plan/merge_scan.rs | 106 +------ .../dist_plan/remote_dyn_filter_registry.rs | 94 +++--- src/query/src/query_engine/state.rs | 144 +++++++++- src/session/src/context.rs | 18 +- src/session/src/lib.rs | 1 + src/session/src/query_id.rs | 76 +++++ 8 files changed, 562 insertions(+), 146 deletions(-) create mode 100644 src/query/src/dist_plan/filter_id.rs create mode 100644 src/session/src/query_id.rs diff --git a/src/query/src/dist_plan.rs b/src/query/src/dist_plan.rs index f5704f8fab..4c0a17542b 100644 --- a/src/query/src/dist_plan.rs +++ b/src/query/src/dist_plan.rs @@ -14,6 +14,7 @@ mod analyzer; mod commutativity; +mod filter_id; mod merge_scan; mod merge_sort; mod planner; @@ -22,6 +23,7 @@ mod region_pruner; mod remote_dyn_filter_registry; pub use analyzer::{DistPlannerAnalyzer, DistPlannerOptions}; +pub use filter_id::{FilterFingerprint, FilterId, ParseFilterIdError}; pub use merge_scan::{MergeScanExec, MergeScanLogicalPlan}; pub use planner::{DistExtensionPlanner, MergeSortExtensionPlanner}; pub use predicate_extractor::PredicateExtractor; diff --git a/src/query/src/dist_plan/filter_id.rs b/src/query/src/dist_plan/filter_id.rs new file mode 100644 index 0000000000..3782a4bdc2 --- /dev/null +++ b/src/query/src/dist_plan/filter_id.rs @@ -0,0 +1,267 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::{Display, Formatter}; +use std::hash::{DefaultHasher, Hasher}; +use std::num::ParseIntError; +use std::str::FromStr; +use std::sync::Arc; + +use datafusion_common::{DataFusionError, Result}; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec; +use datafusion_proto::physical_plan::to_proto::serialize_physical_expr; +use prost::Message; +use store_api::storage::RegionId; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub struct FilterFingerprint(u64); + +impl FilterFingerprint { + pub fn new(value: u64) -> Self { + Self(value) + } + + pub fn get(self) -> u64 { + self.0 + } +} + +impl Display for FilterFingerprint { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{:016x}", self.0) + } +} + +impl FromStr for FilterFingerprint { + type Err = ParseIntError; + + fn from_str(s: &str) -> Result { + Ok(Self(u64::from_str_radix(s, 16)?)) + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct FilterId { + region_id: RegionId, + producer_ordinal: u32, + children_fingerprint: FilterFingerprint, +} + +// NOTE(remote-dyn-filter): FilterId is generated once by the source-side planner/runtime and then +// propagated through the query lifecycle. Consumers should treat it as the canonical propagated +// identifier instead of independently recomputing it from local state. + +impl FilterId { + pub fn new( + region_id: RegionId, + producer_ordinal: u32, + children_fingerprint: FilterFingerprint, + ) -> Self { + Self { + region_id, + producer_ordinal, + children_fingerprint, + } + } + + pub fn region_id(&self) -> RegionId { + self.region_id + } + + pub fn producer_ordinal(&self) -> u32 { + self.producer_ordinal + } + + pub fn children_fingerprint(&self) -> FilterFingerprint { + self.children_fingerprint + } +} + +impl Display for FilterId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}:{}:{}", + self.region_id.as_u64(), + self.producer_ordinal, + self.children_fingerprint + ) + } +} + +impl FromStr for FilterId { + type Err = ParseFilterIdError; + + fn from_str(s: &str) -> Result { + let mut parts = s.split(':'); + let region_id = parts + .next() + .ok_or(ParseFilterIdError)? + .parse::() + .map(RegionId::from_u64) + .map_err(|_| ParseFilterIdError)?; + let producer_ordinal = parts + .next() + .ok_or(ParseFilterIdError)? + .parse::() + .map_err(|_| ParseFilterIdError)?; + let children_fingerprint = parts + .next() + .ok_or(ParseFilterIdError)? + .parse::() + .map_err(|_| ParseFilterIdError)?; + if parts.next().is_some() { + return Err(ParseFilterIdError); + } + + Ok(Self::new(region_id, producer_ordinal, children_fingerprint)) + } +} + +#[derive(Debug)] +pub struct ParseFilterIdError; + +impl Display for ParseFilterIdError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "invalid filter id") + } +} + +/// Builds the query-local remote dynamic filter identity. +/// +/// The identity is `region_id + producer-local ordinal + canonicalized child fingerprint`. +/// Subscriber routing details such as `partition` stay outside this key so they can remain in +/// the later fanout/subscriber map instead of splitting one shared remote filter state. +/// +/// NOTE(remote-dyn-filter): This id is generated once on the source side and then propagated. +/// Consumers should reuse the propagated `FilterId` instead of independently recomputing it from +/// local state. +#[allow(unused)] +pub(crate) fn build_remote_dyn_filter_id( + region_id: RegionId, + producer_local_ordinal: usize, + children: &[Arc], +) -> Result { + let children_fingerprint = canonicalize_dyn_filter_children(children)?; + let producer_local_ordinal = u32::try_from(producer_local_ordinal).map_err(|err| { + let _ = err; + DataFusionError::Execution("producer ordinal out of range for filter id".to_string()) + })?; + Ok(FilterId::new( + region_id, + producer_local_ordinal, + children_fingerprint, + )) +} + +fn canonicalize_dyn_filter_children( + children: &[Arc], +) -> Result { + let codec = DefaultPhysicalExtensionCodec {}; + let mut hasher = DefaultHasher::new(); + hasher.write_usize(children.len()); + + for child in children { + let proto = serialize_physical_expr(child, &codec)?; + let mut bytes = Vec::new(); + proto + .encode(&mut bytes) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + hasher.write_usize(bytes.len()); + hasher.write(&bytes); + } + + Ok(FilterFingerprint::new(hasher.finish())) +} + +#[cfg(test)] +mod tests { + use datafusion_physical_expr::expressions::Column; + + use super::*; + + fn test_children(names: &[&str]) -> Vec> { + names + .iter() + .enumerate() + .map(|(index, name)| Arc::new(Column::new(name, index)) as Arc) + .collect() + } + + #[test] + fn filter_id_round_trips_through_string() { + let filter_id = FilterId::new(RegionId::new(1024, 7), 3, FilterFingerprint::new(0xabc)); + let encoded = filter_id.to_string(); + + assert_eq!(encoded.parse::().unwrap(), filter_id); + } + + #[test] + fn filter_id_rejects_malformed_strings() { + assert!("".parse::().is_err()); + assert!("1024:3".parse::().is_err()); + assert!("1024:3:zzzz".parse::().is_err()); + assert!("1024:3:0000000000000abc:extra".parse::().is_err()); + } + + #[test] + fn remote_dyn_filter_id_is_stable_for_equivalent_children() { + let region_id = RegionId::new(1024, 7); + let first = + build_remote_dyn_filter_id(region_id, 3, &test_children(&["host", "pod"])).unwrap(); + let second = + build_remote_dyn_filter_id(region_id, 3, &test_children(&["host", "pod"])).unwrap(); + + assert_eq!(first, second); + } + + #[test] + fn remote_dyn_filter_id_changes_when_identity_inputs_change() { + let children = test_children(&["host", "pod"]); + let baseline = build_remote_dyn_filter_id(RegionId::new(1024, 7), 3, &children).unwrap(); + let different_region = + build_remote_dyn_filter_id(RegionId::new(1024, 8), 3, &children).unwrap(); + let different_ordinal = + build_remote_dyn_filter_id(RegionId::new(1024, 7), 4, &children).unwrap(); + let different_children = + build_remote_dyn_filter_id(RegionId::new(1024, 7), 3, &test_children(&["pod", "host"])) + .unwrap(); + + assert_ne!(baseline, different_region); + assert_ne!(baseline, different_ordinal); + assert_ne!(baseline, different_children); + } + + #[test] + fn remote_dyn_filter_id_supports_empty_children() { + let region_id = RegionId::new(4096, 2); + let first = build_remote_dyn_filter_id(region_id, 1, &[]).unwrap(); + let second = build_remote_dyn_filter_id(region_id, 1, &[]).unwrap(); + + assert_eq!(first, second); + assert_eq!(first.region_id(), region_id); + assert_eq!(first.producer_ordinal(), 1); + assert_eq!(first.children_fingerprint(), second.children_fingerprint()); + } + + #[test] + fn remote_dyn_filter_id_rejects_out_of_range_producer_ordinal() { + let error = build_remote_dyn_filter_id(RegionId::new(1024, 7), usize::MAX, &[]) + .unwrap_err() + .to_string(); + + assert!(error.contains("producer ordinal out of range for filter id")); + } +} diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 161483c4c3..2d32ce16b3 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::any::Any; -use std::hash::{DefaultHasher, Hasher}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -38,16 +37,11 @@ use datafusion::physical_plan::{ use datafusion_common::{Column as ColumnExpr, DataFusionError, Result}; use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion_physical_expr::expressions::Column; -use datafusion_physical_expr::{ - Distribution, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr, -}; -use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec; -use datafusion_proto::physical_plan::to_proto::serialize_physical_expr; +use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr}; use futures_util::StreamExt; use greptime_proto::v1::region::RegionRequestHeader; use meter_core::data::ReadItem; use meter_macros::read_meter; -use prost::Message; use session::context::QueryContextRef; use store_api::storage::RegionId; use table::table_name::TableName; @@ -500,42 +494,6 @@ impl MergeScanExec { } } -/// Builds the Phase 1 query-local remote dynamic filter identity. -/// -/// The identity is `region_id + producer-local ordinal + canonicalized child fingerprint`. -/// Subscriber routing details such as `partition` stay outside this key so they can remain in -/// the later fanout/subscriber map instead of splitting one shared remote filter state. -#[allow(unused)] -pub(crate) fn build_remote_dyn_filter_id( - region_id: RegionId, - producer_local_ordinal: usize, - children: &[Arc], -) -> Result { - let children_fingerprint = canonicalize_dyn_filter_children(children)?; - Ok(format!( - "{region_id}:{producer_local_ordinal}:{children_fingerprint}" - )) -} - -fn canonicalize_dyn_filter_children(children: &[Arc]) -> Result { - let codec = DefaultPhysicalExtensionCodec {}; - let mut encoded_children = Vec::with_capacity(children.len()); - - for child in children { - let proto = serialize_physical_expr(child, &codec)?; - let mut bytes = Vec::new(); - proto - .encode(&mut bytes) - .map_err(|e| DataFusionError::External(Box::new(e)))?; - let mut hasher = DefaultHasher::new(); - hasher.write_usize(bytes.len()); - hasher.write(&bytes); - encoded_children.push(format!("{:016x}", hasher.finish())); - } - - Ok(encoded_children.join(",")) -} - /// Metrics for a region of a partition. #[derive(Debug, Clone)] struct RegionMetrics { @@ -750,65 +708,3 @@ impl MergeScanMetric { self.greptime_exec_cost.add(metrics); } } - -#[cfg(test)] -mod tests { - use super::*; - - fn test_children(names: &[&str]) -> Vec> { - names - .iter() - .enumerate() - .map(|(index, name)| Arc::new(Column::new(name, index)) as Arc) - .collect() - } - - #[test] - fn remote_dyn_filter_id_is_stable_for_equivalent_children() { - let region_id = RegionId::new(1024, 7); - let first = - build_remote_dyn_filter_id(region_id, 3, &test_children(&["host", "pod"])).unwrap(); - let second = - build_remote_dyn_filter_id(region_id, 3, &test_children(&["host", "pod"])).unwrap(); - - assert_eq!(first, second); - } - - #[test] - fn remote_dyn_filter_id_changes_when_identity_inputs_change() { - let children = test_children(&["host", "pod"]); - let baseline = build_remote_dyn_filter_id(RegionId::new(1024, 7), 3, &children).unwrap(); - let different_region = - build_remote_dyn_filter_id(RegionId::new(1024, 8), 3, &children).unwrap(); - let different_ordinal = - build_remote_dyn_filter_id(RegionId::new(1024, 7), 4, &children).unwrap(); - let different_children = - build_remote_dyn_filter_id(RegionId::new(1024, 7), 3, &test_children(&["pod", "host"])) - .unwrap(); - - assert_ne!(baseline, different_region); - assert_ne!(baseline, different_ordinal); - assert_ne!(baseline, different_children); - } - - #[test] - fn remote_dyn_filter_id_has_no_partition_dimension() { - let children = test_children(&["host"]); - let partition_zero = - build_remote_dyn_filter_id(RegionId::new(2048, 1), 0, &children).unwrap(); - let partition_one = - build_remote_dyn_filter_id(RegionId::new(2048, 1), 0, &children).unwrap(); - - assert_eq!(partition_zero, partition_one); - } - - #[test] - fn remote_dyn_filter_id_supports_empty_children() { - let region_id = RegionId::new(4096, 2); - let first = build_remote_dyn_filter_id(region_id, 1, &[]).unwrap(); - let second = build_remote_dyn_filter_id(region_id, 1, &[]).unwrap(); - - assert_eq!(first, second); - assert_eq!(first, format!("{region_id}:1:")); - } -} diff --git a/src/query/src/dist_plan/remote_dyn_filter_registry.rs b/src/query/src/dist_plan/remote_dyn_filter_registry.rs index b645597130..80788bff26 100644 --- a/src/query/src/dist_plan/remote_dyn_filter_registry.rs +++ b/src/query/src/dist_plan/remote_dyn_filter_registry.rs @@ -18,8 +18,11 @@ use std::sync::{Arc, RwLock}; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; +use session::query_id::QueryId; use store_api::storage::RegionId; +use crate::dist_plan::FilterId; + /// Lifecycle state for a query-scoped remote dynamic filter registry. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum RegistryState { @@ -72,7 +75,7 @@ pub enum SubscriberRegistration { // the real watcher/fanout loop lands. Some fields may move to query-level shared runtime state. #[derive(Debug)] pub struct DynFilterEntry { - filter_id: String, + filter_id: FilterId, alive_dyn_filter: Arc, last_epoch: AtomicU64, last_observed_generation: AtomicU64, @@ -86,20 +89,17 @@ pub struct DynFilterEntry { #[derive(Debug)] struct QueryDynFilterRegistryInner { state: RegistryState, - entries: HashMap>, + entries: HashMap>, } impl DynFilterEntry { - pub fn new( - filter_id: impl Into, - alive_dyn_filter: Arc, - ) -> Self { + pub fn new(filter_id: FilterId, alive_dyn_filter: Arc) -> Self { // TODO(remote-dyn-filter): When real watcher/update scheduling lands, confirm that seeding // the observed generation here is still the right initialization point. let last_observed_generation = alive_dyn_filter.snapshot_generation(); Self { - filter_id: filter_id.into(), + filter_id, alive_dyn_filter, last_epoch: AtomicU64::new(0), last_observed_generation: AtomicU64::new(last_observed_generation), @@ -108,7 +108,7 @@ impl DynFilterEntry { } } - pub fn filter_id(&self) -> &str { + pub fn filter_id(&self) -> &FilterId { &self.filter_id } @@ -174,14 +174,14 @@ impl DynFilterEntry { /// Query-scoped registry that owns all remote dynamic filters for one query. #[derive(Debug)] pub struct QueryDynFilterRegistry { - query_id: String, + query_id: QueryId, inner: RwLock, } impl QueryDynFilterRegistry { - pub fn new(query_id: impl Into) -> Self { + pub fn new(query_id: QueryId) -> Self { Self { - query_id: query_id.into(), + query_id, inner: RwLock::new(QueryDynFilterRegistryInner { state: RegistryState::Active, entries: HashMap::new(), @@ -189,8 +189,8 @@ impl QueryDynFilterRegistry { } } - pub fn query_id(&self) -> &str { - &self.query_id + pub fn query_id(&self) -> QueryId { + self.query_id } pub fn state(&self) -> RegistryState { @@ -211,13 +211,13 @@ impl QueryDynFilterRegistry { .collect() } - pub fn remote_dyn_filter(&self, filter_id: &str) -> Option> { + pub fn remote_dyn_filter(&self, filter_id: &FilterId) -> Option> { self.inner.read().unwrap().entries.get(filter_id).cloned() } pub fn register_remote_dyn_filter( &self, - filter_id: impl Into, + filter_id: FilterId, alive_dyn_filter: Arc, ) -> EntryRegistration { // TODO(remote-dyn-filter): Subtask 05 should call this from the MergeScan bridge after it @@ -227,7 +227,6 @@ impl QueryDynFilterRegistry { return EntryRegistration::RejectedByState(inner.state); } - let filter_id = filter_id.into(); if let Some(existing) = inner.entries.get(&filter_id) { return EntryRegistration::Existing(existing.clone()); } @@ -239,7 +238,7 @@ impl QueryDynFilterRegistry { pub fn register_subscriber( &self, - filter_id: &str, + filter_id: &FilterId, subscriber: Subscriber, ) -> SubscriberRegistration { // TODO(remote-dyn-filter): Later subtasks should route remote subscriber metadata into @@ -301,27 +300,26 @@ impl QueryDynFilterRegistry { /// Query-engine manager for query-scoped remote dynamic filter registries. #[derive(Debug, Default)] pub struct DynFilterRegistryManager { - registries: RwLock>>, + registries: RwLock>>, } impl DynFilterRegistryManager { - pub fn get(&self, query_id: &str) -> Option> { + pub fn get(&self, query_id: &QueryId) -> Option> { self.registries.read().unwrap().get(query_id).cloned() } - pub fn get_or_init(&self, query_id: impl Into) -> Arc { + pub fn get_or_init(&self, query_id: QueryId) -> Arc { // TODO(remote-dyn-filter): Subtask 04 should wire query-engine runtime ownership through // this entry point so query_id-scoped registries live with distributed query execution. - let query_id = query_id.into(); let mut registries = self.registries.write().unwrap(); registries - .entry(query_id.clone()) + .entry(query_id) .or_insert_with(|| Arc::new(QueryDynFilterRegistry::new(query_id))) .clone() } - pub fn begin_closing(&self, query_id: &str) -> Option> { + pub fn begin_closing(&self, query_id: &QueryId) -> Option> { // TODO(remote-dyn-filter): Query finish/cancel hooks should call this to start the cleanup // tail, not remove the registry immediately. let registry = self.get(query_id)?; @@ -329,7 +327,7 @@ impl DynFilterRegistryManager { Some(registry) } - pub fn reap_closed(&self, query_id: &str) -> bool { + pub fn reap_closed(&self, query_id: &QueryId) -> bool { // TODO(remote-dyn-filter): Cleanup code should call this only after mark_closed(). If a // later implementation needs a retained closed-tail window, expand here instead of adding // ad-hoc removal at call sites. @@ -353,8 +351,18 @@ impl DynFilterRegistryManager { #[cfg(test)] mod tests { use datafusion_physical_expr::expressions::{Column, lit}; + use uuid::Uuid; use super::*; + use crate::dist_plan::FilterFingerprint; + + fn test_query_id(value: u128) -> QueryId { + QueryId::from(Uuid::from_u128(value)) + } + + fn test_filter_id(region_id: RegionId, producer_ordinal: u32) -> FilterId { + FilterId::new(region_id, producer_ordinal, FilterFingerprint::new(0xabc)) + } fn test_dyn_filter(names: &[&str]) -> Arc { let children = names @@ -369,8 +377,9 @@ mod tests { #[test] fn registry_manager_returns_same_registry_for_same_query() { let manager = DynFilterRegistryManager::default(); - let first = manager.get_or_init("query-1"); - let second = manager.get_or_init("query-1"); + let query_id = test_query_id(1); + let first = manager.get_or_init(query_id); + let second = manager.get_or_init(query_id); assert!(Arc::ptr_eq(&first, &second)); assert_eq!(manager.registry_count(), 1); @@ -378,14 +387,15 @@ mod tests { #[test] fn registry_stores_filter_and_deduplicates_subscribers() { - let registry = QueryDynFilterRegistry::new("query-1"); + let registry = QueryDynFilterRegistry::new(test_query_id(1)); let filter = test_dyn_filter(&["host"]); - let entry = match registry.register_remote_dyn_filter("filter-1", filter.clone()) { + let filter_id = test_filter_id(RegionId::new(1024, 7), 1); + let entry = match registry.register_remote_dyn_filter(filter_id.clone(), filter.clone()) { EntryRegistration::Inserted(entry) => entry, other => panic!("unexpected registration result: {other:?}"), }; - assert_eq!(entry.filter_id(), "filter-1"); + assert_eq!(entry.filter_id(), &filter_id); assert_eq!( entry.last_observed_generation(), filter.snapshot_generation() @@ -394,11 +404,11 @@ mod tests { let subscriber = Subscriber::new(RegionId::new(1024, 1)); assert_eq!( - registry.register_subscriber("filter-1", subscriber.clone()), + registry.register_subscriber(&filter_id, subscriber.clone()), SubscriberRegistration::Added ); assert_eq!( - registry.register_subscriber("filter-1", subscriber), + registry.register_subscriber(&filter_id, subscriber), SubscriberRegistration::Duplicate ); assert_eq!(entry.subscribers().len(), 1); @@ -406,15 +416,16 @@ mod tests { #[test] fn registry_lifecycle_rejects_new_work_after_closing() { - let registry = QueryDynFilterRegistry::new("query-1"); + let registry = QueryDynFilterRegistry::new(test_query_id(1)); assert_eq!(registry.state(), RegistryState::Active); assert_eq!(registry.begin_closing(), RegistryState::Closing); assert_eq!(registry.state(), RegistryState::Closing); let filter = test_dyn_filter(&["host"]); + let filter_id = test_filter_id(RegionId::new(1024, 7), 1); assert!(matches!( - registry.register_remote_dyn_filter("filter-1", filter), + registry.register_remote_dyn_filter(filter_id, filter), EntryRegistration::RejectedByState(RegistryState::Closing) )); @@ -424,7 +435,10 @@ mod tests { #[test] fn registered_filter_starts_watcher_once() { - let entry = DynFilterEntry::new("filter-1", test_dyn_filter(&["host"])); + let entry = DynFilterEntry::new( + test_filter_id(RegionId::new(1024, 7), 1), + test_dyn_filter(&["host"]), + ); assert!(entry.start_watcher_if_needed()); assert!(entry.watcher_started()); @@ -437,13 +451,17 @@ mod tests { #[test] fn manager_reaps_closed_registry() { let manager = DynFilterRegistryManager::default(); - let registry = manager.get_or_init("query-1"); - let _ = registry.register_remote_dyn_filter("filter-1", test_dyn_filter(&["host"])); + let query_id = test_query_id(1); + let registry = manager.get_or_init(query_id); + let _ = registry.register_remote_dyn_filter( + test_filter_id(RegionId::new(1024, 7), 1), + test_dyn_filter(&["host"]), + ); registry.mark_closed(); - assert!(manager.reap_closed("query-1")); + assert!(manager.reap_closed(&query_id)); assert_eq!(manager.registry_count(), 0); - assert!(manager.get("query-1").is_none()); + assert!(manager.get(&query_id).is_none()); } } diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index f696c8b53e..ff45ecab55 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -50,12 +50,14 @@ use datafusion_optimizer::analyzer::function_rewrite::ApplyFunctionRewrites; use datafusion_optimizer::optimizer::Optimizer; use partition::manager::PartitionRuleManagerRef; use promql::extension_plan::PromExtensionPlanner; +use session::context::QueryContextRef; use table::TableRef; use table::table::adapter::DfTableProviderAdapter; use crate::QueryEngineContext; use crate::dist_plan::{ - DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, MergeSortExtensionPlanner, + DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, DynFilterRegistryManager, + MergeSortExtensionPlanner, QueryDynFilterRegistry, }; use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES}; use crate::optimizer::ExtensionAnalyzerRule; @@ -81,6 +83,7 @@ use crate::region_query::RegionQueryHandlerRef; pub struct QueryEngineState { df_context: SessionContext, catalog_manager: CatalogManagerRef, + dyn_filter_registry_manager: Arc, function_state: Arc, scalar_functions: Arc>>, aggr_functions: Arc>>, @@ -231,6 +234,7 @@ impl QueryEngineState { Self { df_context, catalog_manager: catalog_list, + dyn_filter_registry_manager: Arc::new(DynFilterRegistryManager::default()), function_state: Arc::new(FunctionState { table_mutation_handler, procedure_service_handler, @@ -387,6 +391,37 @@ impl QueryEngineState { &self.catalog_manager } + pub fn dyn_filter_registry_manager(&self) -> Arc { + self.dyn_filter_registry_manager.clone() + } + + pub fn get_or_init_remote_dyn_filter_registry( + &self, + query_ctx: &QueryContextRef, + ) -> Option> { + let query_id = query_ctx.remote_query_id_value()?; + Some(self.dyn_filter_registry_manager.get_or_init(query_id)) + } + + pub fn begin_closing_remote_dyn_filter_registry( + &self, + query_ctx: &QueryContextRef, + ) -> Option> { + // TODO(remote-dyn-filter): Wire query finish/cancel hooks through this helper once the + // distributed query lifecycle exposes a single cleanup callback for the query runtime. + let query_id = query_ctx.remote_query_id_value()?; + self.dyn_filter_registry_manager.begin_closing(&query_id) + } + + pub fn reap_closed_remote_dyn_filter_registry(&self, query_ctx: &QueryContextRef) -> bool { + // TODO(remote-dyn-filter): Call this after the Closing cleanup tail marks the registry + // closed. Subtask 04 only exposes the runtime ownership path; later subtasks should decide + // the exact lifecycle hook that invokes this. + query_ctx + .remote_query_id_value() + .is_some_and(|query_id| self.dyn_filter_registry_manager.reap_closed(&query_id)) + } + pub fn function_state(&self) -> Arc { self.function_state.clone() } @@ -568,3 +603,110 @@ impl MemoryPool for MetricsMemoryPool { self.inner.memory_limit() } } + +#[cfg(test)] +mod tests { + use common_base::Plugins; + use session::context::QueryContext; + + use super::*; + use crate::options::QueryOptions; + + fn new_query_engine_state() -> QueryEngineState { + QueryEngineState::new( + catalog::memory::new_memory_catalog_manager().unwrap(), + None, + None, + None, + None, + None, + false, + Plugins::default(), + QueryOptions::default(), + ) + } + + #[test] + fn query_engine_state_reuses_query_scoped_dyn_filter_registry() { + let state = new_query_engine_state(); + let query_ctx = QueryContext::arc(); + + let first = state + .get_or_init_remote_dyn_filter_registry(&query_ctx) + .unwrap(); + let second = state + .get_or_init_remote_dyn_filter_registry(&query_ctx) + .unwrap(); + + assert!(Arc::ptr_eq(&first, &second)); + assert_eq!(state.dyn_filter_registry_manager().registry_count(), 1); + assert_eq!(first.query_id(), query_ctx.remote_query_id_value().unwrap()); + } + + #[test] + fn query_engine_state_exposes_closing_and_reap_helpers() { + let state = new_query_engine_state(); + let query_ctx = QueryContext::arc(); + let registry = state + .get_or_init_remote_dyn_filter_registry(&query_ctx) + .unwrap(); + + let closing = state + .begin_closing_remote_dyn_filter_registry(&query_ctx) + .unwrap(); + assert!(Arc::ptr_eq(®istry, &closing)); + assert_eq!(closing.state(), crate::dist_plan::RegistryState::Closing); + + closing.mark_closed(); + assert!(state.reap_closed_remote_dyn_filter_registry(&query_ctx)); + assert!( + state + .dyn_filter_registry_manager() + .get(&query_ctx.remote_query_id_value().unwrap()) + .is_none() + ); + } + + #[test] + fn query_engine_state_relies_on_query_context_remote_query_id_contract() { + let state = new_query_engine_state(); + let query_ctx = QueryContext::arc(); + + assert!(query_ctx.remote_query_id_value().is_some()); + + let registry = state + .get_or_init_remote_dyn_filter_registry(&query_ctx) + .unwrap(); + + assert_eq!( + registry.query_id(), + query_ctx.remote_query_id_value().unwrap() + ); + assert_eq!(state.dyn_filter_registry_manager().registry_count(), 1); + } + + #[test] + fn query_engine_state_separates_registries_for_different_query_contexts() { + let state = new_query_engine_state(); + let first_query_ctx = QueryContext::arc(); + let second_query_ctx = QueryContext::arc(); + + let first = state + .get_or_init_remote_dyn_filter_registry(&first_query_ctx) + .unwrap(); + let second = state + .get_or_init_remote_dyn_filter_registry(&second_query_ctx) + .unwrap(); + + assert!(!Arc::ptr_eq(&first, &second)); + assert_eq!(state.dyn_filter_registry_manager().registry_count(), 2); + assert_eq!( + first.query_id(), + first_query_ctx.remote_query_id_value().unwrap() + ); + assert_eq!( + second.query_id(), + second_query_ctx.remote_query_id_value().unwrap() + ); + } +} diff --git a/src/session/src/context.rs b/src/session/src/context.rs index 7c6350dee1..756899b59e 100644 --- a/src/session/src/context.rs +++ b/src/session/src/context.rs @@ -31,9 +31,9 @@ use common_time::timezone::parse_timezone; use datafusion_common::config::ConfigOptions; use derive_builder::Builder; use sql::dialect::{Dialect, GenericDialect, GreptimeDbDialect, MySqlDialect, PostgreSqlDialect}; -use uuid::Uuid; use crate::protocol_ctx::ProtocolCtx; +use crate::query_id::QueryId; use crate::session_config::{PGByteaOutputValue, PGDateOrder, PGDateTimeStyle, PGIntervalStyle}; use crate::{MutableInner, ReadPreference}; @@ -44,7 +44,11 @@ const CURSOR_COUNT_WARNING_LIMIT: usize = 10; pub const REMOTE_QUERY_ID_EXTENSION_KEY: &str = "remote_query_id"; pub fn generate_remote_query_id() -> String { - Uuid::now_v7().to_string() + generate_remote_query_id_value().to_string() +} + +pub fn generate_remote_query_id_value() -> QueryId { + QueryId::new() } #[derive(Debug, Builder, Clone)] @@ -354,6 +358,11 @@ impl QueryContext { self.extension(REMOTE_QUERY_ID_EXTENSION_KEY) } + pub fn remote_query_id_value(&self) -> Option { + self.remote_query_id() + .and_then(|query_id| query_id.parse().ok()) + } + pub fn extensions(&self) -> HashMap { self.extensions.clone() } @@ -798,9 +807,14 @@ mod test { .build(); assert_eq!(ctx.remote_query_id(), Some(query_id)); + assert_eq!(ctx.remote_query_id_value().unwrap().to_string(), query_id); let proto: api::v1::QueryContext = (&ctx).into(); let restored = QueryContext::from(proto); assert_eq!(restored.remote_query_id(), Some(query_id)); + assert_eq!( + restored.remote_query_id_value().unwrap().to_string(), + query_id + ); } } diff --git a/src/session/src/lib.rs b/src/session/src/lib.rs index 1294a3368f..cba78a060e 100644 --- a/src/session/src/lib.rs +++ b/src/session/src/lib.rs @@ -15,6 +15,7 @@ pub mod context; pub mod hints; pub mod protocol_ctx; +pub mod query_id; pub mod session_config; pub mod table_name; diff --git a/src/session/src/query_id.rs b/src/session/src/query_id.rs new file mode 100644 index 0000000000..220e52577e --- /dev/null +++ b/src/session/src/query_id.rs @@ -0,0 +1,76 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::{Display, Formatter}; +use std::str::FromStr; + +use uuid::Uuid; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub struct QueryId(Uuid); + +impl QueryId { + pub fn new() -> Self { + Self(Uuid::now_v7()) + } + + pub fn as_uuid(&self) -> &Uuid { + &self.0 + } +} + +impl Default for QueryId { + fn default() -> Self { + Self::new() + } +} + +impl Display for QueryId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +impl FromStr for QueryId { + type Err = uuid::Error; + + fn from_str(s: &str) -> Result { + Ok(Self(Uuid::parse_str(s)?)) + } +} + +impl From for QueryId { + fn from(value: Uuid) -> Self { + Self(value) + } +} + +impl From for Uuid { + fn from(value: QueryId) -> Self { + value.0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn query_id_round_trips_through_string() { + let query_id = QueryId::new(); + let encoded = query_id.to_string(); + + assert_eq!(encoded.parse::().unwrap(), query_id); + } +}