mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-07-04 04:50:37 +00:00
@@ -100,9 +100,7 @@ pub(super) fn register_initial_dyn_filter_regs(
|
||||
return;
|
||||
}
|
||||
|
||||
let query_regs = regs_by_query
|
||||
.entry(query_id.to_string())
|
||||
.or_insert_with(DashMap::new);
|
||||
let query_regs = regs_by_query.entry(query_id.to_string()).or_default();
|
||||
|
||||
for reg in ®s.regs {
|
||||
if let Some(mut registered) = query_regs.get_mut(®.filter_id) {
|
||||
|
||||
@@ -20,7 +20,7 @@ use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
|
||||
use session::query_id::QueryId;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::dist_plan::{FilterId, ProducerScopeId};
|
||||
use crate::dist_plan::FilterId;
|
||||
|
||||
/// Routing metadata for a remote dynamic filter subscriber.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
@@ -235,9 +235,7 @@ impl DynFilterRegistryManager {
|
||||
registry: &Arc<QueryDynFilterRegistry>,
|
||||
) -> Option<Arc<QueryDynFilterRegistry>> {
|
||||
let mut registries = self.registries.write().unwrap();
|
||||
let Some(current) = registries.get(query_id) else {
|
||||
return None;
|
||||
};
|
||||
let current = registries.get(query_id)?;
|
||||
|
||||
if Arc::ptr_eq(current, registry) && registry.active_stream_count() == 0 {
|
||||
registries.remove(query_id)
|
||||
@@ -271,7 +269,7 @@ mod tests {
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::*;
|
||||
use crate::dist_plan::FilterFingerprint;
|
||||
use crate::dist_plan::{FilterFingerprint, ProducerScopeId};
|
||||
|
||||
fn test_query_id(value: u128) -> QueryId {
|
||||
QueryId::from(Uuid::from_u128(value))
|
||||
|
||||
@@ -34,9 +34,7 @@ use common_telemetry::{debug, error, tracing, warn};
|
||||
use common_time::timezone::parse_timezone;
|
||||
use futures_util::StreamExt;
|
||||
use session::context::{Channel, QueryContextBuilder, QueryContextRef};
|
||||
use session::hints::{
|
||||
READ_PREFERENCE_HINT, REMOTE_QUERY_ID_EXTENSION_KEY, is_reserved_extension_key,
|
||||
};
|
||||
use session::hints::{READ_PREFERENCE_HINT, is_reserved_extension_key};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
@@ -288,7 +286,9 @@ impl Drop for RequestTimer {
|
||||
mod tests {
|
||||
use chrono::FixedOffset;
|
||||
use common_time::Timezone;
|
||||
use session::hints::INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY;
|
||||
use session::hints::{
|
||||
INITIAL_REMOTE_DYN_FILTER_REGISTRATIONS_EXTENSION_KEY, REMOTE_QUERY_ID_EXTENSION_KEY,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user