diff --git a/src/common/grpc/src/channel_manager.rs b/src/common/grpc/src/channel_manager.rs index 0127829567..b9326b066d 100644 --- a/src/common/grpc/src/channel_manager.rs +++ b/src/common/grpc/src/channel_manager.rs @@ -445,10 +445,16 @@ impl Pool { async fn recycle_channel_in_loop(pool: Arc, interval_secs: u64) { let mut interval = tokio::time::interval(Duration::from_secs(interval_secs)); - + // use weak ref here to prevent pool being leaked + let pool_weak = Arc::downgrade(&pool); loop { let _ = interval.tick().await; - pool.retain_channel(|_, c| c.access.swap(0, Ordering::Relaxed) != 0) + if let Some(pool) = pool_weak.upgrade() { + pool.retain_channel(|_, c| c.access.swap(0, Ordering::Relaxed) != 0) + } else { + // no one is using this pool, so we can also let go + break; + } } } diff --git a/src/flow/src/recording_rules/engine.rs b/src/flow/src/recording_rules/engine.rs index 3a279a18c9..30ed8d7a27 100644 --- a/src/flow/src/recording_rules/engine.rs +++ b/src/flow/src/recording_rules/engine.rs @@ -378,6 +378,8 @@ impl RecordingRuleTask { .write() .await .after_query_exec(elapsed, res.is_ok()); + // drop the result to free client-related resources + drop(res); let sleep_until = { let mut state = self.state.write().await; diff --git a/src/flow/src/recording_rules/frontend_client.rs b/src/flow/src/recording_rules/frontend_client.rs index e71156f783..b822958d7f 100644 --- a/src/flow/src/recording_rules/frontend_client.rs +++ b/src/flow/src/recording_rules/frontend_client.rs @@ -43,6 +43,7 @@ fn client_from_urls(addrs: Vec) -> Client { pub enum FrontendClient { Distributed { meta_client: Arc, + channel_mgr: ChannelManager, }, Standalone { /// for the sake of simplicity still use grpc even in standalone mode @@ -66,7 +67,10 @@ impl DatabaseWithPeer { impl FrontendClient { pub fn from_meta_client(meta_client: Arc) -> Self { - Self::Distributed { meta_client } + Self::Distributed { + meta_client, + channel_mgr: default_channel_mgr(), + } } pub fn from_static_grpc_addr(addr: String) -> Self { @@ -75,7 +79,8 @@ impl FrontendClient { addr: addr.clone(), }; - let client = client_from_urls(vec![addr]); + let mgr = default_channel_mgr(); + let client = Client::with_manager_and_urls(mgr.clone(), vec![addr]); let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); Self::Standalone { database_client: DatabaseWithPeer::new(database, peer), @@ -119,32 +124,40 @@ impl FrontendClient { if let Self::Standalone { database_client } = self { return Ok(database_client.clone()); } - - let frontends = self.scan_for_frontend().await?; - let mut last_activity_ts = i64::MIN; - let mut peer = None; - for (_key, val) in frontends.iter() { - if val.last_activity_ts > last_activity_ts { - last_activity_ts = val.last_activity_ts; - peer = Some(val.peer.clone()); + match &self { + Self::Standalone { database_client } => Ok(database_client.clone()), + Self::Distributed { + meta_client: _, + channel_mgr, + } => { + let frontends = self.scan_for_frontend().await?; + let mut last_activity_ts = i64::MIN; + let mut peer = None; + for (_key, val) in frontends.iter() { + if val.last_activity_ts > last_activity_ts { + last_activity_ts = val.last_activity_ts; + peer = Some(val.peer.clone()); + } + } + let Some(peer) = peer else { + UnexpectedSnafu { + reason: format!("No frontend available: {:?}", frontends), + } + .fail()? + }; + let client = + Client::with_manager_and_urls(channel_mgr.clone(), vec![peer.addr.clone()]); + let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + Ok(DatabaseWithPeer::new(database, peer)) } } - let Some(peer) = peer else { - UnexpectedSnafu { - reason: format!("No frontend available: {:?}", frontends), - } - .fail()? - }; - let client = client_from_urls(vec![peer.addr.clone()]); - let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); - Ok(DatabaseWithPeer::new(database, peer)) } /// Get a database client, and possibly update it before returning. pub async fn get_database_client(&self) -> Result { match self { Self::Standalone { database_client } => Ok(database_client.clone()), - Self::Distributed { meta_client: _ } => self.get_last_active_frontend().await, + Self::Distributed { meta_client: _, .. } => self.get_last_active_frontend().await, } } } diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 4b7f2137d6..6fe1567818 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -7,6 +7,7 @@ license.workspace = true [features] mock = [] pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend"] +mysql_kvbackend = [] # placeholder features so CI can compile [lints] workspace = true