feat: filter id&refactor to type

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-15 15:06:09 +08:00
parent ff53155687
commit 18fbd3054a
8 changed files with 562 additions and 146 deletions

View File

@@ -14,6 +14,7 @@
mod analyzer;
mod commutativity;
mod filter_id;
mod merge_scan;
mod merge_sort;
mod planner;
@@ -22,6 +23,7 @@ mod region_pruner;
mod remote_dyn_filter_registry;
pub use analyzer::{DistPlannerAnalyzer, DistPlannerOptions};
pub use filter_id::{FilterFingerprint, FilterId, ParseFilterIdError};
pub use merge_scan::{MergeScanExec, MergeScanLogicalPlan};
pub use planner::{DistExtensionPlanner, MergeSortExtensionPlanner};
pub use predicate_extractor::PredicateExtractor;

View File

@@ -0,0 +1,267 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Display, Formatter};
use std::hash::{DefaultHasher, Hasher};
use std::num::ParseIntError;
use std::str::FromStr;
use std::sync::Arc;
use datafusion_common::{DataFusionError, Result};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
use datafusion_proto::physical_plan::to_proto::serialize_physical_expr;
use prost::Message;
use store_api::storage::RegionId;
/// 64-bit fingerprint value used as one component of a filter identity.
///
/// The textual form is exactly 16 lower-case, zero-padded hexadecimal digits.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct FilterFingerprint(u64);

impl FilterFingerprint {
    /// Wraps a raw 64-bit value.
    pub fn new(value: u64) -> Self {
        FilterFingerprint(value)
    }

    /// Returns the raw 64-bit value.
    pub fn get(self) -> u64 {
        let FilterFingerprint(raw) = self;
        raw
    }
}

impl Display for FilterFingerprint {
    /// Writes the fingerprint as 16 zero-padded lower-case hex digits.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let raw = self.get();
        write!(f, "{raw:016x}")
    }
}

impl FromStr for FilterFingerprint {
    type Err = ParseIntError;

    /// Parses a base-16 string; the input does not have to be zero-padded.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        u64::from_str_radix(s, 16).map(FilterFingerprint)
    }
}
/// Typed identity of a remote dynamic filter.
///
/// It is made of three components: a region id, a producer ordinal and a fingerprint of the
/// filter's children.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct FilterId {
    /// Region component of the identity.
    region_id: RegionId,
    /// Producer ordinal component of the identity.
    producer_ordinal: u32,
    /// Fingerprint component derived from the filter's children.
    children_fingerprint: FilterFingerprint,
}

// NOTE(remote-dyn-filter): FilterId is generated once by the source-side planner/runtime and then
// propagated through the query lifecycle. Consumers should treat it as the canonical propagated
// identifier instead of independently recomputing it from local state.
impl FilterId {
    /// Assembles a filter id from its three components.
    pub fn new(
        region_id: RegionId,
        producer_ordinal: u32,
        children_fingerprint: FilterFingerprint,
    ) -> Self {
        FilterId {
            region_id,
            producer_ordinal,
            children_fingerprint,
        }
    }

    /// Returns the region component.
    pub fn region_id(&self) -> RegionId {
        let FilterId { region_id, .. } = self;
        *region_id
    }

    /// Returns the producer ordinal component.
    pub fn producer_ordinal(&self) -> u32 {
        let FilterId {
            producer_ordinal, ..
        } = self;
        *producer_ordinal
    }

    /// Returns the children fingerprint component.
    pub fn children_fingerprint(&self) -> FilterFingerprint {
        let FilterId {
            children_fingerprint,
            ..
        } = self;
        *children_fingerprint
    }
}
impl Display for FilterId {
    /// Renders the id as `<region id as u64>:<producer ordinal>:<children fingerprint>`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let region = self.region_id.as_u64();
        let ordinal = self.producer_ordinal;
        let fingerprint = self.children_fingerprint;
        write!(f, "{region}:{ordinal}:{fingerprint}")
    }
}
impl FromStr for FilterId {
    type Err = ParseFilterIdError;

    /// Parses the `region:ordinal:fingerprint` form written by the `Display` impl.
    ///
    /// The input must contain exactly three `:`-separated fields: a decimal `u64`, a decimal
    /// `u32` and a hexadecimal fingerprint. Anything else yields [`ParseFilterIdError`].
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let mut fields = s.split(':');

        // Exactly three fields are accepted; a fourth `next()` must come back empty.
        let (Some(region), Some(ordinal), Some(fingerprint), None) = (
            fields.next(),
            fields.next(),
            fields.next(),
            fields.next(),
        ) else {
            return Err(ParseFilterIdError);
        };

        let region_id = match region.parse::<u64>() {
            Ok(raw) => RegionId::from_u64(raw),
            Err(_) => return Err(ParseFilterIdError),
        };
        let producer_ordinal = match ordinal.parse::<u32>() {
            Ok(value) => value,
            Err(_) => return Err(ParseFilterIdError),
        };
        let children_fingerprint = match fingerprint.parse::<FilterFingerprint>() {
            Ok(value) => value,
            Err(_) => return Err(ParseFilterIdError),
        };

        Ok(Self::new(region_id, producer_ordinal, children_fingerprint))
    }
}
/// Error returned when a string cannot be parsed into a `FilterId`.
///
/// The error is a unit value: it does not record which field was malformed.
///
/// It implements [`std::error::Error`] so it can be boxed as `dyn Error`, propagated with `?`
/// into error types that accept any standard error, and wrapped by external-error variants.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ParseFilterIdError;

impl Display for ParseFilterIdError {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("invalid filter id")
    }
}

impl std::error::Error for ParseFilterIdError {}
/// Builds the query-local identity of a remote dynamic filter.
///
/// The resulting [`FilterId`] is composed of `region_id`, the producer-local ordinal narrowed to
/// `u32`, and a fingerprint computed over `children`. Subscriber routing details such as
/// `partition` are intentionally not part of this key, so they can live in the later
/// fanout/subscriber map instead of splitting one shared remote filter state.
///
/// NOTE(remote-dyn-filter): This id is generated once on the source side and then propagated.
/// Consumers should reuse the propagated `FilterId` instead of independently recomputing it from
/// local state.
///
/// # Errors
///
/// Returns an error when fingerprinting the children fails, or a
/// `DataFusionError::Execution` when `producer_local_ordinal` does not fit in a `u32`.
#[allow(unused)]
pub(crate) fn build_remote_dyn_filter_id(
    region_id: RegionId,
    producer_local_ordinal: usize,
    children: &[Arc<dyn PhysicalExpr>],
) -> Result<FilterId> {
    // The fingerprint is computed first, so its error takes precedence over the range check.
    let fingerprint = canonicalize_dyn_filter_children(children)?;

    let Ok(ordinal) = u32::try_from(producer_local_ordinal) else {
        return Err(DataFusionError::Execution(
            "producer ordinal out of range for filter id".to_string(),
        ));
    };

    Ok(FilterId::new(region_id, ordinal, fingerprint))
}
/// Folds the protobuf encoding of every child expression into one [`FilterFingerprint`].
///
/// The hash input is the number of children followed, for each child in order, by the length
/// of its encoded bytes and the bytes themselves. Child order therefore affects the result.
///
/// # Errors
///
/// Fails when a child cannot be serialized to its protobuf form or the protobuf message cannot
/// be encoded.
fn canonicalize_dyn_filter_children(
    children: &[Arc<dyn PhysicalExpr>],
) -> Result<FilterFingerprint> {
    let extension_codec = DefaultPhysicalExtensionCodec {};
    let mut state = DefaultHasher::new();

    // Prefix with the child count so differently sized lists start from different hash input.
    state.write_usize(children.len());

    for expr in children.iter() {
        let node = serialize_physical_expr(expr, &extension_codec)?;

        let mut encoded = Vec::new();
        if let Err(e) = node.encode(&mut encoded) {
            return Err(DataFusionError::External(Box::new(e)));
        }

        // Length-prefix each child so adjacent encodings cannot run into each other.
        state.write_usize(encoded.len());
        state.write(&encoded);
    }

    let digest = state.finish();
    Ok(FilterFingerprint::new(digest))
}
// Unit tests for the typed filter id: string round-tripping, rejection of malformed input, and
// the properties of ids produced by `build_remote_dyn_filter_id`.
#[cfg(test)]
mod tests {
use datafusion_physical_expr::expressions::Column;
use super::*;
// Builds one `Column` expression per name; the column index is the name's position in `names`.
fn test_children(names: &[&str]) -> Vec<Arc<dyn PhysicalExpr>> {
names
.iter()
.enumerate()
.map(|(index, name)| Arc::new(Column::new(name, index)) as Arc<dyn PhysicalExpr>)
.collect()
}
// `Display` followed by `FromStr` must give back an equal `FilterId`.
#[test]
fn filter_id_round_trips_through_string() {
let filter_id = FilterId::new(RegionId::new(1024, 7), 3, FilterFingerprint::new(0xabc));
let encoded = filter_id.to_string();
assert_eq!(encoded.parse::<FilterId>().unwrap(), filter_id);
}
// Empty input, a missing field, a non-hex fingerprint and a trailing extra field are rejected.
#[test]
fn filter_id_rejects_malformed_strings() {
assert!("".parse::<FilterId>().is_err());
assert!("1024:3".parse::<FilterId>().is_err());
assert!("1024:3:zzzz".parse::<FilterId>().is_err());
assert!("1024:3:0000000000000abc:extra".parse::<FilterId>().is_err());
}
// Two separately constructed but equivalent child lists must produce equal ids.
#[test]
fn remote_dyn_filter_id_is_stable_for_equivalent_children() {
let region_id = RegionId::new(1024, 7);
let first =
build_remote_dyn_filter_id(region_id, 3, &test_children(&["host", "pod"])).unwrap();
let second =
build_remote_dyn_filter_id(region_id, 3, &test_children(&["host", "pod"])).unwrap();
assert_eq!(first, second);
}
// Changing the region, the ordinal, or the order of the children must each change the id.
#[test]
fn remote_dyn_filter_id_changes_when_identity_inputs_change() {
let children = test_children(&["host", "pod"]);
let baseline = build_remote_dyn_filter_id(RegionId::new(1024, 7), 3, &children).unwrap();
let different_region =
build_remote_dyn_filter_id(RegionId::new(1024, 8), 3, &children).unwrap();
let different_ordinal =
build_remote_dyn_filter_id(RegionId::new(1024, 7), 4, &children).unwrap();
let different_children =
build_remote_dyn_filter_id(RegionId::new(1024, 7), 3, &test_children(&["pod", "host"]))
.unwrap();
assert_ne!(baseline, different_region);
assert_ne!(baseline, different_ordinal);
assert_ne!(baseline, different_children);
}
// An empty child list is valid input and the id still exposes the region and ordinal passed in.
#[test]
fn remote_dyn_filter_id_supports_empty_children() {
let region_id = RegionId::new(4096, 2);
let first = build_remote_dyn_filter_id(region_id, 1, &[]).unwrap();
let second = build_remote_dyn_filter_id(region_id, 1, &[]).unwrap();
assert_eq!(first, second);
assert_eq!(first.region_id(), region_id);
assert_eq!(first.producer_ordinal(), 1);
assert_eq!(first.children_fingerprint(), second.children_fingerprint());
}
// `usize::MAX` is passed as an ordinal that must be reported as out of range.
#[test]
fn remote_dyn_filter_id_rejects_out_of_range_producer_ordinal() {
let error = build_remote_dyn_filter_id(RegionId::new(1024, 7), usize::MAX, &[])
.unwrap_err()
.to_string();
assert!(error.contains("producer ordinal out of range for filter id"));
}
}

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::any::Any;
use std::hash::{DefaultHasher, Hasher};
use std::sync::{Arc, Mutex};
use std::time::Duration;
@@ -38,16 +37,11 @@ use datafusion::physical_plan::{
use datafusion_common::{Column as ColumnExpr, DataFusionError, Result};
use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{
Distribution, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
};
use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
use datafusion_proto::physical_plan::to_proto::serialize_physical_expr;
use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr};
use futures_util::StreamExt;
use greptime_proto::v1::region::RegionRequestHeader;
use meter_core::data::ReadItem;
use meter_macros::read_meter;
use prost::Message;
use session::context::QueryContextRef;
use store_api::storage::RegionId;
use table::table_name::TableName;
@@ -500,42 +494,6 @@ impl MergeScanExec {
}
}
/// Builds the Phase 1 query-local remote dynamic filter identity.
///
/// The identity is `region_id + producer-local ordinal + canonicalized child fingerprint`.
/// Subscriber routing details such as `partition` stay outside this key so they can remain in
/// the later fanout/subscriber map instead of splitting one shared remote filter state.
#[allow(unused)]
pub(crate) fn build_remote_dyn_filter_id(
region_id: RegionId,
producer_local_ordinal: usize,
children: &[Arc<dyn PhysicalExpr>],
) -> Result<String> {
let children_fingerprint = canonicalize_dyn_filter_children(children)?;
Ok(format!(
"{region_id}:{producer_local_ordinal}:{children_fingerprint}"
))
}
fn canonicalize_dyn_filter_children(children: &[Arc<dyn PhysicalExpr>]) -> Result<String> {
let codec = DefaultPhysicalExtensionCodec {};
let mut encoded_children = Vec::with_capacity(children.len());
for child in children {
let proto = serialize_physical_expr(child, &codec)?;
let mut bytes = Vec::new();
proto
.encode(&mut bytes)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let mut hasher = DefaultHasher::new();
hasher.write_usize(bytes.len());
hasher.write(&bytes);
encoded_children.push(format!("{:016x}", hasher.finish()));
}
Ok(encoded_children.join(","))
}
/// Metrics for a region of a partition.
#[derive(Debug, Clone)]
struct RegionMetrics {
@@ -750,65 +708,3 @@ impl MergeScanMetric {
self.greptime_exec_cost.add(metrics);
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_children(names: &[&str]) -> Vec<Arc<dyn PhysicalExpr>> {
names
.iter()
.enumerate()
.map(|(index, name)| Arc::new(Column::new(name, index)) as Arc<dyn PhysicalExpr>)
.collect()
}
#[test]
fn remote_dyn_filter_id_is_stable_for_equivalent_children() {
let region_id = RegionId::new(1024, 7);
let first =
build_remote_dyn_filter_id(region_id, 3, &test_children(&["host", "pod"])).unwrap();
let second =
build_remote_dyn_filter_id(region_id, 3, &test_children(&["host", "pod"])).unwrap();
assert_eq!(first, second);
}
#[test]
fn remote_dyn_filter_id_changes_when_identity_inputs_change() {
let children = test_children(&["host", "pod"]);
let baseline = build_remote_dyn_filter_id(RegionId::new(1024, 7), 3, &children).unwrap();
let different_region =
build_remote_dyn_filter_id(RegionId::new(1024, 8), 3, &children).unwrap();
let different_ordinal =
build_remote_dyn_filter_id(RegionId::new(1024, 7), 4, &children).unwrap();
let different_children =
build_remote_dyn_filter_id(RegionId::new(1024, 7), 3, &test_children(&["pod", "host"]))
.unwrap();
assert_ne!(baseline, different_region);
assert_ne!(baseline, different_ordinal);
assert_ne!(baseline, different_children);
}
#[test]
fn remote_dyn_filter_id_has_no_partition_dimension() {
let children = test_children(&["host"]);
let partition_zero =
build_remote_dyn_filter_id(RegionId::new(2048, 1), 0, &children).unwrap();
let partition_one =
build_remote_dyn_filter_id(RegionId::new(2048, 1), 0, &children).unwrap();
assert_eq!(partition_zero, partition_one);
}
#[test]
fn remote_dyn_filter_id_supports_empty_children() {
let region_id = RegionId::new(4096, 2);
let first = build_remote_dyn_filter_id(region_id, 1, &[]).unwrap();
let second = build_remote_dyn_filter_id(region_id, 1, &[]).unwrap();
assert_eq!(first, second);
assert_eq!(first, format!("{region_id}:1:"));
}
}

View File

@@ -18,8 +18,11 @@ use std::sync::{Arc, RwLock};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
use session::query_id::QueryId;
use store_api::storage::RegionId;
use crate::dist_plan::FilterId;
/// Lifecycle state for a query-scoped remote dynamic filter registry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegistryState {
@@ -72,7 +75,7 @@ pub enum SubscriberRegistration {
// the real watcher/fanout loop lands. Some fields may move to query-level shared runtime state.
#[derive(Debug)]
pub struct DynFilterEntry {
filter_id: String,
filter_id: FilterId,
alive_dyn_filter: Arc<DynamicFilterPhysicalExpr>,
last_epoch: AtomicU64,
last_observed_generation: AtomicU64,
@@ -86,20 +89,17 @@ pub struct DynFilterEntry {
#[derive(Debug)]
struct QueryDynFilterRegistryInner {
state: RegistryState,
entries: HashMap<String, Arc<DynFilterEntry>>,
entries: HashMap<FilterId, Arc<DynFilterEntry>>,
}
impl DynFilterEntry {
pub fn new(
filter_id: impl Into<String>,
alive_dyn_filter: Arc<DynamicFilterPhysicalExpr>,
) -> Self {
pub fn new(filter_id: FilterId, alive_dyn_filter: Arc<DynamicFilterPhysicalExpr>) -> Self {
// TODO(remote-dyn-filter): When real watcher/update scheduling lands, confirm that seeding
// the observed generation here is still the right initialization point.
let last_observed_generation = alive_dyn_filter.snapshot_generation();
Self {
filter_id: filter_id.into(),
filter_id,
alive_dyn_filter,
last_epoch: AtomicU64::new(0),
last_observed_generation: AtomicU64::new(last_observed_generation),
@@ -108,7 +108,7 @@ impl DynFilterEntry {
}
}
pub fn filter_id(&self) -> &str {
pub fn filter_id(&self) -> &FilterId {
&self.filter_id
}
@@ -174,14 +174,14 @@ impl DynFilterEntry {
/// Query-scoped registry that owns all remote dynamic filters for one query.
#[derive(Debug)]
pub struct QueryDynFilterRegistry {
query_id: String,
query_id: QueryId,
inner: RwLock<QueryDynFilterRegistryInner>,
}
impl QueryDynFilterRegistry {
pub fn new(query_id: impl Into<String>) -> Self {
pub fn new(query_id: QueryId) -> Self {
Self {
query_id: query_id.into(),
query_id,
inner: RwLock::new(QueryDynFilterRegistryInner {
state: RegistryState::Active,
entries: HashMap::new(),
@@ -189,8 +189,8 @@ impl QueryDynFilterRegistry {
}
}
pub fn query_id(&self) -> &str {
&self.query_id
pub fn query_id(&self) -> QueryId {
self.query_id
}
pub fn state(&self) -> RegistryState {
@@ -211,13 +211,13 @@ impl QueryDynFilterRegistry {
.collect()
}
pub fn remote_dyn_filter(&self, filter_id: &str) -> Option<Arc<DynFilterEntry>> {
pub fn remote_dyn_filter(&self, filter_id: &FilterId) -> Option<Arc<DynFilterEntry>> {
self.inner.read().unwrap().entries.get(filter_id).cloned()
}
pub fn register_remote_dyn_filter(
&self,
filter_id: impl Into<String>,
filter_id: FilterId,
alive_dyn_filter: Arc<DynamicFilterPhysicalExpr>,
) -> EntryRegistration {
// TODO(remote-dyn-filter): Subtask 05 should call this from the MergeScan bridge after it
@@ -227,7 +227,6 @@ impl QueryDynFilterRegistry {
return EntryRegistration::RejectedByState(inner.state);
}
let filter_id = filter_id.into();
if let Some(existing) = inner.entries.get(&filter_id) {
return EntryRegistration::Existing(existing.clone());
}
@@ -239,7 +238,7 @@ impl QueryDynFilterRegistry {
pub fn register_subscriber(
&self,
filter_id: &str,
filter_id: &FilterId,
subscriber: Subscriber,
) -> SubscriberRegistration {
// TODO(remote-dyn-filter): Later subtasks should route remote subscriber metadata into
@@ -301,27 +300,26 @@ impl QueryDynFilterRegistry {
/// Query-engine manager for query-scoped remote dynamic filter registries.
#[derive(Debug, Default)]
pub struct DynFilterRegistryManager {
registries: RwLock<HashMap<String, Arc<QueryDynFilterRegistry>>>,
registries: RwLock<HashMap<QueryId, Arc<QueryDynFilterRegistry>>>,
}
impl DynFilterRegistryManager {
pub fn get(&self, query_id: &str) -> Option<Arc<QueryDynFilterRegistry>> {
pub fn get(&self, query_id: &QueryId) -> Option<Arc<QueryDynFilterRegistry>> {
self.registries.read().unwrap().get(query_id).cloned()
}
pub fn get_or_init(&self, query_id: impl Into<String>) -> Arc<QueryDynFilterRegistry> {
pub fn get_or_init(&self, query_id: QueryId) -> Arc<QueryDynFilterRegistry> {
// TODO(remote-dyn-filter): Subtask 04 should wire query-engine runtime ownership through
// this entry point so query_id-scoped registries live with distributed query execution.
let query_id = query_id.into();
let mut registries = self.registries.write().unwrap();
registries
.entry(query_id.clone())
.entry(query_id)
.or_insert_with(|| Arc::new(QueryDynFilterRegistry::new(query_id)))
.clone()
}
pub fn begin_closing(&self, query_id: &str) -> Option<Arc<QueryDynFilterRegistry>> {
pub fn begin_closing(&self, query_id: &QueryId) -> Option<Arc<QueryDynFilterRegistry>> {
// TODO(remote-dyn-filter): Query finish/cancel hooks should call this to start the cleanup
// tail, not remove the registry immediately.
let registry = self.get(query_id)?;
@@ -329,7 +327,7 @@ impl DynFilterRegistryManager {
Some(registry)
}
pub fn reap_closed(&self, query_id: &str) -> bool {
pub fn reap_closed(&self, query_id: &QueryId) -> bool {
// TODO(remote-dyn-filter): Cleanup code should call this only after mark_closed(). If a
// later implementation needs a retained closed-tail window, expand here instead of adding
// ad-hoc removal at call sites.
@@ -353,8 +351,18 @@ impl DynFilterRegistryManager {
#[cfg(test)]
mod tests {
use datafusion_physical_expr::expressions::{Column, lit};
use uuid::Uuid;
use super::*;
use crate::dist_plan::FilterFingerprint;
fn test_query_id(value: u128) -> QueryId {
QueryId::from(Uuid::from_u128(value))
}
fn test_filter_id(region_id: RegionId, producer_ordinal: u32) -> FilterId {
FilterId::new(region_id, producer_ordinal, FilterFingerprint::new(0xabc))
}
fn test_dyn_filter(names: &[&str]) -> Arc<DynamicFilterPhysicalExpr> {
let children = names
@@ -369,8 +377,9 @@ mod tests {
#[test]
fn registry_manager_returns_same_registry_for_same_query() {
let manager = DynFilterRegistryManager::default();
let first = manager.get_or_init("query-1");
let second = manager.get_or_init("query-1");
let query_id = test_query_id(1);
let first = manager.get_or_init(query_id);
let second = manager.get_or_init(query_id);
assert!(Arc::ptr_eq(&first, &second));
assert_eq!(manager.registry_count(), 1);
@@ -378,14 +387,15 @@ mod tests {
#[test]
fn registry_stores_filter_and_deduplicates_subscribers() {
let registry = QueryDynFilterRegistry::new("query-1");
let registry = QueryDynFilterRegistry::new(test_query_id(1));
let filter = test_dyn_filter(&["host"]);
let entry = match registry.register_remote_dyn_filter("filter-1", filter.clone()) {
let filter_id = test_filter_id(RegionId::new(1024, 7), 1);
let entry = match registry.register_remote_dyn_filter(filter_id.clone(), filter.clone()) {
EntryRegistration::Inserted(entry) => entry,
other => panic!("unexpected registration result: {other:?}"),
};
assert_eq!(entry.filter_id(), "filter-1");
assert_eq!(entry.filter_id(), &filter_id);
assert_eq!(
entry.last_observed_generation(),
filter.snapshot_generation()
@@ -394,11 +404,11 @@ mod tests {
let subscriber = Subscriber::new(RegionId::new(1024, 1));
assert_eq!(
registry.register_subscriber("filter-1", subscriber.clone()),
registry.register_subscriber(&filter_id, subscriber.clone()),
SubscriberRegistration::Added
);
assert_eq!(
registry.register_subscriber("filter-1", subscriber),
registry.register_subscriber(&filter_id, subscriber),
SubscriberRegistration::Duplicate
);
assert_eq!(entry.subscribers().len(), 1);
@@ -406,15 +416,16 @@ mod tests {
#[test]
fn registry_lifecycle_rejects_new_work_after_closing() {
let registry = QueryDynFilterRegistry::new("query-1");
let registry = QueryDynFilterRegistry::new(test_query_id(1));
assert_eq!(registry.state(), RegistryState::Active);
assert_eq!(registry.begin_closing(), RegistryState::Closing);
assert_eq!(registry.state(), RegistryState::Closing);
let filter = test_dyn_filter(&["host"]);
let filter_id = test_filter_id(RegionId::new(1024, 7), 1);
assert!(matches!(
registry.register_remote_dyn_filter("filter-1", filter),
registry.register_remote_dyn_filter(filter_id, filter),
EntryRegistration::RejectedByState(RegistryState::Closing)
));
@@ -424,7 +435,10 @@ mod tests {
#[test]
fn registered_filter_starts_watcher_once() {
let entry = DynFilterEntry::new("filter-1", test_dyn_filter(&["host"]));
let entry = DynFilterEntry::new(
test_filter_id(RegionId::new(1024, 7), 1),
test_dyn_filter(&["host"]),
);
assert!(entry.start_watcher_if_needed());
assert!(entry.watcher_started());
@@ -437,13 +451,17 @@ mod tests {
#[test]
fn manager_reaps_closed_registry() {
let manager = DynFilterRegistryManager::default();
let registry = manager.get_or_init("query-1");
let _ = registry.register_remote_dyn_filter("filter-1", test_dyn_filter(&["host"]));
let query_id = test_query_id(1);
let registry = manager.get_or_init(query_id);
let _ = registry.register_remote_dyn_filter(
test_filter_id(RegionId::new(1024, 7), 1),
test_dyn_filter(&["host"]),
);
registry.mark_closed();
assert!(manager.reap_closed("query-1"));
assert!(manager.reap_closed(&query_id));
assert_eq!(manager.registry_count(), 0);
assert!(manager.get("query-1").is_none());
assert!(manager.get(&query_id).is_none());
}
}

View File

@@ -50,12 +50,14 @@ use datafusion_optimizer::analyzer::function_rewrite::ApplyFunctionRewrites;
use datafusion_optimizer::optimizer::Optimizer;
use partition::manager::PartitionRuleManagerRef;
use promql::extension_plan::PromExtensionPlanner;
use session::context::QueryContextRef;
use table::TableRef;
use table::table::adapter::DfTableProviderAdapter;
use crate::QueryEngineContext;
use crate::dist_plan::{
DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, MergeSortExtensionPlanner,
DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, DynFilterRegistryManager,
MergeSortExtensionPlanner, QueryDynFilterRegistry,
};
use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES};
use crate::optimizer::ExtensionAnalyzerRule;
@@ -81,6 +83,7 @@ use crate::region_query::RegionQueryHandlerRef;
pub struct QueryEngineState {
df_context: SessionContext,
catalog_manager: CatalogManagerRef,
dyn_filter_registry_manager: Arc<DynFilterRegistryManager>,
function_state: Arc<FunctionState>,
scalar_functions: Arc<RwLock<HashMap<String, ScalarFunctionFactory>>>,
aggr_functions: Arc<RwLock<HashMap<String, AggregateUDF>>>,
@@ -231,6 +234,7 @@ impl QueryEngineState {
Self {
df_context,
catalog_manager: catalog_list,
dyn_filter_registry_manager: Arc::new(DynFilterRegistryManager::default()),
function_state: Arc::new(FunctionState {
table_mutation_handler,
procedure_service_handler,
@@ -387,6 +391,37 @@ impl QueryEngineState {
&self.catalog_manager
}
/// Returns a new shared handle to the engine-wide dynamic filter registry manager.
pub fn dyn_filter_registry_manager(&self) -> Arc<DynFilterRegistryManager> {
    Arc::clone(&self.dyn_filter_registry_manager)
}
/// Returns the registry keyed by the remote query id of `query_ctx`, creating it on first use.
///
/// Yields `None` when `query_ctx.remote_query_id_value()` yields `None`.
pub fn get_or_init_remote_dyn_filter_registry(
    &self,
    query_ctx: &QueryContextRef,
) -> Option<Arc<QueryDynFilterRegistry>> {
    query_ctx
        .remote_query_id_value()
        .map(|query_id| self.dyn_filter_registry_manager.get_or_init(query_id))
}
/// Asks the manager to begin closing the registry keyed by the remote query id of `query_ctx`.
///
/// Yields `None` when the context has no usable remote query id, or when the manager's
/// `begin_closing` returns `None` for that id.
pub fn begin_closing_remote_dyn_filter_registry(
    &self,
    query_ctx: &QueryContextRef,
) -> Option<Arc<QueryDynFilterRegistry>> {
    // TODO(remote-dyn-filter): Wire query finish/cancel hooks through this helper once the
    // distributed query lifecycle exposes a single cleanup callback for the query runtime.
    query_ctx
        .remote_query_id_value()
        .and_then(|query_id| self.dyn_filter_registry_manager.begin_closing(&query_id))
}
/// Forwards to the manager's `reap_closed` for the remote query id of `query_ctx`.
///
/// Returns `false` when the context has no usable remote query id; otherwise returns whatever
/// the manager reports.
pub fn reap_closed_remote_dyn_filter_registry(&self, query_ctx: &QueryContextRef) -> bool {
    // TODO(remote-dyn-filter): Call this after the Closing cleanup tail marks the registry
    // closed. Subtask 04 only exposes the runtime ownership path; later subtasks should decide
    // the exact lifecycle hook that invokes this.
    match query_ctx.remote_query_id_value() {
        Some(query_id) => self.dyn_filter_registry_manager.reap_closed(&query_id),
        None => false,
    }
}
pub fn function_state(&self) -> Arc<FunctionState> {
self.function_state.clone()
}
@@ -568,3 +603,110 @@ impl MemoryPool for MetricsMemoryPool {
self.inner.memory_limit()
}
}
// Tests for the remote dynamic filter registry helpers exposed by `QueryEngineState`.
#[cfg(test)]
mod tests {
use common_base::Plugins;
use session::context::QueryContext;
use super::*;
use crate::options::QueryOptions;
// Builds a `QueryEngineState` on an in-memory catalog manager with default plugins and query
// options; the remaining constructor arguments are passed as `None` / `false`.
fn new_query_engine_state() -> QueryEngineState {
QueryEngineState::new(
catalog::memory::new_memory_catalog_manager().unwrap(),
None,
None,
None,
None,
None,
false,
Plugins::default(),
QueryOptions::default(),
)
}
// Asking twice with the same query context must hand back the very same registry instance.
#[test]
fn query_engine_state_reuses_query_scoped_dyn_filter_registry() {
let state = new_query_engine_state();
let query_ctx = QueryContext::arc();
let first = state
.get_or_init_remote_dyn_filter_registry(&query_ctx)
.unwrap();
let second = state
.get_or_init_remote_dyn_filter_registry(&query_ctx)
.unwrap();
assert!(Arc::ptr_eq(&first, &second));
assert_eq!(state.dyn_filter_registry_manager().registry_count(), 1);
assert_eq!(first.query_id(), query_ctx.remote_query_id_value().unwrap());
}
// Walks the lifecycle through the state helpers: init, begin closing, mark closed, reap.
#[test]
fn query_engine_state_exposes_closing_and_reap_helpers() {
let state = new_query_engine_state();
let query_ctx = QueryContext::arc();
let registry = state
.get_or_init_remote_dyn_filter_registry(&query_ctx)
.unwrap();
let closing = state
.begin_closing_remote_dyn_filter_registry(&query_ctx)
.unwrap();
assert!(Arc::ptr_eq(&registry, &closing));
assert_eq!(closing.state(), crate::dist_plan::RegistryState::Closing);
closing.mark_closed();
assert!(state.reap_closed_remote_dyn_filter_registry(&query_ctx));
// After reaping, the manager must no longer know the query id.
assert!(
state
.dyn_filter_registry_manager()
.get(&query_ctx.remote_query_id_value().unwrap())
.is_none()
);
}
// Pins the expectation that a context from `QueryContext::arc()` already carries a remote
// query id that parses into a `QueryId`, which the registry helpers depend on.
#[test]
fn query_engine_state_relies_on_query_context_remote_query_id_contract() {
let state = new_query_engine_state();
let query_ctx = QueryContext::arc();
assert!(query_ctx.remote_query_id_value().is_some());
let registry = state
.get_or_init_remote_dyn_filter_registry(&query_ctx)
.unwrap();
assert_eq!(
registry.query_id(),
query_ctx.remote_query_id_value().unwrap()
);
assert_eq!(state.dyn_filter_registry_manager().registry_count(), 1);
}
// Two independently created query contexts must end up with two distinct registries.
#[test]
fn query_engine_state_separates_registries_for_different_query_contexts() {
let state = new_query_engine_state();
let first_query_ctx = QueryContext::arc();
let second_query_ctx = QueryContext::arc();
let first = state
.get_or_init_remote_dyn_filter_registry(&first_query_ctx)
.unwrap();
let second = state
.get_or_init_remote_dyn_filter_registry(&second_query_ctx)
.unwrap();
assert!(!Arc::ptr_eq(&first, &second));
assert_eq!(state.dyn_filter_registry_manager().registry_count(), 2);
assert_eq!(
first.query_id(),
first_query_ctx.remote_query_id_value().unwrap()
);
assert_eq!(
second.query_id(),
second_query_ctx.remote_query_id_value().unwrap()
);
}
}

View File

@@ -31,9 +31,9 @@ use common_time::timezone::parse_timezone;
use datafusion_common::config::ConfigOptions;
use derive_builder::Builder;
use sql::dialect::{Dialect, GenericDialect, GreptimeDbDialect, MySqlDialect, PostgreSqlDialect};
use uuid::Uuid;
use crate::protocol_ctx::ProtocolCtx;
use crate::query_id::QueryId;
use crate::session_config::{PGByteaOutputValue, PGDateOrder, PGDateTimeStyle, PGIntervalStyle};
use crate::{MutableInner, ReadPreference};
@@ -44,7 +44,11 @@ const CURSOR_COUNT_WARNING_LIMIT: usize = 10;
pub const REMOTE_QUERY_ID_EXTENSION_KEY: &str = "remote_query_id";
pub fn generate_remote_query_id() -> String {
Uuid::now_v7().to_string()
generate_remote_query_id_value().to_string()
}
pub fn generate_remote_query_id_value() -> QueryId {
QueryId::new()
}
#[derive(Debug, Builder, Clone)]
@@ -354,6 +358,11 @@ impl QueryContext {
self.extension(REMOTE_QUERY_ID_EXTENSION_KEY)
}
/// Returns the remote query id parsed into a typed [`QueryId`].
///
/// Yields `None` when `remote_query_id()` yields `None` or when the stored text does not
/// parse as a `QueryId`; the parse error itself is discarded.
pub fn remote_query_id_value(&self) -> Option<QueryId> {
    let raw = self.remote_query_id()?;
    raw.parse().ok()
}
pub fn extensions(&self) -> HashMap<String, String> {
self.extensions.clone()
}
@@ -798,9 +807,14 @@ mod test {
.build();
assert_eq!(ctx.remote_query_id(), Some(query_id));
assert_eq!(ctx.remote_query_id_value().unwrap().to_string(), query_id);
let proto: api::v1::QueryContext = (&ctx).into();
let restored = QueryContext::from(proto);
assert_eq!(restored.remote_query_id(), Some(query_id));
assert_eq!(
restored.remote_query_id_value().unwrap().to_string(),
query_id
);
}
}

View File

@@ -15,6 +15,7 @@
pub mod context;
pub mod hints;
pub mod protocol_ctx;
pub mod query_id;
pub mod session_config;
pub mod table_name;

View File

@@ -0,0 +1,76 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use uuid::Uuid;
/// Typed query identifier wrapping a [`Uuid`].
///
/// Its string form is the wrapped UUID's `Display` output, and parsing accepts whatever
/// `Uuid::parse_str` accepts.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct QueryId(Uuid);

impl QueryId {
    /// Generates a new identifier from `Uuid::now_v7()`.
    pub fn new() -> Self {
        QueryId(Uuid::now_v7())
    }

    /// Borrows the wrapped [`Uuid`].
    pub fn as_uuid(&self) -> &Uuid {
        let QueryId(inner) = self;
        inner
    }
}

impl Default for QueryId {
    /// Equivalent to [`QueryId::new`]: each call generates a new identifier.
    fn default() -> Self {
        QueryId::new()
    }
}

impl Display for QueryId {
    /// Delegates to the wrapped UUID's `Display` implementation.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        Display::fmt(self.as_uuid(), f)
    }
}

impl FromStr for QueryId {
    type Err = uuid::Error;

    /// Parses the text with `Uuid::parse_str` and wraps the result.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Uuid::parse_str(s).map(QueryId)
    }
}

impl From<Uuid> for QueryId {
    /// Wraps an existing UUID without altering it.
    fn from(value: Uuid) -> Self {
        QueryId(value)
    }
}

impl From<QueryId> for Uuid {
    /// Unwraps the identifier back into its UUID.
    fn from(value: QueryId) -> Self {
        let QueryId(inner) = value;
        inner
    }
}
// Unit tests for `QueryId`.
#[cfg(test)]
mod tests {
use super::*;
// `Display` followed by `FromStr` must give back an equal `QueryId`.
#[test]
fn query_id_round_trips_through_string() {
let query_id = QueryId::new();
let encoded = query_id.to_string();
assert_eq!(encoded.parse::<QueryId>().unwrap(), query_id);
}
}