mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
fix: conn pool leak & placeholder feature so ci can compile
This commit is contained in:
@@ -445,10 +445,16 @@ impl Pool {
|
||||
|
||||
async fn recycle_channel_in_loop(pool: Arc<Pool>, interval_secs: u64) {
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
|
||||
|
||||
// use weak ref here to prevent pool being leaked
|
||||
let pool_weak = Arc::downgrade(&pool);
|
||||
loop {
|
||||
let _ = interval.tick().await;
|
||||
pool.retain_channel(|_, c| c.access.swap(0, Ordering::Relaxed) != 0)
|
||||
if let Some(pool) = pool_weak.upgrade() {
|
||||
pool.retain_channel(|_, c| c.access.swap(0, Ordering::Relaxed) != 0)
|
||||
} else {
|
||||
// no one is using this pool, so we can also let go
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -378,6 +378,8 @@ impl RecordingRuleTask {
|
||||
.write()
|
||||
.await
|
||||
.after_query_exec(elapsed, res.is_ok());
|
||||
// drop the result to free client-related resources
|
||||
drop(res);
|
||||
|
||||
let sleep_until = {
|
||||
let mut state = self.state.write().await;
|
||||
|
||||
@@ -43,6 +43,7 @@ fn client_from_urls(addrs: Vec<String>) -> Client {
|
||||
pub enum FrontendClient {
|
||||
Distributed {
|
||||
meta_client: Arc<MetaClient>,
|
||||
channel_mgr: ChannelManager,
|
||||
},
|
||||
Standalone {
|
||||
/// for the sake of simplicity still use grpc even in standalone mode
|
||||
@@ -66,7 +67,10 @@ impl DatabaseWithPeer {
|
||||
|
||||
impl FrontendClient {
|
||||
pub fn from_meta_client(meta_client: Arc<MetaClient>) -> Self {
|
||||
Self::Distributed { meta_client }
|
||||
Self::Distributed {
|
||||
meta_client,
|
||||
channel_mgr: default_channel_mgr(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_static_grpc_addr(addr: String) -> Self {
|
||||
@@ -75,7 +79,8 @@ impl FrontendClient {
|
||||
addr: addr.clone(),
|
||||
};
|
||||
|
||||
let client = client_from_urls(vec![addr]);
|
||||
let mgr = default_channel_mgr();
|
||||
let client = Client::with_manager_and_urls(mgr.clone(), vec![addr]);
|
||||
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
|
||||
Self::Standalone {
|
||||
database_client: DatabaseWithPeer::new(database, peer),
|
||||
@@ -119,32 +124,40 @@ impl FrontendClient {
|
||||
if let Self::Standalone { database_client } = self {
|
||||
return Ok(database_client.clone());
|
||||
}
|
||||
|
||||
let frontends = self.scan_for_frontend().await?;
|
||||
let mut last_activity_ts = i64::MIN;
|
||||
let mut peer = None;
|
||||
for (_key, val) in frontends.iter() {
|
||||
if val.last_activity_ts > last_activity_ts {
|
||||
last_activity_ts = val.last_activity_ts;
|
||||
peer = Some(val.peer.clone());
|
||||
match &self {
|
||||
Self::Standalone { database_client } => Ok(database_client.clone()),
|
||||
Self::Distributed {
|
||||
meta_client: _,
|
||||
channel_mgr,
|
||||
} => {
|
||||
let frontends = self.scan_for_frontend().await?;
|
||||
let mut last_activity_ts = i64::MIN;
|
||||
let mut peer = None;
|
||||
for (_key, val) in frontends.iter() {
|
||||
if val.last_activity_ts > last_activity_ts {
|
||||
last_activity_ts = val.last_activity_ts;
|
||||
peer = Some(val.peer.clone());
|
||||
}
|
||||
}
|
||||
let Some(peer) = peer else {
|
||||
UnexpectedSnafu {
|
||||
reason: format!("No frontend available: {:?}", frontends),
|
||||
}
|
||||
.fail()?
|
||||
};
|
||||
let client =
|
||||
Client::with_manager_and_urls(channel_mgr.clone(), vec![peer.addr.clone()]);
|
||||
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
|
||||
Ok(DatabaseWithPeer::new(database, peer))
|
||||
}
|
||||
}
|
||||
let Some(peer) = peer else {
|
||||
UnexpectedSnafu {
|
||||
reason: format!("No frontend available: {:?}", frontends),
|
||||
}
|
||||
.fail()?
|
||||
};
|
||||
let client = client_from_urls(vec![peer.addr.clone()]);
|
||||
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
|
||||
Ok(DatabaseWithPeer::new(database, peer))
|
||||
}
|
||||
|
||||
/// Get a database client, and possibly update it before returning.
|
||||
pub async fn get_database_client(&self) -> Result<DatabaseWithPeer, Error> {
|
||||
match self {
|
||||
Self::Standalone { database_client } => Ok(database_client.clone()),
|
||||
Self::Distributed { meta_client: _ } => self.get_last_active_frontend().await,
|
||||
Self::Distributed { meta_client: _, .. } => self.get_last_active_frontend().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ license.workspace = true
|
||||
[features]
|
||||
mock = []
|
||||
pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend"]
|
||||
mysql_kvbackend = [] # placeholder features so CI can compile
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
Reference in New Issue
Block a user