mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-06-03 13:50:40 +00:00
@@ -13,7 +13,6 @@
|
||||
// limitations under the License.
|
||||
|
||||
mod catalog;
|
||||
mod registrations;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Debug;
|
||||
@@ -99,10 +98,6 @@ use crate::error::{
|
||||
};
|
||||
use crate::event_listener::RegionServerEventListenerRef;
|
||||
use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
|
||||
use crate::region_server::registrations::{
|
||||
RegisteredDynFilter, initial_dyn_filter_regs_from_query_ctx, register_initial_dyn_filter_regs,
|
||||
remove_initial_dyn_filter_regs_for_region,
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RegionServer {
|
||||
@@ -280,18 +275,6 @@ impl RegionServer {
|
||||
common_telemetry::info!("Handle remote read for region: {}", region_id);
|
||||
}
|
||||
|
||||
let initial_dyn_filter_regs = initial_dyn_filter_regs_from_query_ctx(&query_ctx);
|
||||
if query_ctx.explain_verbose() {
|
||||
common_telemetry::info!(
|
||||
"Initial remote dyn filter registrations for region {}: {}",
|
||||
region_id,
|
||||
initial_dyn_filter_regs
|
||||
.as_ref()
|
||||
.map(|regs| regs.regs.len())
|
||||
.unwrap_or(0)
|
||||
);
|
||||
}
|
||||
|
||||
let decoder = self
|
||||
.inner
|
||||
.query_engine
|
||||
@@ -304,19 +287,7 @@ impl RegionServer {
|
||||
.await
|
||||
.context(DecodeLogicalPlanSnafu)?;
|
||||
|
||||
let query_id = query_ctx.remote_query_id().map(ToOwned::to_owned);
|
||||
if let (Some(query_id), Some(regs)) =
|
||||
(query_id.as_deref(), initial_dyn_filter_regs.as_ref())
|
||||
{
|
||||
register_initial_dyn_filter_regs(
|
||||
&self.inner.initial_remote_dyn_filter_registrations,
|
||||
query_id,
|
||||
region_id,
|
||||
regs,
|
||||
);
|
||||
}
|
||||
|
||||
let result = self
|
||||
let stream = self
|
||||
.inner
|
||||
.handle_read(
|
||||
QueryRequest {
|
||||
@@ -326,19 +297,7 @@ impl RegionServer {
|
||||
},
|
||||
query_ctx.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
if result.is_err()
|
||||
&& let Some(query_id) = query_id.as_deref()
|
||||
{
|
||||
remove_initial_dyn_filter_regs_for_region(
|
||||
&self.inner.initial_remote_dyn_filter_registrations,
|
||||
query_id,
|
||||
region_id,
|
||||
);
|
||||
}
|
||||
|
||||
let stream = result?;
|
||||
.await?;
|
||||
let stream = wrap_flow_region_watermark_stream(stream, region_id, &query_ctx);
|
||||
Ok(maybe_guard_stream(stream, permit))
|
||||
}
|
||||
@@ -1094,9 +1053,6 @@ struct RegionServerInner {
|
||||
/// server with a concrete engine; acceptable for now to fetch Mito-specific
|
||||
/// info (e.g., list SSTs). Consider a diagnostics trait later.
|
||||
mito_engine: RwLock<Option<MitoEngine>>,
|
||||
/// TODO(remote-dyn-filter): Reap this query-scoped placeholder registry on query finish/cancel
|
||||
/// and later fold it into the real remote dyn filter runtime state lifecycle.
|
||||
initial_remote_dyn_filter_registrations: DashMap<String, DashMap<String, RegisteredDynFilter>>,
|
||||
}
|
||||
|
||||
struct RegionServerParallelism {
|
||||
@@ -1213,7 +1169,6 @@ impl RegionServerInner {
|
||||
parallelism,
|
||||
topic_stats_reporter: RwLock::new(None),
|
||||
mito_engine: RwLock::new(None),
|
||||
initial_remote_dyn_filter_registrations: DashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1949,10 +1904,6 @@ mod tests {
|
||||
remote_dyn_filter_request,
|
||||
};
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_query::request::{
|
||||
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, InitialDynFilterReg,
|
||||
InitialDynFilterRegs,
|
||||
};
|
||||
use common_recordbatch::RecordBatches;
|
||||
use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
|
||||
use datatypes::prelude::{ConcreteDataType, VectorRef};
|
||||
@@ -2110,176 +2061,6 @@ mod tests {
|
||||
assert!(pinned.as_ref().get_ref().metrics().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn initial_dyn_filter_regs_can_be_read_from_query_context() {
|
||||
let mut query_ctx = QueryContext::with("greptime", "public");
|
||||
query_ctx.set_extension(
|
||||
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
|
||||
InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
|
||||
"filter-1",
|
||||
vec![vec![1, 2, 3]],
|
||||
)])
|
||||
.to_extension_value()
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
let regs = initial_dyn_filter_regs_from_query_ctx(&Arc::new(query_ctx)).unwrap();
|
||||
|
||||
assert_eq!(regs.regs.len(), 1);
|
||||
assert_eq!(regs.regs[0].filter_id, "filter-1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn initial_dyn_filter_regs_from_query_context_rejects_duplicate_filter_ids() {
|
||||
let mut query_ctx = QueryContext::with("greptime", "public");
|
||||
query_ctx.set_extension(
|
||||
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY,
|
||||
InitialDynFilterRegs::new(vec![
|
||||
InitialDynFilterReg::new("filter-1", vec![vec![1, 2, 3]]),
|
||||
InitialDynFilterReg::new("filter-1", vec![vec![4, 5, 6]]),
|
||||
])
|
||||
.to_extension_value()
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
let regs = initial_dyn_filter_regs_from_query_ctx(&Arc::new(query_ctx));
|
||||
|
||||
assert!(regs.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn register_initial_dyn_filter_regs_creates_query_scoped_entries() {
|
||||
let regs_by_query = DashMap::<String, DashMap<String, RegisteredDynFilter>>::new();
|
||||
let regs = InitialDynFilterRegs::new(vec![
|
||||
InitialDynFilterReg::new("filter-1", vec![vec![1, 2, 3]]),
|
||||
InitialDynFilterReg::new("filter-2", vec![vec![4, 5, 6]]),
|
||||
]);
|
||||
let query_id = "query-1";
|
||||
let region_id = RegionId::new(1024, 7);
|
||||
|
||||
register_initial_dyn_filter_regs(®s_by_query, query_id, region_id, ®s);
|
||||
|
||||
let query_regs = regs_by_query.get(query_id).unwrap();
|
||||
assert_eq!(query_regs.len(), 2);
|
||||
let registered = query_regs.get("filter-1").unwrap();
|
||||
assert_eq!(registered.filter_id, "filter-1");
|
||||
assert_eq!(registered.child_exprs_datafusion_proto, vec![vec![1, 2, 3]]);
|
||||
assert_eq!(registered.subscriber_regions, vec![region_id]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn register_initial_dyn_filter_regs_ignores_duplicate_region_entry() {
|
||||
let regs_by_query = DashMap::<String, DashMap<String, RegisteredDynFilter>>::new();
|
||||
let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
|
||||
"filter-1",
|
||||
vec![vec![1, 2, 3]],
|
||||
)]);
|
||||
let query_id = "query-1";
|
||||
let region_id = RegionId::new(1024, 7);
|
||||
|
||||
register_initial_dyn_filter_regs(®s_by_query, query_id, region_id, ®s);
|
||||
register_initial_dyn_filter_regs(®s_by_query, query_id, region_id, ®s);
|
||||
|
||||
let query_regs = regs_by_query.get(query_id).unwrap();
|
||||
assert_eq!(query_regs.len(), 1);
|
||||
let registered = query_regs.get("filter-1").unwrap();
|
||||
assert_eq!(registered.subscriber_regions, vec![region_id]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn register_initial_dyn_filter_regs_ignores_invalid_duplicate_payload_set() {
|
||||
let regs_by_query = DashMap::<String, DashMap<String, RegisteredDynFilter>>::new();
|
||||
let regs = InitialDynFilterRegs::new(vec![
|
||||
InitialDynFilterReg::new("filter-1", vec![vec![1, 2, 3]]),
|
||||
InitialDynFilterReg::new("filter-1", vec![vec![4, 5, 6]]),
|
||||
]);
|
||||
let query_id = "query-1";
|
||||
let region_id = RegionId::new(1024, 7);
|
||||
|
||||
register_initial_dyn_filter_regs(®s_by_query, query_id, region_id, ®s);
|
||||
|
||||
assert!(regs_by_query.get(query_id).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn register_initial_dyn_filter_regs_merges_regions_for_same_filter() {
|
||||
let regs_by_query = DashMap::<String, DashMap<String, RegisteredDynFilter>>::new();
|
||||
let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
|
||||
"filter-1",
|
||||
vec![vec![1, 2, 3]],
|
||||
)]);
|
||||
let query_id = "query-1";
|
||||
let first_region_id = RegionId::new(1024, 7);
|
||||
let second_region_id = RegionId::new(1024, 8);
|
||||
|
||||
register_initial_dyn_filter_regs(®s_by_query, query_id, first_region_id, ®s);
|
||||
register_initial_dyn_filter_regs(®s_by_query, query_id, second_region_id, ®s);
|
||||
|
||||
let query_regs = regs_by_query.get(query_id).unwrap();
|
||||
assert_eq!(query_regs.len(), 1);
|
||||
let registered = query_regs.get("filter-1").unwrap();
|
||||
assert_eq!(
|
||||
registered.subscriber_regions,
|
||||
vec![first_region_id, second_region_id]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remove_initial_dyn_filter_regs_for_region_removes_region_entries() {
|
||||
let regs_by_query = DashMap::<String, DashMap<String, RegisteredDynFilter>>::new();
|
||||
let query_id = "query-1";
|
||||
let region_id = RegionId::new(1024, 7);
|
||||
let other_query_id = "query-2";
|
||||
let other_region_id = RegionId::new(1024, 8);
|
||||
|
||||
register_initial_dyn_filter_regs(
|
||||
®s_by_query,
|
||||
query_id,
|
||||
region_id,
|
||||
&InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
|
||||
"filter-1",
|
||||
vec![vec![1, 2, 3]],
|
||||
)]),
|
||||
);
|
||||
register_initial_dyn_filter_regs(
|
||||
®s_by_query,
|
||||
other_query_id,
|
||||
other_region_id,
|
||||
&InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
|
||||
"filter-2",
|
||||
vec![vec![4, 5, 6]],
|
||||
)]),
|
||||
);
|
||||
|
||||
remove_initial_dyn_filter_regs_for_region(®s_by_query, query_id, region_id);
|
||||
|
||||
assert!(regs_by_query.get(query_id).is_none());
|
||||
let other_query_regs = regs_by_query.get(other_query_id).unwrap();
|
||||
assert_eq!(other_query_regs.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remove_initial_dyn_filter_regs_for_region_keeps_other_subscribers() {
|
||||
let regs_by_query = DashMap::<String, DashMap<String, RegisteredDynFilter>>::new();
|
||||
let query_id = "query-1";
|
||||
let first_region_id = RegionId::new(1024, 7);
|
||||
let second_region_id = RegionId::new(1024, 8);
|
||||
let regs = InitialDynFilterRegs::new(vec![InitialDynFilterReg::new(
|
||||
"filter-1",
|
||||
vec![vec![1, 2, 3]],
|
||||
)]);
|
||||
|
||||
register_initial_dyn_filter_regs(®s_by_query, query_id, first_region_id, ®s);
|
||||
register_initial_dyn_filter_regs(®s_by_query, query_id, second_region_id, ®s);
|
||||
|
||||
remove_initial_dyn_filter_regs_for_region(®s_by_query, query_id, first_region_id);
|
||||
|
||||
let query_regs = regs_by_query.get(query_id).unwrap();
|
||||
assert_eq!(query_regs.len(), 1);
|
||||
let registered = query_regs.get("filter-1").unwrap();
|
||||
assert_eq!(registered.subscriber_regions, vec![second_region_id]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_region_registering() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
@@ -1,160 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_query::request::{
|
||||
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, InitialDynFilterRegs,
|
||||
};
|
||||
use common_telemetry::warn;
|
||||
use dashmap::DashMap;
|
||||
use session::context::QueryContextRef;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(super) struct RegisteredDynFilter {
|
||||
pub(super) filter_id: String,
|
||||
pub(super) child_exprs_datafusion_proto: Vec<Vec<u8>>,
|
||||
pub(super) subscriber_regions: Vec<RegionId>,
|
||||
}
|
||||
|
||||
impl RegisteredDynFilter {
|
||||
fn new(
|
||||
filter_id: String,
|
||||
child_exprs_datafusion_proto: Vec<Vec<u8>>,
|
||||
region_id: RegionId,
|
||||
) -> Self {
|
||||
Self {
|
||||
filter_id,
|
||||
child_exprs_datafusion_proto,
|
||||
subscriber_regions: vec![region_id],
|
||||
}
|
||||
}
|
||||
|
||||
fn register_region(&mut self, region_id: RegionId) -> bool {
|
||||
if self.subscriber_regions.contains(®ion_id) {
|
||||
return false;
|
||||
}
|
||||
|
||||
self.subscriber_regions.push(region_id);
|
||||
true
|
||||
}
|
||||
|
||||
fn remove_region(&mut self, region_id: RegionId) -> bool {
|
||||
let original_len = self.subscriber_regions.len();
|
||||
self.subscriber_regions
|
||||
.retain(|region| *region != region_id);
|
||||
original_len != self.subscriber_regions.len()
|
||||
}
|
||||
|
||||
fn has_subscribers(&self) -> bool {
|
||||
!self.subscriber_regions.is_empty()
|
||||
}
|
||||
|
||||
fn should_drop_after_remove(&mut self, region_id: RegionId) -> bool {
|
||||
self.remove_region(region_id) && !self.has_subscribers()
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn initial_dyn_filter_regs_from_query_ctx(
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Option<InitialDynFilterRegs> {
|
||||
let registrations =
|
||||
query_ctx.extension(INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY)?;
|
||||
match InitialDynFilterRegs::from_extension_value(registrations) {
|
||||
Ok(registrations) => match registrations.validate_default_bounds() {
|
||||
Ok(()) => Some(registrations),
|
||||
Err(error) => {
|
||||
warn!(error; "Initial remote dyn filter registrations exceeded Task 03 bounds");
|
||||
None
|
||||
}
|
||||
},
|
||||
Err(error) => {
|
||||
warn!(error; "Failed to decode initial remote dyn filter registrations from query context");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn register_initial_dyn_filter_regs(
|
||||
regs_by_query: &DashMap<String, DashMap<String, RegisteredDynFilter>>,
|
||||
query_id: &str,
|
||||
region_id: RegionId,
|
||||
regs: &InitialDynFilterRegs,
|
||||
) {
|
||||
if regs.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Err(error) = regs.validate_default_bounds() {
|
||||
warn!(error; "Ignored invalid initial dyn filter registrations for query_id {} region_id {}", query_id, region_id);
|
||||
return;
|
||||
}
|
||||
|
||||
let query_regs = regs_by_query.entry(query_id.to_string()).or_default();
|
||||
|
||||
for reg in ®s.regs {
|
||||
if let Some(mut registered) = query_regs.get_mut(®.filter_id) {
|
||||
if registered.register_region(region_id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
warn!(
|
||||
query_id,
|
||||
filter_id = reg.filter_id,
|
||||
region_id = %region_id,
|
||||
"Duplicate initial dyn filter reg ignored"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
query_regs.insert(
|
||||
reg.filter_id.clone(),
|
||||
RegisteredDynFilter::new(
|
||||
reg.filter_id.clone(),
|
||||
reg.child_exprs_datafusion_proto.clone(),
|
||||
region_id,
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn remove_initial_dyn_filter_regs_for_region(
|
||||
regs_by_query: &DashMap<String, DashMap<String, RegisteredDynFilter>>,
|
||||
query_id: &str,
|
||||
region_id: RegionId,
|
||||
) {
|
||||
let should_remove_query = {
|
||||
let Some(query_regs) = regs_by_query.get(query_id) else {
|
||||
return;
|
||||
};
|
||||
|
||||
let filter_ids_to_remove = query_regs
|
||||
.iter_mut()
|
||||
.filter_map(|mut registered| {
|
||||
registered
|
||||
.should_drop_after_remove(region_id)
|
||||
.then(|| registered.filter_id.clone())
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for filter_id in filter_ids_to_remove {
|
||||
query_regs.remove(&filter_id);
|
||||
}
|
||||
|
||||
query_regs.is_empty()
|
||||
};
|
||||
|
||||
if should_remove_query {
|
||||
regs_by_query.remove(query_id);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user