feat: filter id

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-13 14:39:22 +08:00
parent d4cfbd3400
commit 855155563b
3 changed files with 107 additions and 1 deletions

1
Cargo.lock generated
View File

@@ -11144,6 +11144,7 @@ dependencies = [
"datafusion-functions",
"datafusion-optimizer",
"datafusion-physical-expr",
"datafusion-proto",
"datafusion-sql",
"datatypes",
"either",

View File

@@ -44,6 +44,7 @@ datafusion-expr-common.workspace = true
datafusion-functions.workspace = true
datafusion-optimizer.workspace = true
datafusion-physical-expr.workspace = true
datafusion-proto.workspace = true
datafusion-sql.workspace = true
datatypes.workspace = true
either.workspace = true

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::hash::{DefaultHasher, Hasher};
use std::sync::{Arc, Mutex};
use std::time::Duration;
@@ -37,11 +38,16 @@ use datafusion::physical_plan::{
use datafusion_common::{Column as ColumnExpr, DataFusionError, Result};
use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr};
use datafusion_physical_expr::{
Distribution, EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
};
use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
use datafusion_proto::physical_plan::to_proto::serialize_physical_expr;
use futures_util::StreamExt;
use greptime_proto::v1::region::RegionRequestHeader;
use meter_core::data::ReadItem;
use meter_macros::read_meter;
use prost::Message;
use session::context::QueryContextRef;
use store_api::storage::RegionId;
use table::table_name::TableName;
@@ -499,6 +505,42 @@ impl MergeScanExec {
}
}
/// Derives the Phase 1 query-local identity of a remote dynamic filter.
///
/// The returned string has the shape
/// `{region_id}:{producer_local_ordinal}:{children_fingerprint}`, where the last
/// component is produced by `canonicalize_dyn_filter_children`.
///
/// Subscriber routing details such as `partition` are intentionally not part of this key;
/// they belong in the later fanout/subscriber map so that one shared remote filter state is
/// not split per subscriber.
///
/// # Errors
///
/// Propagates any error raised while fingerprinting `children`.
// Not referenced by non-test code in this hunk, hence the lint suppression.
#[allow(unused)]
pub(crate) fn build_remote_dyn_filter_id(
    region_id: RegionId,
    producer_local_ordinal: usize,
    children: &[Arc<dyn PhysicalExpr>],
) -> Result<String> {
    canonicalize_dyn_filter_children(children)
        .map(|fingerprint| format!("{region_id}:{producer_local_ordinal}:{fingerprint}"))
}
/// Produces an order-sensitive fingerprint string for a list of physical expressions.
///
/// Each child is serialized to its protobuf form with the default physical extension codec,
/// encoded to bytes, and hashed (byte length first, then the bytes). Every hash is rendered
/// as a 16-digit, zero-padded lowercase hex string and the results are joined with `,` in
/// input order. An empty `children` slice yields an empty string.
///
/// # Errors
///
/// Returns the serialization error as-is, or wraps a protobuf encoding error in
/// `DataFusionError::External`.
///
/// NOTE(review): `DefaultHasher`'s algorithm is not guaranteed to stay the same across Rust
/// releases. That is fine while the id is only compared within one process/build; confirm
/// that no peer built with a different toolchain ever has to recompute this fingerprint.
fn canonicalize_dyn_filter_children(children: &[Arc<dyn PhysicalExpr>]) -> Result<String> {
    let codec = DefaultPhysicalExtensionCodec {};
    let fingerprints = children
        .iter()
        .map(|expr| -> Result<String> {
            let node = serialize_physical_expr(expr, &codec)?;
            let mut payload = Vec::new();
            node.encode(&mut payload)
                .map_err(|err| DataFusionError::External(Box::new(err)))?;

            // Feed the length before the bytes so the digest is length-prefixed.
            let mut digest = DefaultHasher::new();
            digest.write_usize(payload.len());
            digest.write(&payload);
            Ok(format!("{:016x}", digest.finish()))
        })
        .collect::<Result<Vec<_>>>()?;
    Ok(fingerprints.join(","))
}
/// Metrics for a region of a partition.
#[derive(Debug, Clone)]
struct RegionMetrics {
@@ -713,3 +755,65 @@ impl MergeScanMetric {
self.greptime_exec_cost.add(metrics);
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds one physical `Column` expression per name, using the name's position in
    /// `names` as the column index.
    fn test_children(names: &[&str]) -> Vec<Arc<dyn PhysicalExpr>> {
        let mut columns = Vec::with_capacity(names.len());
        for (position, column_name) in names.iter().enumerate() {
            let column: Arc<dyn PhysicalExpr> = Arc::new(Column::new(*column_name, position));
            columns.push(column);
        }
        columns
    }

    /// Two ids built from separately constructed but identical inputs must be equal.
    #[test]
    fn remote_dyn_filter_id_is_stable_for_equivalent_children() {
        let region_id = RegionId::new(1024, 7);
        let build = || {
            build_remote_dyn_filter_id(region_id, 3, &test_children(&["host", "pod"])).unwrap()
        };

        let lhs = build();
        let rhs = build();

        assert_eq!(lhs, rhs);
    }

    /// Changing the region, the ordinal, or the children must each change the id.
    #[test]
    fn remote_dyn_filter_id_changes_when_identity_inputs_change() {
        let children = test_children(&["host", "pod"]);
        let swapped_children = test_children(&["pod", "host"]);

        let baseline = build_remote_dyn_filter_id(RegionId::new(1024, 7), 3, &children).unwrap();
        let different_region =
            build_remote_dyn_filter_id(RegionId::new(1024, 8), 3, &children).unwrap();
        let different_ordinal =
            build_remote_dyn_filter_id(RegionId::new(1024, 7), 4, &children).unwrap();
        let different_children =
            build_remote_dyn_filter_id(RegionId::new(1024, 7), 3, &swapped_children).unwrap();

        assert_ne!(baseline, different_region);
        assert_ne!(baseline, different_ordinal);
        assert_ne!(baseline, different_children);
    }

    /// The builder takes no partition argument, so identical inputs give identical ids.
    #[test]
    fn remote_dyn_filter_id_has_no_partition_dimension() {
        let children = test_children(&["host"]);
        let region_id = RegionId::new(2048, 1);

        let partition_zero = build_remote_dyn_filter_id(region_id, 0, &children).unwrap();
        let partition_one = build_remote_dyn_filter_id(region_id, 0, &children).unwrap();

        assert_eq!(partition_zero, partition_one);
    }

    /// With no children the fingerprint component is empty, leaving a trailing `:`.
    #[test]
    fn remote_dyn_filter_id_supports_empty_children() {
        let region_id = RegionId::new(4096, 2);

        let first = build_remote_dyn_filter_id(region_id, 1, &[]).unwrap();
        let second = build_remote_dyn_filter_id(region_id, 1, &[]).unwrap();

        assert_eq!(first, second);
        assert_eq!(first, format!("{region_id}:1:"));
    }
}