diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 3c569cc058..eb7d2dfa6c 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -13,7 +13,6 @@ // limitations under the License. mod catalog; -mod registrations; use std::collections::HashMap; use std::fmt::Debug; @@ -99,10 +98,6 @@ use crate::error::{ }; use crate::event_listener::RegionServerEventListenerRef; use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder}; -use crate::region_server::registrations::{ - RegisteredDynFilter, initial_dyn_filter_regs_from_query_ctx, register_initial_dyn_filter_regs, - remove_initial_dyn_filter_regs_for_region, -}; #[derive(Clone)] pub struct RegionServer { @@ -280,18 +275,6 @@ impl RegionServer { common_telemetry::info!("Handle remote read for region: {}", region_id); } - let initial_dyn_filter_regs = initial_dyn_filter_regs_from_query_ctx(&query_ctx); - if query_ctx.explain_verbose() { - common_telemetry::info!( - "Initial remote dyn filter registrations for region {}: {}", - region_id, - initial_dyn_filter_regs - .as_ref() - .map(|regs| regs.regs.len()) - .unwrap_or(0) - ); - } - let decoder = self .inner .query_engine @@ -304,19 +287,7 @@ impl RegionServer { .await .context(DecodeLogicalPlanSnafu)?; - let query_id = query_ctx.remote_query_id().map(ToOwned::to_owned); - if let (Some(query_id), Some(regs)) = - (query_id.as_deref(), initial_dyn_filter_regs.as_ref()) - { - register_initial_dyn_filter_regs( - &self.inner.initial_remote_dyn_filter_registrations, - query_id, - region_id, - regs, - ); - } - - let result = self + let stream = self .inner .handle_read( QueryRequest { @@ -326,19 +297,7 @@ impl RegionServer { }, query_ctx.clone(), ) - .await; - - if result.is_err() - && let Some(query_id) = query_id.as_deref() - { - remove_initial_dyn_filter_regs_for_region( - &self.inner.initial_remote_dyn_filter_registrations, - query_id, - region_id, - ); - } - - let stream = result?; + .await?; let stream = wrap_flow_region_watermark_stream(stream, region_id, &query_ctx); Ok(maybe_guard_stream(stream, permit)) } @@ -1094,9 +1053,6 @@ struct RegionServerInner { /// server with a concrete engine; acceptable for now to fetch Mito-specific /// info (e.g., list SSTs). Consider a diagnostics trait later. mito_engine: RwLock>, - /// TODO(remote-dyn-filter): Reap this query-scoped placeholder registry on query finish/cancel - /// and later fold it into the real remote dyn filter runtime state lifecycle. - initial_remote_dyn_filter_registrations: DashMap>, } struct RegionServerParallelism { @@ -1213,7 +1169,6 @@ impl RegionServerInner { parallelism, topic_stats_reporter: RwLock::new(None), mito_engine: RwLock::new(None), - initial_remote_dyn_filter_registrations: DashMap::new(), } } @@ -1949,10 +1904,6 @@ mod tests { remote_dyn_filter_request, }; use common_error::ext::ErrorExt; - use common_query::request::{ - INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, InitialDynFilterReg, - InitialDynFilterRegs, - }; use common_recordbatch::RecordBatches; use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry}; use datatypes::prelude::{ConcreteDataType, VectorRef}; @@ -2110,176 +2061,6 @@ mod tests { assert!(pinned.as_ref().get_ref().metrics().is_none()); } - #[test] - fn initial_dyn_filter_regs_can_be_read_from_query_context() { - let mut query_ctx = QueryContext::with("greptime", "public"); - query_ctx.set_extension( - INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, - InitialDynFilterRegs::new(vec![InitialDynFilterReg::new( - "filter-1", - vec![vec![1, 2, 3]], - )]) - .to_extension_value() - .unwrap(), - ); - - let regs = initial_dyn_filter_regs_from_query_ctx(&Arc::new(query_ctx)).unwrap(); - - assert_eq!(regs.regs.len(), 1); - assert_eq!(regs.regs[0].filter_id, "filter-1"); - } - - #[test] - fn initial_dyn_filter_regs_from_query_context_rejects_duplicate_filter_ids() { - let mut query_ctx = QueryContext::with("greptime", "public"); - query_ctx.set_extension( - INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, - InitialDynFilterRegs::new(vec![ - InitialDynFilterReg::new("filter-1", vec![vec![1, 2, 3]]), - InitialDynFilterReg::new("filter-1", vec![vec![4, 5, 6]]), - ]) - .to_extension_value() - .unwrap(), - ); - - let regs = initial_dyn_filter_regs_from_query_ctx(&Arc::new(query_ctx)); - - assert!(regs.is_none()); - } - - #[test] - fn register_initial_dyn_filter_regs_creates_query_scoped_entries() { - let regs_by_query = DashMap::>::new(); - let regs = InitialDynFilterRegs::new(vec![ - InitialDynFilterReg::new("filter-1", vec![vec![1, 2, 3]]), - InitialDynFilterReg::new("filter-2", vec![vec![4, 5, 6]]), - ]); - let query_id = "query-1"; - let region_id = RegionId::new(1024, 7); - - register_initial_dyn_filter_regs(®s_by_query, query_id, region_id, ®s); - - let query_regs = regs_by_query.get(query_id).unwrap(); - assert_eq!(query_regs.len(), 2); - let registered = query_regs.get("filter-1").unwrap(); - assert_eq!(registered.filter_id, "filter-1"); - assert_eq!(registered.child_exprs_datafusion_proto, vec![vec![1, 2, 3]]); - assert_eq!(registered.subscriber_regions, vec![region_id]); - } - - #[test] - fn register_initial_dyn_filter_regs_ignores_duplicate_region_entry() { - let regs_by_query = DashMap::>::new(); - let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new( - "filter-1", - vec![vec![1, 2, 3]], - )]); - let query_id = "query-1"; - let region_id = RegionId::new(1024, 7); - - register_initial_dyn_filter_regs(®s_by_query, query_id, region_id, ®s); - register_initial_dyn_filter_regs(®s_by_query, query_id, region_id, ®s); - - let query_regs = regs_by_query.get(query_id).unwrap(); - assert_eq!(query_regs.len(), 1); - let registered = query_regs.get("filter-1").unwrap(); - assert_eq!(registered.subscriber_regions, vec![region_id]); - } - - #[test] - fn register_initial_dyn_filter_regs_ignores_invalid_duplicate_payload_set() { - let regs_by_query = DashMap::>::new(); - let regs = InitialDynFilterRegs::new(vec![ - InitialDynFilterReg::new("filter-1", vec![vec![1, 2, 3]]), - InitialDynFilterReg::new("filter-1", vec![vec![4, 5, 6]]), - ]); - let query_id = "query-1"; - let region_id = RegionId::new(1024, 7); - - register_initial_dyn_filter_regs(®s_by_query, query_id, region_id, ®s); - - assert!(regs_by_query.get(query_id).is_none()); - } - - #[test] - fn register_initial_dyn_filter_regs_merges_regions_for_same_filter() { - let regs_by_query = DashMap::>::new(); - let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new( - "filter-1", - vec![vec![1, 2, 3]], - )]); - let query_id = "query-1"; - let first_region_id = RegionId::new(1024, 7); - let second_region_id = RegionId::new(1024, 8); - - register_initial_dyn_filter_regs(®s_by_query, query_id, first_region_id, ®s); - register_initial_dyn_filter_regs(®s_by_query, query_id, second_region_id, ®s); - - let query_regs = regs_by_query.get(query_id).unwrap(); - assert_eq!(query_regs.len(), 1); - let registered = query_regs.get("filter-1").unwrap(); - assert_eq!( - registered.subscriber_regions, - vec![first_region_id, second_region_id] - ); - } - - #[test] - fn remove_initial_dyn_filter_regs_for_region_removes_region_entries() { - let regs_by_query = DashMap::>::new(); - let query_id = "query-1"; - let region_id = RegionId::new(1024, 7); - let other_query_id = "query-2"; - let other_region_id = RegionId::new(1024, 8); - - register_initial_dyn_filter_regs( - ®s_by_query, - query_id, - region_id, - &InitialDynFilterRegs::new(vec![InitialDynFilterReg::new( - "filter-1", - vec![vec![1, 2, 3]], - )]), - ); - register_initial_dyn_filter_regs( - ®s_by_query, - other_query_id, - other_region_id, - &InitialDynFilterRegs::new(vec![InitialDynFilterReg::new( - "filter-2", - vec![vec![4, 5, 6]], - )]), - ); - - remove_initial_dyn_filter_regs_for_region(®s_by_query, query_id, region_id); - - assert!(regs_by_query.get(query_id).is_none()); - let other_query_regs = regs_by_query.get(other_query_id).unwrap(); - assert_eq!(other_query_regs.len(), 1); - } - - #[test] - fn remove_initial_dyn_filter_regs_for_region_keeps_other_subscribers() { - let regs_by_query = DashMap::>::new(); - let query_id = "query-1"; - let first_region_id = RegionId::new(1024, 7); - let second_region_id = RegionId::new(1024, 8); - let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new( - "filter-1", - vec![vec![1, 2, 3]], - )]); - - register_initial_dyn_filter_regs(®s_by_query, query_id, first_region_id, ®s); - register_initial_dyn_filter_regs(®s_by_query, query_id, second_region_id, ®s); - - remove_initial_dyn_filter_regs_for_region(®s_by_query, query_id, first_region_id); - - let query_regs = regs_by_query.get(query_id).unwrap(); - assert_eq!(query_regs.len(), 1); - let registered = query_regs.get("filter-1").unwrap(); - assert_eq!(registered.subscriber_regions, vec![second_region_id]); - } - #[tokio::test] async fn test_region_registering() { common_telemetry::init_default_ut_logging(); diff --git a/src/datanode/src/region_server/registrations.rs b/src/datanode/src/region_server/registrations.rs deleted file mode 100644 index 7b9523616a..0000000000 --- a/src/datanode/src/region_server/registrations.rs +++ /dev/null @@ -1,160 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_query::request::{ - INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, InitialDynFilterRegs, -}; -use common_telemetry::warn; -use dashmap::DashMap; -use session::context::QueryContextRef; -use store_api::storage::RegionId; - -#[derive(Debug, Clone, PartialEq, Eq)] -pub(super) struct RegisteredDynFilter { - pub(super) filter_id: String, - pub(super) child_exprs_datafusion_proto: Vec>, - pub(super) subscriber_regions: Vec, -} - -impl RegisteredDynFilter { - fn new( - filter_id: String, - child_exprs_datafusion_proto: Vec>, - region_id: RegionId, - ) -> Self { - Self { - filter_id, - child_exprs_datafusion_proto, - subscriber_regions: vec![region_id], - } - } - - fn register_region(&mut self, region_id: RegionId) -> bool { - if self.subscriber_regions.contains(®ion_id) { - return false; - } - - self.subscriber_regions.push(region_id); - true - } - - fn remove_region(&mut self, region_id: RegionId) -> bool { - let original_len = self.subscriber_regions.len(); - self.subscriber_regions - .retain(|region| *region != region_id); - original_len != self.subscriber_regions.len() - } - - fn has_subscribers(&self) -> bool { - !self.subscriber_regions.is_empty() - } - - fn should_drop_after_remove(&mut self, region_id: RegionId) -> bool { - self.remove_region(region_id) && !self.has_subscribers() - } -} - -pub(super) fn initial_dyn_filter_regs_from_query_ctx( - query_ctx: &QueryContextRef, -) -> Option { - let registrations = - query_ctx.extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY)?; - match InitialDynFilterRegs::from_extension_value(registrations) { - Ok(registrations) => match registrations.validate_default_bounds() { - Ok(()) => Some(registrations), - Err(error) => { - warn!(error; "Initial remote dyn filter registrations exceeded Task 03 bounds"); - None - } - }, - Err(error) => { - warn!(error; "Failed to decode initial remote dyn filter registrations from query context"); - None - } - } -} - -pub(super) fn register_initial_dyn_filter_regs( - regs_by_query: &DashMap>, - query_id: &str, - region_id: RegionId, - regs: &InitialDynFilterRegs, -) { - if regs.is_empty() { - return; - } - - if let Err(error) = regs.validate_default_bounds() { - warn!(error; "Ignored invalid initial dyn filter registrations for query_id {} region_id {}", query_id, region_id); - return; - } - - let query_regs = regs_by_query.entry(query_id.to_string()).or_default(); - - for reg in ®s.regs { - if let Some(mut registered) = query_regs.get_mut(®.filter_id) { - if registered.register_region(region_id) { - continue; - } - - warn!( - query_id, - filter_id = reg.filter_id, - region_id = %region_id, - "Duplicate initial dyn filter reg ignored" - ); - continue; - } - - query_regs.insert( - reg.filter_id.clone(), - RegisteredDynFilter::new( - reg.filter_id.clone(), - reg.child_exprs_datafusion_proto.clone(), - region_id, - ), - ); - } -} - -pub(super) fn remove_initial_dyn_filter_regs_for_region( - regs_by_query: &DashMap>, - query_id: &str, - region_id: RegionId, -) { - let should_remove_query = { - let Some(query_regs) = regs_by_query.get(query_id) else { - return; - }; - - let filter_ids_to_remove = query_regs - .iter_mut() - .filter_map(|mut registered| { - registered - .should_drop_after_remove(region_id) - .then(|| registered.filter_id.clone()) - }) - .collect::>(); - - for filter_id in filter_ids_to_remove { - query_regs.remove(&filter_id); - } - - query_regs.is_empty() - }; - - if should_remove_query { - regs_by_query.remove(query_id); - } -}