wip: remote dyn filter task 03

This commit is contained in:
discord9
2026-05-18 16:30:02 +08:00
parent 3a0f37c06b
commit da8854c809
33 changed files with 3161 additions and 416 deletions

View File

@@ -293,11 +293,8 @@ impl RegionRequester {
query_id: impl Into<String>,
update: RemoteDynFilterUpdate,
) -> Result<RegionResponse> {
self.handle_inner(build_remote_dyn_filter_request(
query_id.into(),
remote_dyn_filter_request::Action::Update(update),
))
.await
self.handle_inner(build_remote_dyn_filter_update_request(query_id, update))
.await
}
pub async fn handle_remote_dyn_filter_unregister(
@@ -305,14 +302,33 @@ impl RegionRequester {
query_id: impl Into<String>,
unregister: RemoteDynFilterUnregister,
) -> Result<RegionResponse> {
self.handle_inner(build_remote_dyn_filter_request(
query_id.into(),
remote_dyn_filter_request::Action::Unregister(unregister),
self.handle_inner(build_remote_dyn_filter_unregister_request(
query_id, unregister,
))
.await
}
}
/// Builds the [`RegionRequest`] that delivers a remote dynamic filter update for `query_id`.
///
/// Wraps `update` in the `Update` action and delegates to
/// `build_remote_dyn_filter_request`.
pub fn build_remote_dyn_filter_update_request(
    query_id: impl Into<String>,
    update: RemoteDynFilterUpdate,
) -> RegionRequest {
    let query_id: String = query_id.into();
    let action = remote_dyn_filter_request::Action::Update(update);
    build_remote_dyn_filter_request(query_id, action)
}
/// Builds the [`RegionRequest`] that unregisters a remote dynamic filter for `query_id`.
///
/// Wraps `unregister` in the `Unregister` action and delegates to
/// `build_remote_dyn_filter_request`.
pub fn build_remote_dyn_filter_unregister_request(
    query_id: impl Into<String>,
    unregister: RemoteDynFilterUnregister,
) -> RegionRequest {
    let query_id: String = query_id.into();
    let action = remote_dyn_filter_request::Action::Unregister(unregister);
    build_remote_dyn_filter_request(query_id, action)
}
fn build_remote_dyn_filter_request(
query_id: String,
action: remote_dyn_filter_request::Action,
@@ -357,7 +373,9 @@ pub fn check_response_header(header: &Option<ResponseHeader>) -> Result<()> {
#[cfg(test)]
mod test {
use api::v1::Status as PbStatus;
use api::v1::region::{RemoteDynFilterUpdate, region_request, remote_dyn_filter_request};
use api::v1::region::{
RemoteDynFilterUnregister, RemoteDynFilterUpdate, region_request, remote_dyn_filter_request,
};
use super::*;
use crate::Error::{IllegalDatabaseResponse, Server};
@@ -410,14 +428,14 @@ mod test {
#[test]
fn test_build_remote_dyn_filter_request_sets_header_and_body() {
let request = build_remote_dyn_filter_request(
"query-1".to_string(),
remote_dyn_filter_request::Action::Update(RemoteDynFilterUpdate {
let request = build_remote_dyn_filter_update_request(
"query-1",
RemoteDynFilterUpdate {
filter_id: "filter-1".to_string(),
payload: vec![1, 2, 3],
generation: 7,
is_complete: false,
}),
},
);
request.header.expect("remote dyn filter header must exist");
@@ -433,4 +451,27 @@ mod test {
Some(remote_dyn_filter_request::Action::Update(_))
));
}
/// An unregister request must carry a header, the query id, and an `Unregister`
/// action holding the filter id that was passed in.
#[test]
fn test_build_remote_dyn_filter_unregister_request_sets_header_and_body() {
    let request = build_remote_dyn_filter_unregister_request(
        "query-1",
        RemoteDynFilterUnregister {
            filter_id: "filter-9".to_string(),
        },
    );

    request.header.expect("remote dyn filter header must exist");
    let body = request.body.expect("remote dyn filter body must exist");
    let region_request::Body::RemoteDynFilter(remote_request) = body else {
        panic!("expected remote dyn filter request body");
    };
    assert_eq!(remote_request.query_id, "query-1");

    // Inspect the payload as well as the variant, so a builder that drops or
    // swaps the filter id cannot slip through.
    let Some(remote_dyn_filter_request::Action::Unregister(unregister)) = remote_request.action
    else {
        panic!("expected remote dyn filter unregister action");
    };
    assert_eq!(unregister.filter_id, "filter-9");
}
}

View File

@@ -19,14 +19,14 @@ use std::sync::Arc;
use api::v1::region::RegionRequestHeader;
use datafusion::execution::TaskContext;
use datafusion::physical_expr::expressions::Column;
use datafusion::physical_plan::joins::HashTableLookupExpr;
use datafusion::physical_plan::PhysicalExpr;
use datafusion::physical_plan::joins::HashTableLookupExpr;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{DataFusionError, Result as DataFusionResult};
use datafusion_expr::LogicalPlan;
use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
use datafusion_proto::physical_plan::from_proto::parse_physical_expr;
use datafusion_proto::physical_plan::to_proto::serialize_physical_expr;
use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
use datafusion_proto::protobuf::PhysicalExprNode;
use prost::Message;
use serde::{Deserialize, Serialize};
@@ -34,8 +34,8 @@ use store_api::storage::RegionId;
/// Current wire-format version for remote dynamic filter payload updates.
pub use self::initial_remote_dyn_filter_reg::{
InitialDynFilterReg, InitialDynFilterRegs,
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, InitialDynFilterReg,
InitialDynFilterRegs,
};
pub const DYN_FILTER_PROTOCOL_VERSION: u32 = 1;

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::sync::Arc;
use datafusion::arrow::datatypes::Schema;
@@ -24,6 +25,8 @@ use crate::request::{decode_physical_expr_from_bytes, encode_physical_expr_to_by
pub const INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY: &str =
"initial_remote_dyn_filter_registrations";
pub const INITIAL_REMOTE_DYN_FILTER_REGS_MAX_COUNT: usize = 64;
pub const INITIAL_REMOTE_DYN_FILTER_REGS_MAX_TOTAL_PROTO_BYTES: usize = 64 * 1024;
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct InitialDynFilterRegs {
@@ -40,6 +43,54 @@ impl InitialDynFilterRegs {
self.regs.is_empty()
}
pub fn total_encoded_child_expr_bytes(&self) -> usize {
self.regs
.iter()
.map(InitialDynFilterReg::encoded_child_expr_bytes)
.sum()
}
/// Validates this set against the default registration-count and total-byte limits.
///
/// See `validate_bounds` for the checks performed and the error format.
pub fn validate_default_bounds(&self) -> Result<(), String> {
    let max_count = INITIAL_REMOTE_DYN_FILTER_REGS_MAX_COUNT;
    let max_total_proto_bytes = INITIAL_REMOTE_DYN_FILTER_REGS_MAX_TOTAL_PROTO_BYTES;
    self.validate_bounds(max_count, max_total_proto_bytes)
}
/// Validates this registration set against the given limits.
///
/// Checks run in this order, and the first failure is returned as a
/// human-readable message:
/// 1. the number of registrations must not exceed `max_count`;
/// 2. the total encoded child expression bytes must not exceed `max_total_proto_bytes`;
/// 3. every `filter_id` must be unique within the set.
pub fn validate_bounds(
    &self,
    max_count: usize,
    max_total_proto_bytes: usize,
) -> Result<(), String> {
    let reg_count = self.regs.len();
    if reg_count > max_count {
        return Err(format!(
            "InitialDynFilterRegs contains {} registrations, which exceeds the configured limit of {}",
            reg_count, max_count
        ));
    }

    let total_proto_bytes = self.total_encoded_child_expr_bytes();
    if total_proto_bytes > max_total_proto_bytes {
        return Err(format!(
            "InitialDynFilterRegs contains {} total child expr proto bytes, which exceeds the configured limit of {}",
            total_proto_bytes, max_total_proto_bytes
        ));
    }

    // `insert` returns false for an id that was already seen.
    let mut seen_filter_ids = HashSet::with_capacity(reg_count);
    for reg in self.regs.iter() {
        let first_occurrence = seen_filter_ids.insert(reg.filter_id.as_str());
        if first_occurrence {
            continue;
        }
        return Err(format!(
            "InitialDynFilterRegs contains duplicate filter_id '{}'",
            reg.filter_id
        ));
    }

    Ok(())
}
pub fn to_extension_value(&self) -> serde_json::Result<String> {
serde_json::to_string(self)
}
@@ -75,6 +126,10 @@ impl InitialDynFilterReg {
Ok(Self::new(filter_id, child_exprs_datafusion_proto))
}
/// Returns the summed byte length of this registration's encoded child expressions.
pub fn encoded_child_expr_bytes(&self) -> usize {
    self.child_exprs_datafusion_proto
        .iter()
        .fold(0, |total, encoded| total + encoded.len())
}
pub fn decode_children(
&self,
task_ctx: &TaskContext,
@@ -119,6 +174,42 @@ mod tests {
assert_eq!(decoded, regs);
}
/// Two registrations sharing a `filter_id` are rejected even when both limits are generous.
#[test]
fn initial_dyn_filter_regs_validate_bounds_rejects_duplicate_filter_ids() {
    let first = InitialDynFilterReg::new("filter-a", vec![vec![1]]);
    let second = InitialDynFilterReg::new("filter-a", vec![vec![2]]);
    let regs = InitialDynFilterRegs::new(vec![first, second]);

    let message = regs.validate_bounds(8, 1024).unwrap_err();

    assert!(message.contains("duplicate filter_id 'filter-a'"));
}
/// Two registrations against a count limit of one must be rejected.
#[test]
fn initial_dyn_filter_regs_validate_bounds_rejects_too_many_regs() {
    let first = InitialDynFilterReg::new("filter-a", vec![vec![1]]);
    let second = InitialDynFilterReg::new("filter-b", vec![vec![2]]);
    let regs = InitialDynFilterRegs::new(vec![first, second]);

    let message = regs.validate_bounds(1, 1024).unwrap_err();

    assert!(message.contains("exceeds the configured limit of 1"));
}
/// Six payload bytes in total against a five-byte limit must be rejected.
#[test]
fn initial_dyn_filter_regs_validate_bounds_rejects_total_proto_bytes_over_limit() {
    let first = InitialDynFilterReg::new("filter-a", vec![vec![1, 2, 3]]);
    let second = InitialDynFilterReg::new("filter-b", vec![vec![4, 5, 6]]);
    let regs = InitialDynFilterRegs::new(vec![first, second]);

    let message = regs.validate_bounds(8, 5).unwrap_err();

    assert!(message.contains("6 total child expr proto bytes"));
}
#[test]
fn initial_dyn_filter_reg_round_trips_child_exprs() {
let schema = Schema::new(vec![Field::new("host", DataType::Utf8, false)]);

View File

@@ -99,8 +99,8 @@ use crate::error::{
use crate::event_listener::RegionServerEventListenerRef;
use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
use crate::region_server::registrations::{
RegisteredDynFilter, initial_dyn_filter_regs_from_query_ctx,
register_initial_dyn_filter_regs, remove_initial_dyn_filter_regs_for_region,
RegisteredDynFilter, initial_dyn_filter_regs_from_query_ctx, register_initial_dyn_filter_regs,
remove_initial_dyn_filter_regs_for_region,
};
#[derive(Clone)]
@@ -304,10 +304,9 @@ impl RegionServer {
.context(DecodeLogicalPlanSnafu)?;
let query_id = query_ctx.remote_query_id().map(ToOwned::to_owned);
if let (Some(query_id), Some(regs)) = (
query_id.as_deref(),
initial_dyn_filter_regs.as_ref(),
) {
if let (Some(query_id), Some(regs)) =
(query_id.as_deref(), initial_dyn_filter_regs.as_ref())
{
register_initial_dyn_filter_regs(
&self.inner.initial_remote_dyn_filter_registrations,
query_id,
@@ -328,7 +327,9 @@ impl RegionServer {
)
.await;
if result.is_err() && let Some(query_id) = query_id.as_deref() {
if result.is_err()
&& let Some(query_id) = query_id.as_deref()
{
remove_initial_dyn_filter_regs_for_region(
&self.inner.initial_remote_dyn_filter_registrations,
query_id,
@@ -1097,8 +1098,7 @@ struct RegionServerInner {
mito_engine: RwLock<Option<MitoEngine>>,
/// TODO(remote-dyn-filter): Reap this query-scoped placeholder registry on query finish/cancel
/// and later fold it into the real remote dyn filter runtime state lifecycle.
initial_remote_dyn_filter_registrations:
DashMap<String, DashMap<String, RegisteredDynFilter>>,
initial_remote_dyn_filter_registrations: DashMap<String, DashMap<String, RegisteredDynFilter>>,
}
struct RegionServerParallelism {
@@ -1901,11 +1901,11 @@ mod tests {
RemoteDynFilterRequest, RemoteDynFilterUnregister, RemoteDynFilterUpdate,
remote_dyn_filter_request,
};
use common_error::ext::ErrorExt;
use common_query::request::{
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, InitialDynFilterReg,
InitialDynFilterRegs,
};
use common_error::ext::ErrorExt;
use common_recordbatch::RecordBatches;
use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
use datatypes::prelude::{ConcreteDataType, VectorRef};
@@ -2082,6 +2082,24 @@ mod tests {
assert_eq!(regs.regs[0].filter_id, "filter-1");
}
/// A query context whose extension holds duplicate filter ids yields no registrations.
#[test]
fn initial_dyn_filter_regs_from_query_context_rejects_duplicate_filter_ids() {
    let duplicated = InitialDynFilterRegs::new(vec![
        InitialDynFilterReg::new("filter-1", vec![vec![1, 2, 3]]),
        InitialDynFilterReg::new("filter-1", vec![vec![4, 5, 6]]),
    ]);
    let extension_value = duplicated.to_extension_value().unwrap();

    let mut query_ctx = QueryContext::with("greptime", "public");
    query_ctx.set_extension(
        INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
        extension_value,
    );
    let query_ctx = Arc::new(query_ctx);

    assert!(initial_dyn_filter_regs_from_query_ctx(&query_ctx).is_none());
}
#[test]
fn register_initial_dyn_filter_regs_creates_query_scoped_entries() {
let regs_by_query = DashMap::<String, DashMap<String, RegisteredDynFilter>>::new();
@@ -2092,12 +2110,7 @@ mod tests {
let query_id = "query-1";
let region_id = RegionId::new(1024, 7);
register_initial_dyn_filter_regs(
&regs_by_query,
query_id,
region_id,
&regs,
);
register_initial_dyn_filter_regs(&regs_by_query, query_id, region_id, &regs);
let query_regs = regs_by_query.get(query_id).unwrap();
assert_eq!(query_regs.len(), 2);
@@ -2108,26 +2121,17 @@ mod tests {
}
#[test]
fn register_initial_dyn_filter_regs_ignores_duplicate_filter_entry() {
fn register_initial_dyn_filter_regs_ignores_duplicate_region_entry() {
let regs_by_query = DashMap::<String, DashMap<String, RegisteredDynFilter>>::new();
let regs = InitialDynFilterRegs::new(vec![
InitialDynFilterReg::new("filter-1", vec![vec![1, 2, 3]]),
]);
let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
"filter-1",
vec![vec![1, 2, 3]],
)]);
let query_id = "query-1";
let region_id = RegionId::new(1024, 7);
register_initial_dyn_filter_regs(
&regs_by_query,
query_id,
region_id,
&regs,
);
register_initial_dyn_filter_regs(
&regs_by_query,
query_id,
region_id,
&regs,
);
register_initial_dyn_filter_regs(&regs_by_query, query_id, region_id, &regs);
register_initial_dyn_filter_regs(&regs_by_query, query_id, region_id, &regs);
let query_regs = regs_by_query.get(query_id).unwrap();
assert_eq!(query_regs.len(), 1);
@@ -2135,6 +2139,44 @@ mod tests {
assert_eq!(registered.subscriber_regions, vec![region_id]);
}
/// A registration set with duplicate filter ids must leave the registry untouched.
#[test]
fn register_initial_dyn_filter_regs_ignores_invalid_duplicate_payload_set() {
    let query_id = "query-1";
    let region_id = RegionId::new(1024, 7);
    let invalid_regs = InitialDynFilterRegs::new(vec![
        InitialDynFilterReg::new("filter-1", vec![vec![1, 2, 3]]),
        InitialDynFilterReg::new("filter-1", vec![vec![4, 5, 6]]),
    ]);
    let regs_by_query = DashMap::<String, DashMap<String, RegisteredDynFilter>>::new();

    register_initial_dyn_filter_regs(&regs_by_query, query_id, region_id, &invalid_regs);

    assert!(!regs_by_query.contains_key(query_id));
}
/// Registering one filter from two regions keeps a single entry that lists both
/// regions in registration order.
#[test]
fn register_initial_dyn_filter_regs_merges_regions_for_same_filter() {
    let query_id = "query-1";
    let first_region_id = RegionId::new(1024, 7);
    let second_region_id = RegionId::new(1024, 8);
    let single_filter = InitialDynFilterReg::new("filter-1", vec![vec![1, 2, 3]]);
    let regs = InitialDynFilterRegs::new(vec![single_filter]);
    let regs_by_query = DashMap::<String, DashMap<String, RegisteredDynFilter>>::new();

    for region_id in [first_region_id, second_region_id] {
        register_initial_dyn_filter_regs(&regs_by_query, query_id, region_id, &regs);
    }

    let query_regs = regs_by_query.get(query_id).unwrap();
    assert_eq!(query_regs.len(), 1);
    let registered = query_regs.get("filter-1").unwrap();
    let expected_subscribers = vec![first_region_id, second_region_id];
    assert_eq!(registered.subscriber_regions, expected_subscribers);
}
#[test]
fn remove_initial_dyn_filter_regs_for_region_removes_region_entries() {
let regs_by_query = DashMap::<String, DashMap<String, RegisteredDynFilter>>::new();
@@ -2147,30 +2189,50 @@ mod tests {
&regs_by_query,
query_id,
region_id,
&InitialDynFilterRegs::new(vec![
InitialDynFilterReg::new("filter-1", vec![vec![1, 2, 3]]),
]),
&InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
"filter-1",
vec![vec![1, 2, 3]],
)]),
);
register_initial_dyn_filter_regs(
&regs_by_query,
other_query_id,
other_region_id,
&InitialDynFilterRegs::new(vec![
InitialDynFilterReg::new("filter-2", vec![vec![4, 5, 6]]),
]),
&InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
"filter-2",
vec![vec![4, 5, 6]],
)]),
);
remove_initial_dyn_filter_regs_for_region(
&regs_by_query,
query_id,
region_id,
);
remove_initial_dyn_filter_regs_for_region(&regs_by_query, query_id, region_id);
assert!(regs_by_query.get(query_id).is_none());
let other_query_regs = regs_by_query.get(other_query_id).unwrap();
assert_eq!(other_query_regs.len(), 1);
}
/// Removing one subscriber region keeps the filter entry while another region
/// is still subscribed.
#[test]
fn remove_initial_dyn_filter_regs_for_region_keeps_other_subscribers() {
    let query_id = "query-1";
    let first_region_id = RegionId::new(1024, 7);
    let second_region_id = RegionId::new(1024, 8);
    let single_filter = InitialDynFilterReg::new("filter-1", vec![vec![1, 2, 3]]);
    let regs = InitialDynFilterRegs::new(vec![single_filter]);
    let regs_by_query = DashMap::<String, DashMap<String, RegisteredDynFilter>>::new();

    for region_id in [first_region_id, second_region_id] {
        register_initial_dyn_filter_regs(&regs_by_query, query_id, region_id, &regs);
    }
    remove_initial_dyn_filter_regs_for_region(&regs_by_query, query_id, first_region_id);

    let query_regs = regs_by_query.get(query_id).unwrap();
    assert_eq!(query_regs.len(), 1);
    let registered = query_regs.get("filter-1").unwrap();
    let remaining_subscribers = vec![second_region_id];
    assert_eq!(registered.subscriber_regions, remaining_subscribers);
}
#[tokio::test]
async fn test_region_registering() {
common_telemetry::init_default_ut_logging();

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use common_query::request::{
InitialDynFilterRegs, INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, InitialDynFilterRegs,
};
use common_telemetry::warn;
use dashmap::DashMap;
@@ -39,6 +39,30 @@ impl RegisteredDynFilter {
subscriber_regions: vec![region_id],
}
}
/// Subscribes `region_id` to this filter unless it is already subscribed.
///
/// Returns `true` when the region was newly added, `false` when it was already present.
fn register_region(&mut self, region_id: RegionId) -> bool {
    let already_subscribed = self
        .subscriber_regions
        .iter()
        .any(|subscriber| *subscriber == region_id);
    if !already_subscribed {
        self.subscriber_regions.push(region_id);
    }
    !already_subscribed
}
/// Drops every occurrence of `region_id` from the subscriber list.
///
/// Returns `true` when at least one entry was removed.
fn remove_region(&mut self, region_id: RegionId) -> bool {
    let count_before = self.subscriber_regions.len();
    self.subscriber_regions
        .retain(|subscriber| subscriber != &region_id);
    self.subscriber_regions.len() != count_before
}
/// Returns `true` while at least one region is still subscribed to this filter.
fn has_subscribers(&self) -> bool {
!self.subscriber_regions.is_empty()
}
/// Unsubscribes `region_id` and reports whether the filter entry can now be dropped.
///
/// Returns `true` only when `region_id` was actually subscribed and its removal
/// left the filter without any subscriber.
fn should_drop_after_remove(&mut self, region_id: RegionId) -> bool {
    let removed = self.remove_region(region_id);
    removed && self.subscriber_regions.is_empty()
}
}
pub(super) fn initial_dyn_filter_regs_from_query_ctx(
@@ -47,7 +71,13 @@ pub(super) fn initial_dyn_filter_regs_from_query_ctx(
let registrations =
query_ctx.extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY)?;
match InitialDynFilterRegs::from_extension_value(registrations) {
Ok(registrations) => Some(registrations),
Ok(registrations) => match registrations.validate_default_bounds() {
Ok(()) => Some(registrations),
Err(error) => {
warn!(error; "Initial remote dyn filter registrations exceeded Task 03 bounds");
None
}
},
Err(error) => {
warn!(error; "Failed to decode initial remote dyn filter registrations from query context");
None
@@ -65,17 +95,26 @@ pub(super) fn register_initial_dyn_filter_regs(
return;
}
if let Err(error) = regs.validate_default_bounds() {
warn!(error; "Ignored invalid initial dyn filter registrations for query_id {} region_id {}", query_id, region_id);
return;
}
let query_regs = regs_by_query
.entry(query_id.to_string())
.or_insert_with(DashMap::new);
for reg in &regs.regs {
if query_regs.contains_key(&reg.filter_id) {
if let Some(mut registered) = query_regs.get_mut(&reg.filter_id) {
if registered.register_region(region_id) {
continue;
}
warn!(
query_id,
filter_id = reg.filter_id,
region_id = %region_id,
"Duplicate initial remote dyn filter registration ignored"
"Duplicate initial dyn filter reg ignored"
);
continue;
}
@@ -102,11 +141,10 @@ pub(super) fn remove_initial_dyn_filter_regs_for_region(
};
let filter_ids_to_remove = query_regs
.iter()
.filter_map(|registered| {
.iter_mut()
.filter_map(|mut registered| {
registered
.subscriber_regions
.contains(&region_id)
.should_drop_after_remove(region_id)
.then(|| registered.filter_id.clone())
})
.collect::<Vec<_>>();

View File

@@ -14,7 +14,11 @@
use std::sync::Arc;
use api::v1::region::{RemoteDynFilterUnregister, RemoteDynFilterUpdate};
use async_trait::async_trait;
use client::region::{
build_remote_dyn_filter_unregister_request, build_remote_dyn_filter_update_request,
};
use common_error::ext::BoxedError;
use common_meta::node_manager::NodeManagerRef;
use common_query::request::QueryRequest;
@@ -24,6 +28,7 @@ use query::error::{RegionQuerySnafu, Result as QueryResult};
use query::region_query::RegionQueryHandler;
use session::ReadPreference;
use snafu::ResultExt;
use store_api::storage::RegionId;
use crate::error::{FindRegionPeerSnafu, RequestQuerySnafu, Result};
@@ -56,6 +61,30 @@ impl RegionQueryHandler for FrontendRegionQueryHandler {
.map_err(BoxedError::new)
.context(RegionQuerySnafu)
}
/// Forwards a remote dyn filter update for `region_id`, boxing any failure into
/// the query crate's `RegionQuery` error.
async fn handle_remote_dyn_filter_update(
    &self,
    region_id: RegionId,
    query_id: String,
    update: RemoteDynFilterUpdate,
) -> QueryResult<()> {
    let outcome = self
        .handle_remote_dyn_filter_update_inner(region_id, query_id, update)
        .await;
    outcome.map_err(BoxedError::new).context(RegionQuerySnafu)
}
/// Forwards a remote dyn filter unregister for `region_id`, boxing any failure
/// into the query crate's `RegionQuery` error.
async fn handle_remote_dyn_filter_unregister(
    &self,
    region_id: RegionId,
    query_id: String,
    unregister: RemoteDynFilterUnregister,
) -> QueryResult<()> {
    let outcome = self
        .handle_remote_dyn_filter_unregister_inner(region_id, query_id, unregister)
        .await;
    outcome.map_err(BoxedError::new).context(RegionQuerySnafu)
}
}
impl FrontendRegionQueryHandler {
@@ -82,4 +111,50 @@ impl FrontendRegionQueryHandler {
.await
.context(RequestQuerySnafu)
}
/// Sends a remote dyn filter update to the datanode that leads `region_id`.
///
/// Fails with the `FindRegionPeer` context when the leader lookup fails, and with
/// the `RequestQuery` context when the datanode client returns an error.
async fn handle_remote_dyn_filter_update_inner(
    &self,
    region_id: RegionId,
    query_id: String,
    update: RemoteDynFilterUpdate,
) -> Result<()> {
    let leader = self
        .partition_manager
        .find_region_leader(region_id)
        .await
        .context(FindRegionPeerSnafu {
            region_id,
            read_preference: ReadPreference::Leader,
        })?;
    let datanode = self.node_manager.datanode(&leader).await;

    let request = build_remote_dyn_filter_update_request(query_id, update);
    datanode.handle(request).await.context(RequestQuerySnafu)?;
    Ok(())
}
/// Sends a remote dyn filter unregister to the datanode that leads `region_id`.
///
/// Fails with the `FindRegionPeer` context when the leader lookup fails, and with
/// the `RequestQuery` context when the datanode client returns an error.
async fn handle_remote_dyn_filter_unregister_inner(
    &self,
    region_id: RegionId,
    query_id: String,
    unregister: RemoteDynFilterUnregister,
) -> Result<()> {
    let leader = self
        .partition_manager
        .find_region_leader(region_id)
        .await
        .context(FindRegionPeerSnafu {
            region_id,
            read_preference: ReadPreference::Leader,
        })?;
    let datanode = self.node_manager.datanode(&leader).await;

    let request = build_remote_dyn_filter_unregister_request(query_id, unregister);
    datanode.handle(request).await.context(RequestQuerySnafu)?;
    Ok(())
}
}

View File

@@ -14,22 +14,22 @@
mod analyzer;
mod commutativity;
mod dyn_filter_bridge;
mod filter_id;
mod merge_scan;
mod merge_sort;
mod planner;
mod predicate_extractor;
mod region_pruner;
mod dyn_filter_bridge;
mod remote_dyn_filter_registry;
pub use analyzer::{DistPlannerAnalyzer, DistPlannerOptions};
pub use filter_id::{FilterFingerprint, FilterId, ParseFilterIdError};
pub use filter_id::{FilterFingerprint, FilterId, ParseFilterIdError, ProducerScopeId};
pub use merge_scan::{MergeScanExec, MergeScanLogicalPlan};
pub use planner::{DistExtensionPlanner, MergeSortExtensionPlanner};
pub use predicate_extractor::PredicateExtractor;
pub use region_pruner::ConstraintPruner;
pub use remote_dyn_filter_registry::{
DynFilterEntry, DynFilterRegistryManager, EntryRegistration, QueryDynFilterRegistry,
RegistryState, Subscriber, SubscriberRegistration,
RemoteDynFilterRegistryLease, Subscriber, SubscriberRegistration,
};

View File

@@ -14,6 +14,7 @@
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use common_telemetry::debug;
use datafusion::config::{ConfigExtension, ExtensionOptions};
@@ -31,6 +32,7 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
use crate::dist_plan::ProducerScopeId;
use crate::dist_plan::analyzer::utils::{
PatchOptimizerContext, PlanTreeExpressionSimplifier, aliased_columns_for,
rewrite_merge_sort_exprs,
@@ -122,14 +124,15 @@ impl AnalyzerRule for DistPlannerAnalyzer {
let opt = config.extensions.get::<DistPlannerOptions>();
let allow_fallback = opt.map(|o| o.allow_query_fallback).unwrap_or(false);
let result = match self.try_push_down(plan.clone()) {
let producer_scope_allocator = ProducerScopeAllocator::default();
let result = match self.try_push_down(plan.clone(), producer_scope_allocator.clone()) {
Ok(plan) => plan,
Err(err) => {
if allow_fallback {
common_telemetry::warn!(err; "Failed to push down plan, using fallback plan rewriter for plan: {plan}");
// if push down failed, use fallback plan rewriter
PUSH_DOWN_FALLBACK_ERRORS_TOTAL.inc();
self.use_fallback(plan)?
self.use_fallback(plan, producer_scope_allocator)?
} else {
return Err(err);
}
@@ -142,21 +145,37 @@ impl AnalyzerRule for DistPlannerAnalyzer {
impl DistPlannerAnalyzer {
/// Try push down as many nodes as possible
fn try_push_down(&self, plan: LogicalPlan) -> DfResult<LogicalPlan> {
let plan = plan.transform(&Self::inspect_plan_with_subquery)?;
let mut rewriter = PlanRewriter::default();
fn try_push_down(
&self,
plan: LogicalPlan,
producer_scope_allocator: ProducerScopeAllocator,
) -> DfResult<LogicalPlan> {
let plan = plan.transform(|plan| {
Self::inspect_plan_with_subquery(plan, producer_scope_allocator.clone())
})?;
let mut rewriter = PlanRewriter::new(producer_scope_allocator);
let result = plan.data.rewrite(&mut rewriter)?.data;
Ok(result)
}
/// Use fallback plan rewriter to rewrite the plan and only push down table scan nodes
fn use_fallback(&self, plan: LogicalPlan) -> DfResult<LogicalPlan> {
let mut rewriter = fallback::FallbackPlanRewriter;
let result = plan.rewrite(&mut rewriter)?.data;
fn use_fallback(
&self,
plan: LogicalPlan,
producer_scope_allocator: ProducerScopeAllocator,
) -> DfResult<LogicalPlan> {
let plan = plan.transform(|plan| {
Self::inspect_plan_with_subquery(plan, producer_scope_allocator.clone())
})?;
let mut rewriter = fallback::FallbackPlanRewriter::new(producer_scope_allocator);
let result = plan.data.rewrite(&mut rewriter)?.data;
Ok(result)
}
fn inspect_plan_with_subquery(plan: LogicalPlan) -> DfResult<Transformed<LogicalPlan>> {
fn inspect_plan_with_subquery(
plan: LogicalPlan,
producer_scope_allocator: ProducerScopeAllocator,
) -> DfResult<Transformed<LogicalPlan>> {
// Workaround for https://github.com/GreptimeTeam/greptimedb/issues/5469 and https://github.com/GreptimeTeam/greptimedb/issues/5799
// FIXME(yingwen): Remove the `Limit` plan once we update DataFusion.
if let LogicalPlan::Limit(_) | LogicalPlan::Distinct(_) = &plan {
@@ -166,7 +185,10 @@ impl DistPlannerAnalyzer {
let exprs = plan
.expressions_consider_join()
.into_iter()
.map(|e| e.transform(&Self::transform_subquery).map(|x| x.data))
.map(|e| {
e.transform(|expr| Self::transform_subquery(expr, producer_scope_allocator.clone()))
.map(|x| x.data)
})
.collect::<DfResult<Vec<_>>>()?;
// Some plans that are special treated (should not call `with_new_exprs` on them)
@@ -178,33 +200,37 @@ impl DistPlannerAnalyzer {
}
}
fn transform_subquery(expr: Expr) -> DfResult<Transformed<Expr>> {
fn transform_subquery(
expr: Expr,
producer_scope_allocator: ProducerScopeAllocator,
) -> DfResult<Transformed<Expr>> {
match expr {
Expr::Exists(exists) => Ok(Transformed::yes(Expr::Exists(Exists {
subquery: Self::handle_subquery(exists.subquery)?,
subquery: Self::handle_subquery(exists.subquery, producer_scope_allocator)?,
negated: exists.negated,
}))),
Expr::InSubquery(in_subquery) => Ok(Transformed::yes(Expr::InSubquery(InSubquery {
expr: in_subquery.expr,
subquery: Self::handle_subquery(in_subquery.subquery)?,
subquery: Self::handle_subquery(in_subquery.subquery, producer_scope_allocator)?,
negated: in_subquery.negated,
}))),
Expr::ScalarSubquery(scalar_subquery) => Ok(Transformed::yes(Expr::ScalarSubquery(
Self::handle_subquery(scalar_subquery)?,
Self::handle_subquery(scalar_subquery, producer_scope_allocator)?,
))),
_ => Ok(Transformed::no(expr)),
}
}
fn handle_subquery(subquery: Subquery) -> DfResult<Subquery> {
let mut rewriter = PlanRewriter::default();
let mut rewrote_subquery = subquery
.subquery
.as_ref()
.clone()
.rewrite(&mut rewriter)?
.data;
fn handle_subquery(
subquery: Subquery,
producer_scope_allocator: ProducerScopeAllocator,
) -> DfResult<Subquery> {
let subquery_plan = subquery.subquery.as_ref().clone().transform(|plan| {
Self::inspect_plan_with_subquery(plan, producer_scope_allocator.clone())
})?;
let mut rewriter = PlanRewriter::new(producer_scope_allocator);
let mut rewrote_subquery = subquery_plan.data.rewrite(&mut rewriter)?.data;
// Workaround. DF doesn't support the first plan in subquery to be an Extension
if matches!(rewrote_subquery, LogicalPlan::Extension(_)) {
let output_schema = rewrote_subquery.schema().clone();
@@ -226,6 +252,21 @@ impl DistPlannerAnalyzer {
}
}
/// Hands out [`ProducerScopeId`]s from an atomic counter.
///
/// The counter lives behind an `Arc`, so every clone of an allocator draws from
/// the same sequence.
#[derive(Debug, Clone, Default)]
pub(super) struct ProducerScopeAllocator {
    next_remote_dyn_filter_producer_scope_id: Arc<AtomicU64>,
}

impl ProducerScopeAllocator {
    /// Returns the next id in the sequence.
    ///
    /// `fetch_add` yields the value before the increment, so one is added to
    /// return the post-increment value; a default-constructed allocator
    /// therefore starts at 1.
    fn allocate(&self) -> ProducerScopeId {
        let previous = self
            .next_remote_dyn_filter_producer_scope_id
            .fetch_add(1, Ordering::SeqCst);
        ProducerScopeId::new(previous + 1)
    }
}
/// Status of the rewriter to mark if the current pass is expanded
#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
enum RewriterStatus {
@@ -289,9 +330,21 @@ struct PlanRewriter {
/// so that we can push down the `Sort` plan as much as possible.
expand_on_next_part_cond_trans_commutative: bool,
new_child_plan: Option<LogicalPlan>,
producer_scope_allocator: ProducerScopeAllocator,
}
impl PlanRewriter {
/// Creates a rewriter that draws producer scope ids from `producer_scope_allocator`;
/// every other field takes its `Default` value.
fn new(producer_scope_allocator: ProducerScopeAllocator) -> Self {
Self {
producer_scope_allocator,
..Default::default()
}
}
/// Allocates a producer scope id from this rewriter's allocator.
fn allocate_remote_dyn_filter_producer_scope_id(&self) -> ProducerScopeId {
self.producer_scope_allocator.allocate()
}
fn get_parent(&self) -> Option<&LogicalPlan> {
// level starts from 1, it's safe to minus by 1
self.stack
@@ -586,12 +639,14 @@ impl PlanRewriter {
);
// add merge scan as the new root
let producer_scope_id = self.allocate_remote_dyn_filter_producer_scope_id();
let mut node = MergeScanLogicalPlan::new(
on_node.clone(),
false,
// at this stage, the partition cols should be set
// treat it as non-partitioned if None
self.partition_cols.clone().unwrap_or_default(),
producer_scope_id,
)
.into_logical_plan();

View File

@@ -27,15 +27,31 @@ use datafusion_expr::LogicalPlan;
use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
use crate::dist_plan::MergeScanLogicalPlan;
use crate::dist_plan::analyzer::{AliasMapping, OTHER_PHY_PART_COL_PLACEHOLDER};
use crate::dist_plan::analyzer::{
AliasMapping, OTHER_PHY_PART_COL_PLACEHOLDER, ProducerScopeAllocator,
};
use crate::dist_plan::{MergeScanLogicalPlan, ProducerScopeId};
/// FallbackPlanRewriter is a plan rewriter that will only push down table scan node
/// This is used when `PlanRewriter` produce errors when trying to rewrite the plan
/// This is a temporary solution, and will be removed once we have a more robust plan rewriter
/// It will traverse the logical plan and rewrite table scan node to merge scan node
#[derive(Debug, Clone, Default)]
pub struct FallbackPlanRewriter;
pub struct FallbackPlanRewriter {
producer_scope_allocator: ProducerScopeAllocator,
}
impl FallbackPlanRewriter {
/// Creates a fallback rewriter that draws producer scope ids from
/// `producer_scope_allocator`.
pub fn new(producer_scope_allocator: ProducerScopeAllocator) -> Self {
Self {
producer_scope_allocator,
}
}
/// Allocates a producer scope id from this rewriter's allocator.
fn allocate_remote_dyn_filter_producer_scope_id(&self) -> ProducerScopeId {
self.producer_scope_allocator.allocate()
}
}
impl TreeNodeRewriter for FallbackPlanRewriter {
type Node = LogicalPlan;
@@ -105,6 +121,7 @@ impl TreeNodeRewriter for FallbackPlanRewriter {
// at this stage, the partition cols should be set
// treat it as non-partitioned if None
partition_cols.clone().unwrap_or_default(),
self.allocate_remote_dyn_filter_producer_scope_id(),
)
.into_logical_plan();
Ok(Transformed::yes(node))

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeSet;
use std::pin::Pin;
use std::sync::Arc;
@@ -31,7 +32,8 @@ use datafusion::prelude::SessionContext;
use datafusion_common::{JoinType, ScalarValue};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{
AggregateUDF, Expr, ExprSchemable as _, LogicalPlanBuilder, Operator, binary_expr, col, lit,
AggregateUDF, Expr, ExprSchemable as _, LogicalPlanBuilder, Operator, Subquery, binary_expr,
col, lit,
};
use datafusion_functions::datetime::date_bin;
use datafusion_functions::datetime::expr_fn::now;
@@ -53,6 +55,42 @@ use table::{Table, TableRef};
use super::*;
/// Walks `plan` and its inputs depth-first, inserting the producer scope id of
/// every `MergeScanLogicalPlan` extension node into `scopes`.
fn collect_merge_scan_producer_scope_ids(
    plan: &LogicalPlan,
    scopes: &mut BTreeSet<ProducerScopeId>,
) {
    let merge_scan = match plan {
        LogicalPlan::Extension(extension) => extension
            .node
            .as_any()
            .downcast_ref::<MergeScanLogicalPlan>(),
        _ => None,
    };
    if let Some(merge_scan) = merge_scan {
        scopes.insert(merge_scan.producer_scope_id());
    }

    for child in plan.inputs() {
        collect_merge_scan_producer_scope_ids(child, scopes);
    }
}
/// Walks `plan` and its inputs depth-first, appending the producer scope id of
/// every `MergeScanLogicalPlan` extension node to `scopes`.
///
/// Unlike the set-based collector, duplicates are kept, so callers can detect
/// colliding ids.
fn collect_merge_scan_producer_scope_id_list(
    plan: &LogicalPlan,
    scopes: &mut Vec<ProducerScopeId>,
) {
    let merge_scan = match plan {
        LogicalPlan::Extension(extension) => extension
            .node
            .as_any()
            .downcast_ref::<MergeScanLogicalPlan>(),
        _ => None,
    };
    if let Some(merge_scan) = merge_scan {
        scopes.push(merge_scan.producer_scope_id());
    }

    for child in plan.inputs() {
        collect_merge_scan_producer_scope_id_list(child, scopes);
    }
}
pub(crate) struct TestTable;
impl TestTable {
@@ -1360,6 +1398,60 @@ fn test_simplify_select_now_expression() {
assert_eq!(expected, normalized);
}
/// Two sorted scans joined by a cross join must each end up with their own
/// `ProducerScopeId` after distributed planning.
#[test]
fn sibling_merge_scan_producers_have_unique_scope_ids() {
    init_default_ut_logging();

    // Builds a scan over a fresh test table, sorted by `pk1`.
    let sorted_scan = |table_id, name: &str| {
        let table = TestTable::table_with_name(table_id, name.to_string());
        let source = Arc::new(DefaultTableSource::new(Arc::new(
            DfTableProviderAdapter::new(table),
        )));
        LogicalPlanBuilder::scan_with_filters(name, source, None, vec![])
            .unwrap()
            .sort(vec![col("pk1").sort(true, false)])
            .unwrap()
            .build()
            .unwrap()
    };

    let left_sorted = sorted_scan(0, "left_table");
    let right_sorted = sorted_scan(1, "right_table");
    let joined = LogicalPlanBuilder::from(left_sorted)
        .cross_join(right_sorted)
        .unwrap()
        .build()
        .unwrap();

    let analyzed = DistPlannerAnalyzer {}
        .analyze(joined, &ConfigOptions::default())
        .unwrap();

    // Keep duplicates in the list so a collision shows up as a length mismatch.
    let mut scopes = Vec::new();
    collect_merge_scan_producer_scope_id_list(&analyzed, &mut scopes);
    let scope_count = scopes.len();
    let unique_scopes: BTreeSet<_> = scopes.iter().copied().collect();

    assert!(
        scope_count >= 2,
        "Expected at least 2 ProducerScopeIds, got {}: {scopes:?}",
        scope_count
    );
    assert_eq!(
        scope_count,
        unique_scopes.len(),
        "Expected all sibling ProducerScopeIds to be unique, got scopes: {scopes:?}"
    );
}
#[test]
fn test_simplify_now_expression() {
init_default_ut_logging();
@@ -1823,6 +1915,46 @@ fn transform_sort_subquery_alias() {
assert_eq!(expected, result.to_string());
}
/// A subquery and its outer plan that share one `ProducerScopeAllocator` must
/// receive different producer scope ids.
#[test]
fn producer_scope_ids_do_not_collide_between_subquery_and_outer_plan() {
    let table_source = Arc::new(DefaultTableSource::new(Arc::new(
        DfTableProviderAdapter::new(TestTable::table_with_name(0, "numbers".to_string())),
    )));
    let allocator = ProducerScopeAllocator::default();

    // Collects the distinct producer scope ids found in a rewritten plan.
    let scopes_of = |plan: &LogicalPlan| {
        let mut scopes = BTreeSet::new();
        collect_merge_scan_producer_scope_ids(plan, &mut scopes);
        scopes
    };

    // Rewrite the subquery first, using a clone of the shared allocator.
    let inner_plan =
        LogicalPlanBuilder::scan_with_filters("inner", table_source.clone(), None, vec![])
            .unwrap()
            .build()
            .unwrap();
    let rewritten_subquery = DistPlannerAnalyzer::handle_subquery(
        Subquery {
            subquery: Arc::new(inner_plan),
            outer_ref_columns: Default::default(),
            spans: Default::default(),
        },
        allocator.clone(),
    )
    .unwrap();

    // Then push down the outer plan with the original allocator.
    let outer_plan = LogicalPlanBuilder::scan_with_filters("outer", table_source, None, vec![])
        .unwrap()
        .build()
        .unwrap();
    let rewritten_outer = DistPlannerAnalyzer {}
        .try_push_down(outer_plan, allocator)
        .unwrap();

    let subquery_scopes = scopes_of(rewritten_subquery.subquery.as_ref());
    let outer_scopes = scopes_of(&rewritten_outer);

    assert_eq!(subquery_scopes.len(), 1);
    assert_eq!(outer_scopes.len(), 1);
    assert_ne!(subquery_scopes, outer_scopes);
}
#[test]
fn date_bin_ts_group_by() {
init_default_ut_logging();

View File

@@ -16,29 +16,42 @@ use std::any::Any;
use std::sync::Arc;
use common_query::request::{
InitialDynFilterReg, InitialDynFilterRegs,
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, InitialDynFilterReg,
InitialDynFilterRegs,
};
use datafusion::execution::TaskContext;
use datafusion_common::Result;
use datafusion_physical_expr::expressions::{lit, Column, DynamicFilterPhysicalExpr};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
#[cfg(test)]
use datafusion_physical_expr::expressions::{Column, lit};
use session::context::{QueryContext, QueryContextRef};
#[cfg(test)]
use session::query_id::QueryId;
use store_api::storage::RegionId;
#[cfg(test)]
use uuid::Uuid;
use super::filter_id::build_remote_dyn_filter_id;
use crate::dist_plan::{FilterId, QueryDynFilterRegistry, Subscriber};
use crate::dist_plan::{FilterId, ProducerScopeId, QueryDynFilterRegistry, Subscriber};
use crate::query_engine::QueryEngineState;
/// A dynamic filter picked out of the parent filters offered to a producer.
///
/// Instances are produced by `capture_remote_dyn_filters`, which keeps only
/// the parent filters that downcast to `DynamicFilterPhysicalExpr`.
#[derive(Debug, Clone)]
pub(crate) struct CapturedDynFilter {
/// Scope id of the producer the filter was captured for.
pub(crate) producer_scope_id: ProducerScopeId,
/// Position of the filter within the producer's parent filter list
/// (positions of non-dynamic filters are skipped, not renumbered).
pub(crate) producer_local_ordinal: usize,
/// Handle to the live dynamic filter expression.
pub(crate) alive_dyn_filter: Arc<DynamicFilterPhysicalExpr>,
}
/// A captured dynamic filter together with the identity computed for it in a
/// specific region.
#[derive(Debug, Clone)]
struct ResolvedDynFilter {
/// Identity built by `build_remote_dyn_filter_id` for this filter.
filter_id: FilterId,
/// Handle to the live dynamic filter expression, cloned from the captured filter.
alive_dyn_filter: Arc<DynamicFilterPhysicalExpr>,
/// Child expressions taken from `alive_dyn_filter.children()`; they are also
/// the input to the filter id computation.
children: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
}
pub(crate) fn capture_remote_dyn_filters(
producer_scope_id: ProducerScopeId,
parent_filters: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
) -> Vec<CapturedDynFilter> {
parent_filters
@@ -46,6 +59,7 @@ pub(crate) fn capture_remote_dyn_filters(
.enumerate()
.filter_map(|(producer_local_ordinal, filter)| {
downcast_dynamic_filter(filter).map(|alive_dyn_filter| CapturedDynFilter {
producer_scope_id,
producer_local_ordinal,
alive_dyn_filter,
})
@@ -62,9 +76,7 @@ fn downcast_dynamic_filter(
}
fn query_engine_state_from_task_context(context: &TaskContext) -> Option<Arc<QueryEngineState>> {
let query_engine_state: Option<Arc<QueryEngineState>> =
context.session_config().get_extension();
query_engine_state
context.session_config().get_extension()
}
pub(crate) fn register_dyn_filters_for_region(
@@ -72,18 +84,13 @@ pub(crate) fn register_dyn_filters_for_region(
region_id: RegionId,
captured_dyn_filters: &[CapturedDynFilter],
) {
for captured_dyn_filter in captured_dyn_filters {
let Ok((filter_id, _children)) =
filter_id_and_children_for_filter(region_id, captured_dyn_filter)
else {
continue;
};
for resolved_filter in resolved_dyn_filters(region_id, captured_dyn_filters) {
let _ = registry.register_remote_dyn_filter(
filter_id.clone(),
captured_dyn_filter.alive_dyn_filter.clone(),
resolved_filter.filter_id.clone(),
resolved_filter.alive_dyn_filter,
);
let _ = registry.register_subscriber(&filter_id, Subscriber::new(region_id));
let _ =
registry.register_subscriber(&resolved_filter.filter_id, Subscriber::new(region_id));
}
}
@@ -108,13 +115,10 @@ pub(crate) fn bridge_dyn_filters_for_region(
register_dyn_filters_for_region(&registry, region_id, captured_dyn_filters);
}
fn filter_id_and_children_for_filter(
fn resolve_dyn_filter(
region_id: RegionId,
captured_dyn_filter: &CapturedDynFilter,
) -> Result<(
FilterId,
Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
)> {
) -> Result<ResolvedDynFilter> {
let children = captured_dyn_filter
.alive_dyn_filter
.children()
@@ -123,11 +127,26 @@ fn filter_id_and_children_for_filter(
.collect::<Vec<_>>();
let filter_id = build_remote_dyn_filter_id(
region_id,
captured_dyn_filter.producer_scope_id,
captured_dyn_filter.producer_local_ordinal,
&children,
)?;
Ok((filter_id, children))
Ok(ResolvedDynFilter {
filter_id,
alive_dyn_filter: captured_dyn_filter.alive_dyn_filter.clone(),
children,
})
}
/// Resolves every captured filter for `region_id`, preserving input order.
///
/// Filters whose resolution fails are silently left out of the result.
fn resolved_dyn_filters(
    region_id: RegionId,
    captured_dyn_filters: &[CapturedDynFilter],
) -> Vec<ResolvedDynFilter> {
    let mut resolved = Vec::new();
    for captured in captured_dyn_filters {
        // Best effort: a filter that cannot be resolved is skipped.
        if let Ok(filter) = resolve_dyn_filter(region_id, captured) {
            resolved.push(filter);
        }
    }
    resolved
}
fn build_initial_dyn_filter_regs_for_region(
@@ -135,20 +154,14 @@ fn build_initial_dyn_filter_regs_for_region(
captured_dyn_filters: &[CapturedDynFilter],
) -> InitialDynFilterRegs {
InitialDynFilterRegs::new(
captured_dyn_filters
.iter()
.filter_map(|captured_dyn_filter| {
let Ok((filter_id, children)) =
filter_id_and_children_for_filter(region_id, captured_dyn_filter)
else {
return None;
};
resolved_dyn_filters(region_id, captured_dyn_filters)
.into_iter()
.filter_map(|resolved_filter| {
match InitialDynFilterReg::from_filter_id_and_children(
filter_id.to_string(),
&children,
resolved_filter.filter_id.to_string(),
&resolved_filter.children,
) {
Ok(registration) => Some(registration),
Ok(reg) => Some(reg),
Err(error) => {
common_telemetry::warn!(error; "Failed to encode initial remote dyn filter registration");
None
@@ -170,6 +183,11 @@ pub(crate) fn query_context_with_initial_dyn_filter_regs(
return region_query_ctx;
}
if let Err(error) = regs.validate_default_bounds() {
common_telemetry::warn!(error; "Dropping initial remote dyn filter registrations for region {} that exceed Task 03 bounds", region_id);
return region_query_ctx;
}
match regs.to_extension_value() {
Ok(serialized) => region_query_ctx.set_extension(
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
@@ -191,6 +209,10 @@ mod tests {
QueryId::from(Uuid::from_u128(value))
}
/// Test helper: wraps a raw `u64` into a `ProducerScopeId`.
fn test_producer_scope(value: u64) -> ProducerScopeId {
ProducerScopeId::new(value)
}
#[test]
fn capture_remote_dyn_filters_preserves_parent_filter_ordinals() {
let parent_filters = vec![
@@ -206,9 +228,12 @@ mod tests {
)) as Arc<dyn datafusion::physical_plan::PhysicalExpr>,
];
let captured = capture_remote_dyn_filters(parent_filters);
let producer_scope_id = test_producer_scope(42);
let captured = capture_remote_dyn_filters(producer_scope_id, parent_filters);
assert_eq!(captured.len(), 2);
assert_eq!(captured[0].producer_scope_id, producer_scope_id);
assert_eq!(captured[1].producer_scope_id, producer_scope_id);
assert_eq!(captured[0].producer_local_ordinal, 1);
assert_eq!(captured[1].producer_local_ordinal, 3);
}
@@ -217,6 +242,7 @@ mod tests {
fn register_dyn_filters_for_region_reuses_existing_entry() {
let registry = QueryDynFilterRegistry::new(test_query_id(1));
let captured_dyn_filters = vec![CapturedDynFilter {
producer_scope_id: test_producer_scope(42),
producer_local_ordinal: 2,
alive_dyn_filter: Arc::new(DynamicFilterPhysicalExpr::new(
vec![Arc::new(Column::new("host", 0)) as Arc<_>],
@@ -230,14 +256,46 @@ mod tests {
assert_eq!(registry.entry_count(), 1);
let entry = registry.entries().pop().unwrap();
assert_eq!(
entry.filter_id().producer_scope_id(),
test_producer_scope(42)
);
assert_eq!(entry.filter_id().producer_ordinal(), 2);
assert_eq!(entry.subscribers().len(), 1);
assert_eq!(entry.subscribers()[0].region_id(), region_id);
}
/// Filters that differ only in their producer scope must end up as separate
/// registry entries.
#[test]
fn register_dyn_filters_for_region_keeps_independent_producer_scopes_distinct() {
    let registry = QueryDynFilterRegistry::new(test_query_id(1));
    let region_id = RegionId::new(1024, 7);

    // Same region, ordinal and children each time; only the scope changes.
    for scope in [42, 43] {
        let captured = CapturedDynFilter {
            producer_scope_id: test_producer_scope(scope),
            producer_local_ordinal: 2,
            alive_dyn_filter: Arc::new(DynamicFilterPhysicalExpr::new(
                vec![Arc::new(Column::new("host", 0)) as Arc<_>],
                lit(true) as _,
            )),
        };
        register_dyn_filters_for_region(&registry, region_id, &[captured]);
    }

    assert_eq!(registry.entry_count(), 2);
}
#[test]
fn query_context_includes_region_initial_dyn_filter_regs() {
let captured_dyn_filters = vec![CapturedDynFilter {
producer_scope_id: test_producer_scope(42),
producer_local_ordinal: 2,
alive_dyn_filter: Arc::new(DynamicFilterPhysicalExpr::new(
vec![Arc::new(Column::new("host", 0)) as Arc<_>],
@@ -269,6 +327,7 @@ mod tests {
.unwrap();
let expected_filter_id = build_remote_dyn_filter_id(
region_id,
captured_dyn_filters[0].producer_scope_id,
captured_dyn_filters[0].producer_local_ordinal,
&[Arc::new(Column::new("host", 0)) as Arc<_>],
)
@@ -279,4 +338,40 @@ mod tests {
assert_eq!(decoded_children.len(), 1);
assert!(decoded_children[0].as_any().is::<Column>());
}
/// When two captured filters resolve to the same filter id, the initial
/// registrations must not be attached to the region query context.
#[test]
fn query_context_drops_initial_regs_when_duplicate_filter_ids_exceed_bounds() {
    // Two captured filters with identical scope, ordinal and children.
    let captured_dyn_filters: Vec<_> = (0..2)
        .map(|_| CapturedDynFilter {
            producer_scope_id: test_producer_scope(42),
            producer_local_ordinal: 2,
            alive_dyn_filter: Arc::new(DynamicFilterPhysicalExpr::new(
                vec![Arc::new(Column::new("host", 0)) as Arc<_>],
                lit(true) as _,
            )),
        })
        .collect();

    let query_ctx = QueryContext::arc();
    let region_query_ctx = query_context_with_initial_dyn_filter_regs(
        &query_ctx,
        RegionId::new(1024, 7),
        &captured_dyn_filters,
    );

    let initial_regs =
        region_query_ctx.extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY);
    assert!(initial_regs.is_none());
}
}

View File

@@ -52,9 +52,37 @@ impl FromStr for FilterFingerprint {
}
}
/// Identifier of a producer scope, stored as a raw `u64`.
///
/// The textual form is 16 lowercase, zero-padded hexadecimal digits; parsing
/// accepts any string that `u64::from_str_radix(_, 16)` accepts.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ProducerScopeId(u64);

impl ProducerScopeId {
    /// Wraps a raw scope value.
    pub fn new(value: u64) -> Self {
        ProducerScopeId(value)
    }

    /// Returns the raw scope value.
    pub fn get(self) -> u64 {
        let ProducerScopeId(raw) = self;
        raw
    }
}

impl Display for ProducerScopeId {
    /// Writes the id as 16 zero-padded lowercase hex digits.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let raw = self.get();
        f.write_fmt(format_args!("{raw:016x}"))
    }
}

impl FromStr for ProducerScopeId {
    type Err = ParseIntError;

    /// Parses a hexadecimal string into a scope id.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        u64::from_str_radix(s, 16).map(ProducerScopeId)
    }
}
/// Query-local identity of a remote dynamic filter.
///
/// Two ids are equal only if all four components match.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct FilterId {
/// Region component of the identity.
region_id: RegionId,
/// Producer scope component of the identity.
producer_scope_id: ProducerScopeId,
/// Producer-local ordinal of the filter, narrowed to `u32`.
producer_ordinal: u32,
/// Fingerprint computed over the filter's child expressions.
children_fingerprint: FilterFingerprint,
}
@@ -66,11 +94,13 @@ pub struct FilterId {
impl FilterId {
pub fn new(
region_id: RegionId,
producer_scope_id: ProducerScopeId,
producer_ordinal: u32,
children_fingerprint: FilterFingerprint,
) -> Self {
Self {
region_id,
producer_scope_id,
producer_ordinal,
children_fingerprint,
}
@@ -84,6 +114,10 @@ impl FilterId {
self.producer_ordinal
}
/// Returns the producer scope component of this filter id.
pub fn producer_scope_id(&self) -> ProducerScopeId {
self.producer_scope_id
}
/// Returns the child-expression fingerprint component of this filter id.
pub fn children_fingerprint(&self) -> FilterFingerprint {
self.children_fingerprint
}
@@ -93,8 +127,9 @@ impl Display for FilterId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}:{}:{}",
"{}:{}:{}:{}",
self.region_id.as_u64(),
self.producer_scope_id,
self.producer_ordinal,
self.children_fingerprint
)
@@ -112,7 +147,12 @@ impl FromStr for FilterId {
.parse::<u64>()
.map(RegionId::from_u64)
.map_err(|_| ParseFilterIdError)?;
let producer_ordinal = parts
let producer_scope_id = parts
.next()
.ok_or(ParseFilterIdError)?
.parse::<ProducerScopeId>()
.map_err(|_| ParseFilterIdError)?;
let producer_local_ordinal = parts
.next()
.ok_or(ParseFilterIdError)?
.parse::<u32>()
@@ -126,7 +166,12 @@ impl FromStr for FilterId {
return Err(ParseFilterIdError);
}
Ok(Self::new(region_id, producer_ordinal, children_fingerprint))
Ok(Self::new(
region_id,
producer_scope_id,
producer_local_ordinal,
children_fingerprint,
))
}
}
@@ -141,7 +186,7 @@ impl Display for ParseFilterIdError {
/// Builds the query-local remote dynamic filter identity.
///
/// The identity is `region_id + producer-local ordinal + canonicalized child fingerprint`.
/// The identity is `region_id + producer scope + producer-local ordinal + canonicalized child fingerprint`.
/// Subscriber routing details such as `partition` stay outside this key so they can remain in
/// the later fanout/subscriber map instead of splitting one shared remote filter state.
///
@@ -151,6 +196,7 @@ impl Display for ParseFilterIdError {
#[allow(unused)]
pub(crate) fn build_remote_dyn_filter_id(
region_id: RegionId,
producer_scope_id: ProducerScopeId,
producer_local_ordinal: usize,
children: &[Arc<dyn PhysicalExpr>],
) -> Result<FilterId> {
@@ -161,6 +207,7 @@ pub(crate) fn build_remote_dyn_filter_id(
})?;
Ok(FilterId::new(
region_id,
producer_scope_id,
producer_local_ordinal,
children_fingerprint,
))
@@ -202,7 +249,12 @@ mod tests {
#[test]
fn filter_id_round_trips_through_string() {
let filter_id = FilterId::new(RegionId::new(1024, 7), 3, FilterFingerprint::new(0xabc));
let filter_id = FilterId::new(
RegionId::new(1024, 7),
ProducerScopeId::new(42),
3,
FilterFingerprint::new(0xabc),
);
let encoded = filter_id.to_string();
assert_eq!(encoded.parse::<FilterId>().unwrap(), filter_id);
@@ -212,32 +264,77 @@ mod tests {
fn filter_id_rejects_malformed_strings() {
assert!("".parse::<FilterId>().is_err());
assert!("1024:3".parse::<FilterId>().is_err());
assert!("1024:3:zzzz".parse::<FilterId>().is_err());
assert!("1024:3:0000000000000abc:extra".parse::<FilterId>().is_err());
assert!("1024:0000000000000001:3:zzzz".parse::<FilterId>().is_err());
assert!(
"1024:0000000000000001:3:0000000000000abc:extra"
.parse::<FilterId>()
.is_err()
);
}
#[test]
fn remote_dyn_filter_id_is_stable_for_equivalent_children() {
let region_id = RegionId::new(1024, 7);
let first =
build_remote_dyn_filter_id(region_id, 3, &test_children(&["host", "pod"])).unwrap();
let second =
build_remote_dyn_filter_id(region_id, 3, &test_children(&["host", "pod"])).unwrap();
let producer_scope_id = ProducerScopeId::new(42);
let first = build_remote_dyn_filter_id(
region_id,
producer_scope_id,
3,
&test_children(&["host", "pod"]),
)
.unwrap();
let second = build_remote_dyn_filter_id(
region_id,
producer_scope_id,
3,
&test_children(&["host", "pod"]),
)
.unwrap();
assert_eq!(first, second);
}
#[test]
fn remote_dyn_filter_id_changes_when_producer_scope_changes() {
let children = test_children(&["host", "pod"]);
let baseline = build_remote_dyn_filter_id(
RegionId::new(1024, 7),
ProducerScopeId::new(42),
3,
&children,
)
.unwrap();
let different_scope = build_remote_dyn_filter_id(
RegionId::new(1024, 7),
ProducerScopeId::new(43),
3,
&children,
)
.unwrap();
assert_ne!(baseline, different_scope);
}
#[test]
fn remote_dyn_filter_id_changes_when_identity_inputs_change() {
let children = test_children(&["host", "pod"]);
let baseline = build_remote_dyn_filter_id(RegionId::new(1024, 7), 3, &children).unwrap();
let different_region =
build_remote_dyn_filter_id(RegionId::new(1024, 8), 3, &children).unwrap();
let different_ordinal =
build_remote_dyn_filter_id(RegionId::new(1024, 7), 4, &children).unwrap();
let different_children =
build_remote_dyn_filter_id(RegionId::new(1024, 7), 3, &test_children(&["pod", "host"]))
let producer_scope_id = ProducerScopeId::new(42);
let baseline =
build_remote_dyn_filter_id(RegionId::new(1024, 7), producer_scope_id, 3, &children)
.unwrap();
let different_region =
build_remote_dyn_filter_id(RegionId::new(1024, 8), producer_scope_id, 3, &children)
.unwrap();
let different_ordinal =
build_remote_dyn_filter_id(RegionId::new(1024, 7), producer_scope_id, 4, &children)
.unwrap();
let different_children = build_remote_dyn_filter_id(
RegionId::new(1024, 7),
producer_scope_id,
3,
&test_children(&["pod", "host"]),
)
.unwrap();
assert_ne!(baseline, different_region);
assert_ne!(baseline, different_ordinal);
@@ -247,20 +344,27 @@ mod tests {
#[test]
fn remote_dyn_filter_id_supports_empty_children() {
let region_id = RegionId::new(4096, 2);
let first = build_remote_dyn_filter_id(region_id, 1, &[]).unwrap();
let second = build_remote_dyn_filter_id(region_id, 1, &[]).unwrap();
let producer_scope_id = ProducerScopeId::new(42);
let first = build_remote_dyn_filter_id(region_id, producer_scope_id, 1, &[]).unwrap();
let second = build_remote_dyn_filter_id(region_id, producer_scope_id, 1, &[]).unwrap();
assert_eq!(first, second);
assert_eq!(first.region_id(), region_id);
assert_eq!(first.producer_scope_id(), producer_scope_id);
assert_eq!(first.producer_ordinal(), 1);
assert_eq!(first.children_fingerprint(), second.children_fingerprint());
}
#[test]
fn remote_dyn_filter_id_rejects_out_of_range_producer_ordinal() {
let error = build_remote_dyn_filter_id(RegionId::new(1024, 7), usize::MAX, &[])
.unwrap_err()
.to_string();
let error = build_remote_dyn_filter_id(
RegionId::new(1024, 7),
ProducerScopeId::new(42),
usize::MAX,
&[],
)
.unwrap_err()
.to_string();
assert!(error.contains("producer ordinal out of range for filter id"));
}

View File

@@ -40,7 +40,7 @@ use datafusion::physical_plan::{
use datafusion_common::{Column as ColumnExpr, DataFusionError, Result};
use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr};
use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr};
use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr};
use futures_util::StreamExt;
use greptime_proto::v1::region::RegionRequestHeader;
use meter_core::data::ReadItem;
@@ -57,9 +57,33 @@ use crate::dist_plan::dyn_filter_bridge::{
CapturedDynFilter, bridge_dyn_filters_for_region, capture_remote_dyn_filters,
query_context_with_initial_dyn_filter_regs,
};
use crate::dist_plan::{ProducerScopeId, RemoteDynFilterRegistryLease};
use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
use crate::query_engine::QueryEngineState;
use crate::region_query::RegionQueryHandlerRef;
/// Looks up the `QueryEngineState` stored as an extension on the task's
/// session config, returning `None` when no such extension is present.
fn query_engine_state_from_task_context(context: &TaskContext) -> Option<Arc<QueryEngineState>> {
context.session_config().get_extension()
}
/// Acquires a registry lease for the current query when there is something to
/// clean up later.
///
/// Returns `None` when no dynamic filters were captured, when the query
/// context carries no remote query id, or when the task context has no
/// `QueryEngineState` extension.
fn acquire_remote_dyn_filter_registry_cleanup(
    context: &TaskContext,
    query_ctx: &QueryContextRef,
    captured_dyn_filters: &[CapturedDynFilter],
) -> Option<RemoteDynFilterRegistryLease> {
    // Nothing was captured, so there is no registry entry to keep alive.
    if captured_dyn_filters.is_empty() {
        return None;
    }

    // The query id is resolved first; the engine state is only looked up
    // once a query id is known.
    query_ctx.remote_query_id_value().and_then(|query_id| {
        query_engine_state_from_task_context(context)
            .map(|state| state.dyn_filter_registry_manager().acquire_lease(query_id))
    })
}
#[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)]
pub struct MergeScanLogicalPlan {
/// In logical plan phase it only contains one input
@@ -67,6 +91,7 @@ pub struct MergeScanLogicalPlan {
/// If this plan is a placeholder
is_placeholder: bool,
partition_cols: AliasMapping,
producer_scope_id: ProducerScopeId,
}
impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
@@ -107,11 +132,17 @@ impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
}
impl MergeScanLogicalPlan {
pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: AliasMapping) -> Self {
pub fn new(
input: LogicalPlan,
is_placeholder: bool,
partition_cols: AliasMapping,
producer_scope_id: ProducerScopeId,
) -> Self {
Self {
input,
is_placeholder,
partition_cols,
producer_scope_id,
}
}
@@ -137,6 +168,10 @@ impl MergeScanLogicalPlan {
/// Returns the partition column alias mapping this plan was built with.
pub fn partition_cols(&self) -> &AliasMapping {
&self.partition_cols
}
/// Returns the producer scope id this plan was built with.
pub fn producer_scope_id(&self) -> ProducerScopeId {
self.producer_scope_id
}
}
#[derive(Clone)]
@@ -153,6 +188,7 @@ pub struct MergeScanExec {
/// Metrics for each partition
partition_metrics: Arc<Mutex<HashMap<usize, PartitionMetrics>>>,
query_ctx: QueryContextRef,
remote_dyn_filter_producer_scope_id: ProducerScopeId,
captured_remote_dyn_filters: Arc<Mutex<Vec<CapturedDynFilter>>>,
target_partition: usize,
partition_cols: AliasMapping,
@@ -180,6 +216,7 @@ impl MergeScanExec {
query_ctx: QueryContextRef,
target_partition: usize,
partition_cols: AliasMapping,
producer_scope_id: ProducerScopeId,
) -> Result<Self> {
// TODO(CookiePieWw): Initially we removed the metadata from the schema in #2000, but we have to
// keep it for #4619 to identify json type in src/datatypes/src/schema/column_schema.rs.
@@ -252,6 +289,7 @@ impl MergeScanExec {
partition_metrics: Arc::default(),
properties,
query_ctx,
remote_dyn_filter_producer_scope_id: producer_scope_id,
captured_remote_dyn_filters: Arc::default(),
target_partition,
partition_cols,
@@ -279,8 +317,14 @@ impl MergeScanExec {
let current_channel = self.query_ctx.channel();
let read_preference = self.query_ctx.read_preference();
let explain_verbose = self.query_ctx.explain_verbose();
let remote_dyn_filter_registry_cleanup = acquire_remote_dyn_filter_registry_cleanup(
context.as_ref(),
&query_ctx,
&captured_remote_dyn_filters,
);
let stream = Box::pin(stream!({
let _remote_dyn_filter_registry_cleanup = remote_dyn_filter_registry_cleanup;
// only report metrics once for each MergeScan
if partition == 0 {
MERGE_SCAN_REGIONS.observe(regions.len() as f64);
@@ -486,6 +530,7 @@ impl MergeScanExec {
sub_stage_metrics: self.sub_stage_metrics.clone(),
partition_metrics: self.partition_metrics.clone(),
query_ctx: self.query_ctx.clone(),
remote_dyn_filter_producer_scope_id: self.remote_dyn_filter_producer_scope_id,
captured_remote_dyn_filters: self.captured_remote_dyn_filters.clone(),
target_partition: self.target_partition,
partition_cols: self.partition_cols.clone(),
@@ -527,6 +572,13 @@ impl MergeScanExec {
}
}
// Test-only accessors for `MergeScanExec` internals.
#[cfg(test)]
impl MergeScanExec {
/// Returns the producer scope id this exec passes to
/// `capture_remote_dyn_filters`.
fn remote_dyn_filter_producer_scope_id(&self) -> ProducerScopeId {
self.remote_dyn_filter_producer_scope_id
}
}
/// Metrics for a region of a partition.
#[derive(Debug, Clone)]
struct RegionMetrics {
@@ -650,7 +702,7 @@ impl ExecutionPlan for MergeScanExec {
.map(|filter| filter.as_any().is::<DynamicFilterPhysicalExpr>())
.collect::<Vec<_>>();
*self.captured_remote_dyn_filters.lock().unwrap() =
capture_remote_dyn_filters(parent_filters);
capture_remote_dyn_filters(self.remote_dyn_filter_producer_scope_id, parent_filters);
let new_self = Arc::new(self.clone());
Ok(FilterPushdownPropagation {
@@ -775,3 +827,156 @@ impl MergeScanMetric {
self.greptime_exec_cost.add(metrics);
}
}
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use async_trait::async_trait;
use datafusion::execution::SessionStateBuilder;
use datafusion_common::TableReference;
use datafusion_expr::{LogicalPlanBuilder, lit};
use datafusion_physical_expr::Distribution;
use datafusion_physical_expr::expressions::Column;
use session::ReadPreference;
use session::context::QueryContext;
use session::query_id::QueryId;
use table::table_name::TableName;
use uuid::Uuid;
use super::*;
use crate::dist_plan::DynFilterRegistryManager;
use crate::region_query::RegionQueryHandler;
/// Test helper: builds a deterministic `QueryId` from a `u128` by way of a UUID.
fn test_query_id(value: u128) -> QueryId {
QueryId::from(Uuid::from_u128(value))
}
/// The registry for a query must survive until the last outstanding lease is
/// dropped.
#[test]
fn remote_dyn_filter_registry_cleanup_waits_for_last_query_scoped_stream_drop() {
    let manager = Arc::new(DynFilterRegistryManager::default());
    let id = test_query_id(1);
    manager.get_or_init(id);

    // Acquire both leases up front, then release them one at a time and
    // check how many registries remain after each release.
    let leases = vec![manager.acquire_lease(id), manager.acquire_lease(id)];
    for (lease, remaining_registries) in leases.into_iter().zip([1, 0]) {
        drop(lease);
        assert_eq!(manager.registry_count(), remaining_registries);
    }
}
/// Leases acquired independently for the same query id share one registry,
/// which goes away only after both leases are released.
#[test]
fn remote_dyn_filter_registry_cleanup_shares_query_scope_across_independent_leases() {
    let manager = Arc::new(DynFilterRegistryManager::default());
    let id = test_query_id(1);
    let live_registries = || manager.registry_count();

    let lease_a = manager.acquire_lease(id);
    let lease_b = manager.acquire_lease(id);

    // Releasing one lease keeps the shared registry alive.
    drop(lease_a);
    assert_eq!(live_registries(), 1);

    // Releasing the last lease removes it.
    drop(lease_b);
    assert_eq!(live_registries(), 0);
}
/// Stub `RegionQueryHandler` used only to construct a `MergeScanExec` in tests.
///
/// Every method panics via `unimplemented!`, so it must never actually be invoked.
struct TestRegionQueryHandler;
#[async_trait]
impl RegionQueryHandler for TestRegionQueryHandler {
// Not expected to be called by the tests in this module.
async fn do_get(
&self,
_read_preference: ReadPreference,
_request: common_query::request::QueryRequest,
) -> crate::error::Result<common_recordbatch::SendableRecordBatchStream> {
unimplemented!("test only")
}
// Not expected to be called by the tests in this module.
async fn handle_remote_dyn_filter_update(
&self,
_region_id: RegionId,
_query_id: String,
_update: api::v1::region::RemoteDynFilterUpdate,
) -> crate::error::Result<()> {
unimplemented!("test only")
}
// Not expected to be called by the tests in this module.
async fn handle_remote_dyn_filter_unregister(
&self,
_region_id: RegionId,
_query_id: String,
_unregister: api::v1::region::RemoteDynFilterUnregister,
) -> crate::error::Result<()> {
unimplemented!("test only")
}
}
/// The exec returned by `try_with_new_distribution` must carry the same
/// producer scope id as the exec it was derived from.
#[test]
fn try_with_new_distribution_preserves_producer_scope_id() {
    let expected_scope = ProducerScopeId::new(42);

    // A plan whose output schema contains a single column named "col1".
    let plan = LogicalPlanBuilder::empty(true)
        .project(vec![lit(1i32).alias("col1")])
        .unwrap()
        .build()
        .unwrap();
    let arrow_schema = plan.schema().as_arrow().clone();

    // Partition columns must be non-empty so that try_with_new_distribution
    // can find an overlap with the requested distribution.
    let table_column = ColumnExpr::new(Some(TableReference::bare("table")), "col1");
    let mut partition_cols = AliasMapping::new();
    partition_cols.insert("col1".to_string(), BTreeSet::from([table_column]));

    let session_state = SessionStateBuilder::new().build();
    let exec = MergeScanExec::new(
        &session_state,
        TableName::new("catalog", "schema", "table"),
        vec![RegionId::new(1024, 1)],
        plan,
        &arrow_schema,
        Arc::new(TestRegionQueryHandler),
        QueryContext::arc(),
        2,
        partition_cols,
        expected_scope,
    )
    .unwrap();
    assert_eq!(exec.remote_dyn_filter_producer_scope_id(), expected_scope);

    // Request a hash distribution that differs from the current partitioning
    // but shares the "col1" name with partition_cols, so a re-distributed
    // clone is produced instead of `None`.
    let requested = Distribution::HashPartitioned(vec![
        Arc::new(Column::new("col1", 0)),
        Arc::new(Column::new("col2", 1)),
    ]);
    let redistributed = exec
        .try_with_new_distribution(requested)
        .expect("expected a cloned exec with overlapping partition col");

    assert_eq!(
        redistributed.remote_dyn_filter_producer_scope_id(),
        expected_scope,
        "try_with_new_distribution must preserve producer scope id"
    );
}
}

View File

@@ -178,6 +178,7 @@ impl ExtensionPlanner for DistExtensionPlanner {
query_ctx,
session_state.config().target_partitions(),
merge_scan.partition_cols().clone(),
merge_scan.producer_scope_id(),
)?;
Ok(Some(Arc::new(merge_scan_plan) as _))
}

View File

@@ -13,26 +13,15 @@
// limitations under the License.
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock, Weak};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
use session::query_id::QueryId;
use store_api::storage::RegionId;
use crate::dist_plan::FilterId;
/// Lifecycle state for a query-scoped remote dynamic filter registry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegistryState {
Active,
// TODO(remote-dyn-filter): Subtask 04+ should wire query finish/cancel hooks to move a
// registry into Closing, then drive the cleanup tail (final unregister/complete RPCs,
// watcher shutdown, and any in-flight control-path draining) before mark_closed().
Closing,
Closed,
}
use crate::dist_plan::{FilterId, ProducerScopeId};
/// Routing metadata for a remote dynamic filter subscriber.
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -55,7 +44,6 @@ impl Subscriber {
pub enum EntryRegistration {
Inserted(Arc<DynFilterEntry>),
Existing(Arc<DynFilterEntry>),
RejectedByState(RegistryState),
}
/// Result of registering a subscriber under an existing filter entry.
@@ -64,47 +52,29 @@ pub enum SubscriberRegistration {
Added,
Duplicate,
MissingFilter,
RejectedByState(RegistryState),
}
/// A registered query-local remote dynamic filter entry.
///
/// This stores the alive DataFusion filter handle together with the subscriber fanout metadata
/// and the registry-owned watcher bookkeeping that later subtasks will drive.
// TODO(remote-dyn-filter): Revisit whether this filter-level entry should stay this rich once
// the real watcher/fanout loop lands. Some fields may move to query-level shared runtime state.
/// This stores the alive DataFusion filter handle together with minimal subscriber state.
#[derive(Debug)]
pub struct DynFilterEntry {
filter_id: FilterId,
alive_dyn_filter: Arc<DynamicFilterPhysicalExpr>,
last_epoch: AtomicU64,
last_observed_generation: AtomicU64,
alive_dyn_filter: Weak<DynamicFilterPhysicalExpr>,
subscribers: RwLock<Vec<Subscriber>>,
// TODO(remote-dyn-filter): This watcher bookkeeping is only a subtask-03 skeleton
// placeholder for the later wait_update/fanout wiring. Revisit whether filter-level
// watcher state is still the right shape once the real async cleanup/update loop lands.
watcher_started: AtomicBool,
}
#[derive(Debug)]
struct QueryDynFilterRegistryInner {
state: RegistryState,
entries: HashMap<FilterId, Arc<DynFilterEntry>>,
}
impl DynFilterEntry {
pub fn new(filter_id: FilterId, alive_dyn_filter: Arc<DynamicFilterPhysicalExpr>) -> Self {
// TODO(remote-dyn-filter): When real watcher/update scheduling lands, confirm that seeding
// the observed generation here is still the right initialization point.
let last_observed_generation = alive_dyn_filter.snapshot_generation();
Self {
filter_id,
alive_dyn_filter,
last_epoch: AtomicU64::new(0),
last_observed_generation: AtomicU64::new(last_observed_generation),
alive_dyn_filter: Arc::downgrade(&alive_dyn_filter),
subscribers: RwLock::new(Vec::new()),
watcher_started: AtomicBool::new(false),
}
}
@@ -112,29 +82,8 @@ impl DynFilterEntry {
&self.filter_id
}
pub fn alive_dyn_filter(&self) -> Arc<DynamicFilterPhysicalExpr> {
self.alive_dyn_filter.clone()
}
pub fn last_epoch(&self) -> u64 {
self.last_epoch.load(Ordering::SeqCst)
}
pub fn set_last_epoch(&self, epoch: u64) {
// TODO(remote-dyn-filter): Later subtasks should centralize epoch advancement with the
// actual unary update dispatch path so this does not drift from sent update ordering.
self.last_epoch.store(epoch, Ordering::SeqCst);
}
pub fn last_observed_generation(&self) -> u64 {
self.last_observed_generation.load(Ordering::SeqCst)
}
pub fn set_last_observed_generation(&self, generation: u64) {
// TODO(remote-dyn-filter): Later subtasks should update this only after the watcher has
// consumed a new alive filter snapshot and decided whether to emit a remote update.
self.last_observed_generation
.store(generation, Ordering::SeqCst);
pub fn upgrade_alive_dyn_filter(&self) -> Option<Arc<DynamicFilterPhysicalExpr>> {
self.alive_dyn_filter.upgrade()
}
pub fn subscribers(&self) -> Vec<Subscriber> {
@@ -150,31 +99,13 @@ impl DynFilterEntry {
subscribers.push(subscriber);
true
}
pub fn start_watcher_if_needed(&self) -> bool {
// TODO(remote-dyn-filter): Replace this placeholder gate with the real async watcher task
// launch point once wait_update/fanout wiring exists. Re-evaluate whether the gate still
// belongs on each filter entry or should move to query-level dispatch state.
self.watcher_started
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
}
pub fn watcher_started(&self) -> bool {
self.watcher_started.load(Ordering::SeqCst)
}
pub fn mark_watcher_stopped(&self) {
// TODO(remote-dyn-filter): Hook this into the real watcher shutdown path during the
// Closing cleanup tail so the registry only reaches Closed after watcher teardown.
self.watcher_started.store(false, Ordering::SeqCst);
}
}
/// Query-scoped registry that owns all remote dynamic filters for one query.
///
/// Besides the filter entries, the registry tracks how many leases currently reference it;
/// the manager only removes it from its map once that count is zero.
#[derive(Debug)]
pub struct QueryDynFilterRegistry {
/// Identifier of the query this registry belongs to.
query_id: QueryId,
/// Number of live leases: incremented by `acquire_stream`, decremented by `release_stream`.
active_streams: AtomicUsize,
/// Lock-protected mutable registry state, including the per-filter entry map.
inner: RwLock<QueryDynFilterRegistryInner>,
}
@@ -182,8 +113,8 @@ impl QueryDynFilterRegistry {
pub fn new(query_id: QueryId) -> Self {
Self {
query_id,
active_streams: AtomicUsize::new(0),
inner: RwLock::new(QueryDynFilterRegistryInner {
state: RegistryState::Active,
entries: HashMap::new(),
}),
}
@@ -193,8 +124,16 @@ impl QueryDynFilterRegistry {
self.query_id
}
pub fn state(&self) -> RegistryState {
self.inner.read().unwrap().state
/// Counts one more live stream (lease) against this registry.
fn acquire_stream(&self) {
    let _previous_count = self.active_streams.fetch_add(1, Ordering::SeqCst);
}
/// Drops one live stream (lease) from this registry's count.
///
/// Must be paired with an earlier `acquire_stream`: the atomic subtraction wraps around
/// instead of saturating if the count is already zero.
fn release_stream(&self) {
    let _previous_count = self.active_streams.fetch_sub(1, Ordering::SeqCst);
}
fn active_stream_count(&self) -> usize {
self.active_streams.load(Ordering::SeqCst)
}
pub fn entry_count(&self) -> usize {
@@ -220,13 +159,7 @@ impl QueryDynFilterRegistry {
filter_id: FilterId,
alive_dyn_filter: Arc<DynamicFilterPhysicalExpr>,
) -> EntryRegistration {
// TODO(remote-dyn-filter): Subtask 05 should call this from the MergeScan bridge after it
// identifies a remote-propagatable alive dyn filter for the current query.
let mut inner = self.inner.write().unwrap();
if inner.state != RegistryState::Active {
return EntryRegistration::RejectedByState(inner.state);
}
if let Some(existing) = inner.entries.get(&filter_id) {
return EntryRegistration::Existing(existing.clone());
}
@@ -241,32 +174,9 @@ impl QueryDynFilterRegistry {
filter_id: &FilterId,
subscriber: Subscriber,
) -> SubscriberRegistration {
// TODO(remote-dyn-filter): Later subtasks should route remote subscriber metadata into
// this method when MergeScan builds the query_id + filter_id fanout map.
let entry = {
let inner = self.inner.read().unwrap();
if inner.state != RegistryState::Active {
return SubscriberRegistration::RejectedByState(inner.state);
}
let Some(entry) = inner.entries.get(filter_id) else {
return SubscriberRegistration::MissingFilter;
};
entry.clone()
};
let inner = self.inner.read().unwrap();
if inner.state != RegistryState::Active {
return SubscriberRegistration::RejectedByState(inner.state);
}
let Some(current_entry) = inner.entries.get(filter_id) else {
let Some(entry) = self.inner.read().unwrap().entries.get(filter_id).cloned() else {
return SubscriberRegistration::MissingFilter;
};
if !Arc::ptr_eq(current_entry, &entry) {
return SubscriberRegistration::MissingFilter;
}
if entry.register_subscriber(subscriber) {
SubscriberRegistration::Added
@@ -274,26 +184,33 @@ impl QueryDynFilterRegistry {
SubscriberRegistration::Duplicate
}
}
}
pub fn begin_closing(&self) -> RegistryState {
let mut inner = self.inner.write().unwrap();
match inner.state {
RegistryState::Active => {
// TODO(remote-dyn-filter): Closing is where the later cleanup tail starts. After
// this transition, new registrations stay rejected while existing entries remain
// available for final unregister/complete fanout and watcher shutdown.
inner.state = RegistryState::Closing;
RegistryState::Closing
}
RegistryState::Closing | RegistryState::Closed => inner.state,
/// Guard that keeps a query's remote dyn filter registry registered while it is alive.
///
/// Constructing the lease counts one active stream on the registry; dropping it releases that
/// stream and asks the manager to remove the registry if no streams remain.
#[derive(Debug)]
#[must_use = "dropping the lease immediately releases the registry stream it just acquired"]
pub struct RemoteDynFilterRegistryLease {
    /// Manager that owns the query-id -> registry map and is asked to clean up on drop.
    registry_manager: Arc<DynFilterRegistryManager>,
    /// Registry this lease keeps counted as in use.
    registry: Arc<QueryDynFilterRegistry>,
}
impl RemoteDynFilterRegistryLease {
    /// Wraps `registry` in a lease and counts it as one more active stream.
    ///
    /// The matching decrement happens when the lease is dropped.
    fn new(
        registry_manager: Arc<DynFilterRegistryManager>,
        registry: Arc<QueryDynFilterRegistry>,
    ) -> Self {
        let lease = Self {
            registry_manager,
            registry,
        };
        lease.registry.acquire_stream();
        lease
    }
}
pub fn mark_closed(&self) {
// TODO(remote-dyn-filter): Call this only after Closing cleanup finishes (final control
// RPCs sent, watchers stopped, and any short tail work drained). The manager removes the
// registry from its query map after this point.
self.inner.write().unwrap().state = RegistryState::Closed;
impl Drop for RemoteDynFilterRegistryLease {
    /// Releases this lease's stream and lets the manager drop the registry if it is now idle.
    fn drop(&mut self) {
        let registry = &self.registry;
        registry.release_stream();

        // Whether the registry was actually removed does not matter here: another lease may
        // still hold it, or the map may already point at a different registry.
        let query_id = registry.query_id();
        let _ = self.registry_manager.remove_if_inactive(&query_id, registry);
    }
}
@@ -308,9 +225,34 @@ impl DynFilterRegistryManager {
self.registries.read().unwrap().get(query_id).cloned()
}
/// Test-only helper: unconditionally removes and returns the registry for `query_id`.
#[cfg(test)]
fn remove(&self, query_id: &QueryId) -> Option<Arc<QueryDynFilterRegistry>> {
    let mut registries = self.registries.write().unwrap();
    registries.remove(query_id)
}
/// Removes the registry for `query_id` only if it is still `registry` and has no active streams.
///
/// Returns the removed registry, or `None` when nothing is registered under `query_id`, when
/// the map now holds a different registry instance, or when streams are still active.
fn remove_if_inactive(
    &self,
    query_id: &QueryId,
    registry: &Arc<QueryDynFilterRegistry>,
) -> Option<Arc<QueryDynFilterRegistry>> {
    let mut guard = self.registries.write().unwrap();
    let current = guard.get(query_id)?;

    // Identity check first: the stream count is only consulted for the matching instance.
    if !Arc::ptr_eq(current, registry) || registry.active_stream_count() != 0 {
        return None;
    }
    guard.remove(query_id)
}
pub fn acquire_lease(self: &Arc<Self>, query_id: QueryId) -> RemoteDynFilterRegistryLease {
let registry = self.get_or_init(query_id);
RemoteDynFilterRegistryLease::new(self.clone(), registry)
}
pub fn get_or_init(&self, query_id: QueryId) -> Arc<QueryDynFilterRegistry> {
// TODO(remote-dyn-filter): Subtask 04 should wire query-engine runtime ownership through
// this entry point so query_id-scoped registries live with distributed query execution.
let mut registries = self.registries.write().unwrap();
registries
@@ -319,30 +261,6 @@ impl DynFilterRegistryManager {
.clone()
}
pub fn begin_closing(&self, query_id: &QueryId) -> Option<Arc<QueryDynFilterRegistry>> {
// TODO(remote-dyn-filter): Query finish/cancel hooks should call this to start the cleanup
// tail, not remove the registry immediately.
let registry = self.get(query_id)?;
registry.begin_closing();
Some(registry)
}
pub fn reap_closed(&self, query_id: &QueryId) -> bool {
// TODO(remote-dyn-filter): Cleanup code should call this only after mark_closed(). If a
// later implementation needs a retained closed-tail window, expand here instead of adding
// ad-hoc removal at call sites.
let mut registries = self.registries.write().unwrap();
let Some(registry) = registries.get(query_id) else {
return false;
};
if registry.state() != RegistryState::Closed {
return false;
}
registries.remove(query_id);
true
}
/// Returns how many query registries the manager currently holds.
pub fn registry_count(&self) -> usize {
    let registries = self.registries.read().unwrap();
    registries.len()
}
@@ -350,6 +268,7 @@ impl DynFilterRegistryManager {
#[cfg(test)]
mod tests {
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::expressions::{Column, lit};
use uuid::Uuid;
@@ -361,7 +280,12 @@ mod tests {
}
fn test_filter_id(region_id: RegionId, producer_ordinal: u32) -> FilterId {
FilterId::new(region_id, producer_ordinal, FilterFingerprint::new(0xabc))
FilterId::new(
region_id,
ProducerScopeId::new(42),
producer_ordinal,
FilterFingerprint::new(0xabc),
)
}
fn test_dyn_filter(names: &[&str]) -> Arc<DynamicFilterPhysicalExpr> {
@@ -385,6 +309,48 @@ mod tests {
assert_eq!(manager.registry_count(), 1);
}
#[test]
fn registry_manager_removes_registry_for_query() {
    let query_id = test_query_id(1);
    let manager = DynFilterRegistryManager::default();
    let registry = manager.get_or_init(query_id);

    // The removed value must be the very instance that was initialized above.
    let removed = manager.remove(&query_id).unwrap();
    assert!(Arc::ptr_eq(&removed, &registry));

    // After removal the manager no longer knows the query.
    assert!(manager.get(&query_id).is_none());
    assert_eq!(manager.registry_count(), 0);
}
#[test]
fn registry_manager_lease_waits_for_last_query_scoped_stream() {
    let query_id = test_query_id(1);
    let manager = Arc::new(DynFilterRegistryManager::default());

    // Two leases on the same query share a single registry.
    let first = manager.acquire_lease(query_id);
    let second = manager.acquire_lease(query_id);
    assert_eq!(manager.registry_count(), 1);

    // Releasing one lease keeps the registry because the other is still active.
    drop(first);
    assert_eq!(manager.registry_count(), 1);

    // Releasing the last lease removes the registry.
    drop(second);
    assert_eq!(manager.registry_count(), 0);
}
#[test]
fn registry_manager_lease_does_not_remove_reacquired_registry() {
    let query_id = test_query_id(1);
    let manager = Arc::new(DynFilterRegistryManager::default());

    // Acquire and immediately release a lease for the query.
    let first = manager.acquire_lease(query_id);
    drop(first);

    // A later lease for the same query must find a registry registered for it.
    let second = manager.acquire_lease(query_id);
    assert_eq!(manager.registry_count(), 1);

    // Dropping that lease cleans the registry up again.
    drop(second);
    assert_eq!(manager.registry_count(), 0);
}
#[test]
fn registry_stores_filter_and_deduplicates_subscribers() {
let registry = QueryDynFilterRegistry::new(test_query_id(1));
@@ -396,10 +362,6 @@ mod tests {
};
assert_eq!(entry.filter_id(), &filter_id);
assert_eq!(
entry.last_observed_generation(),
filter.snapshot_generation()
);
assert_eq!(registry.entry_count(), 1);
let subscriber = Subscriber::new(RegionId::new(1024, 1));
@@ -413,55 +375,4 @@ mod tests {
);
assert_eq!(entry.subscribers().len(), 1);
}
#[test]
fn registry_lifecycle_rejects_new_work_after_closing() {
let registry = QueryDynFilterRegistry::new(test_query_id(1));
assert_eq!(registry.state(), RegistryState::Active);
assert_eq!(registry.begin_closing(), RegistryState::Closing);
assert_eq!(registry.state(), RegistryState::Closing);
let filter = test_dyn_filter(&["host"]);
let filter_id = test_filter_id(RegionId::new(1024, 7), 1);
assert!(matches!(
registry.register_remote_dyn_filter(filter_id, filter),
EntryRegistration::RejectedByState(RegistryState::Closing)
));
registry.mark_closed();
assert_eq!(registry.state(), RegistryState::Closed);
}
#[test]
fn registered_filter_starts_watcher_once() {
let entry = DynFilterEntry::new(
test_filter_id(RegionId::new(1024, 7), 1),
test_dyn_filter(&["host"]),
);
assert!(entry.start_watcher_if_needed());
assert!(entry.watcher_started());
assert!(!entry.start_watcher_if_needed());
entry.mark_watcher_stopped();
assert!(!entry.watcher_started());
}
#[test]
fn manager_reaps_closed_registry() {
let manager = DynFilterRegistryManager::default();
let query_id = test_query_id(1);
let registry = manager.get_or_init(query_id);
let _ = registry.register_remote_dyn_filter(
test_filter_id(RegionId::new(1024, 7), 1),
test_dyn_filter(&["host"]),
);
registry.mark_closed();
assert!(manager.reap_closed(&query_id));
assert_eq!(manager.registry_count(), 0);
assert!(manager.get(&query_id).is_none());
}
}

View File

@@ -226,7 +226,7 @@ impl QueryEngineState {
.with_query_planner(Arc::new(DfQueryPlanner::new(
catalog_list.clone(),
partition_rule_manager,
region_query_handler,
region_query_handler.clone(),
)))
.with_optimizer_rules(optimizer.rules)
.with_physical_optimizer_rules(physical_optimizer.rules)
@@ -407,25 +407,6 @@ impl QueryEngineState {
Some(self.dyn_filter_registry_manager.get_or_init(query_id))
}
pub fn begin_closing_remote_dyn_filter_registry(
&self,
query_ctx: &QueryContextRef,
) -> Option<Arc<QueryDynFilterRegistry>> {
// TODO(remote-dyn-filter): Wire query finish/cancel hooks through this helper once the
// distributed query lifecycle exposes a single cleanup callback for the query runtime.
let query_id = query_ctx.remote_query_id_value()?;
self.dyn_filter_registry_manager.begin_closing(&query_id)
}
pub fn reap_closed_remote_dyn_filter_registry(&self, query_ctx: &QueryContextRef) -> bool {
// TODO(remote-dyn-filter): Call this after the Closing cleanup tail marks the registry
// closed. Subtask 04 only exposes the runtime ownership path; later subtasks should decide
// the exact lifecycle hook that invokes this.
query_ctx
.remote_query_id_value()
.is_some_and(|query_id| self.dyn_filter_registry_manager.reap_closed(&query_id))
}
/// Returns a new shared handle to the engine's function state.
pub fn function_state(&self) -> Arc<FunctionState> {
    Arc::clone(&self.function_state)
}
@@ -647,30 +628,6 @@ mod tests {
assert_eq!(first.query_id(), query_ctx.remote_query_id_value().unwrap());
}
#[test]
fn query_engine_state_exposes_closing_and_reap_helpers() {
let state = new_query_engine_state();
let query_ctx = QueryContext::arc();
let registry = state
.get_or_init_remote_dyn_filter_registry(&query_ctx)
.unwrap();
let closing = state
.begin_closing_remote_dyn_filter_registry(&query_ctx)
.unwrap();
assert!(Arc::ptr_eq(&registry, &closing));
assert_eq!(closing.state(), crate::dist_plan::RegistryState::Closing);
closing.mark_closed();
assert!(state.reap_closed_remote_dyn_filter_registry(&query_ctx));
assert!(
state
.dyn_filter_registry_manager()
.get(&query_ctx.remote_query_id_value().unwrap())
.is_none()
);
}
#[test]
fn query_engine_state_relies_on_query_context_remote_query_id_contract() {
let state = new_query_engine_state();

View File

@@ -14,12 +14,14 @@
use std::sync::Arc;
use api::v1::region::{RemoteDynFilterUnregister, RemoteDynFilterUpdate};
use async_trait::async_trait;
use common_meta::node_manager::NodeManagerRef;
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
use partition::manager::PartitionRuleManagerRef;
use session::ReadPreference;
use store_api::storage::RegionId;
use crate::error::Result;
@@ -42,6 +44,20 @@ pub trait RegionQueryHandler: Send + Sync {
read_preference: ReadPreference,
request: QueryRequest,
) -> Result<SendableRecordBatchStream>;
/// Handles a remote dynamic filter update addressed to `region_id`.
///
/// `query_id` is passed alongside the `update` payload; presumably it identifies the query
/// whose filter is being updated — confirm against the concrete implementations.
async fn handle_remote_dyn_filter_update(
&self,
region_id: RegionId,
query_id: String,
update: RemoteDynFilterUpdate,
) -> Result<()>;
/// Handles a remote dynamic filter unregister request addressed to `region_id`.
///
/// `query_id` is passed alongside the `unregister` payload; presumably it identifies the query
/// whose filter is being unregistered — confirm against the concrete implementations.
async fn handle_remote_dyn_filter_unregister(
&self,
region_id: RegionId,
query_id: String,
unregister: RemoteDynFilterUnregister,
) -> Result<()>;
}
pub type RegionQueryHandlerRef = Arc<dyn RegionQueryHandler>;

View File

@@ -34,7 +34,9 @@ use common_telemetry::{debug, error, tracing, warn};
use common_time::timezone::parse_timezone;
use futures_util::StreamExt;
use session::context::{Channel, QueryContextBuilder, QueryContextRef};
use session::hints::{READ_PREFERENCE_HINT, is_reserved_extension_key};
use session::hints::{
READ_PREFERENCE_HINT, REMOTE_QUERY_ID_EXTENSION_KEY, is_reserved_extension_key,
};
use snafu::{OptionExt, ResultExt};
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
@@ -286,7 +288,7 @@ impl Drop for RequestTimer {
mod tests {
use chrono::FixedOffset;
use common_time::Timezone;
use session::hints::REMOTE_QUERY_ID_EXTENSION_KEY;
use session::hints::INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY;
use super::*;
@@ -303,6 +305,14 @@ mod tests {
vec![
("auto_create_table".to_string(), "true".to_string()),
("read_preference".to_string(), "leader".to_string()),
(
REMOTE_QUERY_ID_EXTENSION_KEY.to_string(),
"spoofed".to_string(),
),
(
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY.to_string(),
"spoofed-regs".to_string(),
),
],
)
.unwrap();
@@ -327,6 +337,12 @@ mod tests {
query_context.remote_query_id(),
Some(extensions[1].1.as_str())
);
assert_ne!(query_context.remote_query_id(), Some("spoofed"));
assert!(
query_context
.extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY)
.is_none()
);
}
#[test]

View File

@@ -45,8 +45,11 @@ fn apply_hints(query_ctx: &mut QueryContext, hints: Vec<(String, String)>) {
#[cfg(test)]
mod tests {
use common_query::request::INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY as COMMON_INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY;
use session::context::{QueryContextBuilder, generate_remote_query_id};
use session::hints::REMOTE_QUERY_ID_EXTENSION_KEY;
use session::hints::{
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, REMOTE_QUERY_ID_EXTENSION_KEY,
};
use super::apply_hints;
@@ -67,6 +70,10 @@ mod tests {
REMOTE_QUERY_ID_EXTENSION_KEY.to_string(),
"spoofed".to_string(),
),
(
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY.to_string(),
"spoofed-regs".to_string(),
),
("ttl".to_string(), "7d".to_string()),
],
);
@@ -75,6 +82,19 @@ mod tests {
query_ctx.remote_query_id(),
Some(original_query_id.as_str())
);
assert!(
query_ctx
.extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY)
.is_none()
);
assert_eq!(query_ctx.extension("ttl"), Some("7d"));
}
#[test]
fn test_initial_dyn_filter_registration_key_matches_common_query_constant() {
    // The key is declared in two crates; this test fails if the two copies ever diverge.
    let session_key = INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY;
    let common_query_key = COMMON_INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY;
    assert_eq!(session_key, common_query_key);
}
}

View File

@@ -17,9 +17,14 @@ pub const HINTS_KEY: &str = "x-greptime-hints";
/// Deprecated, use `HINTS_KEY` instead. Notes if "x-greptime-hints" is set, keys with this prefix will be ignored.
pub const HINTS_KEY_PREFIX: &str = "x-greptime-hint-";
pub const REMOTE_QUERY_ID_EXTENSION_KEY: &str = "remote_query_id";
pub const INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY: &str =
"initial_remote_dyn_filter_registrations";
pub const READ_PREFERENCE_HINT: &str = "read_preference";
pub const RESERVED_EXTENSION_KEYS: [&str; 1] = [REMOTE_QUERY_ID_EXTENSION_KEY];
pub const RESERVED_EXTENSION_KEYS: [&str; 2] = [
REMOTE_QUERY_ID_EXTENSION_KEY,
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
];
/// Deprecated, use `HINTS_KEY` instead.
pub const HINT_KEYS: [&str; 7] = [
@@ -43,6 +48,9 @@ mod tests {
#[test]
fn test_is_reserved_extension_key() {
    // Both internally managed keys must be reported as reserved.
    let reserved_keys = [
        REMOTE_QUERY_ID_EXTENSION_KEY,
        INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
    ];
    assert!(
        reserved_keys
            .into_iter()
            .all(|key| is_reserved_extension_key(key))
    );

    // An ordinary user-settable hint is not reserved.
    assert!(!is_reserved_extension_key(READ_PREFERENCE_HINT));
}
}

View File

@@ -35,6 +35,7 @@ use context::{ConfigurationVariables, QueryContextBuilder};
use derive_more::Debug;
use crate::context::{Channel, ConnInfo, QueryContextRef};
use crate::hints::REMOTE_QUERY_ID_EXTENSION_KEY;
/// Maximum number of warnings to store per session (similar to MySQL's max_error_count)
const MAX_WARNINGS: usize = 64;