diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index 7afb1c2275..9267449b6f 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -192,8 +192,7 @@ impl ConnectMechanism for TokioMechanism { connection, self.conn_id, node_info.aux.clone(), - ) - .await) + )) } fn update_connect_config(&self, _config: &mut compute::ConnCfg) {} diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 13847bde85..abb9549185 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -140,7 +140,7 @@ impl EndpointConnPool { fn put( pool: &RwLock, - mut node: Pin<&mut Node>>, + node: Pin<&mut Node>>, db_user: &(DbName, RoleName), client: ClientInner, conn_info: ConnInfo, @@ -168,11 +168,6 @@ impl EndpointConnPool { if pool.total_conns < pool.max_conns { let pool_entries = pool.pools.entry(db_user.clone()).or_default(); - if let Some(node) = node.as_mut().initialized_mut() { - if node.take_removed(&pool_entries.conns).is_err() { - panic!("client is already in the pool") - }; - } pool_entries.conns.cursor_front_mut().insert_after( node, ConnPoolEntry { @@ -457,7 +452,6 @@ impl GlobalConnPool { cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), "pool: reusing connection '{conn_info}'" ); - client.session.send(ctx.session_id)?; ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit); ctx.latency_timer.success(); return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool))); @@ -520,7 +514,7 @@ type ConnTypes = dyn pin_list::Types< Unprotected = (), >; -pub async fn poll_tokio_client( +pub fn poll_tokio_client( global_pool: Arc>, ctx: &mut RequestMonitoring, conn_info: ConnInfo, @@ -580,7 +574,6 @@ pub fn poll_client + Send + 'static>( ) -> Client { let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol); let session_id = ctx.session_id; - let (tx, rx) = tokio::sync::watch::channel(session_id); let span = info_span!(parent: None, "connection", %conn_id); let cold_start_info = ctx.cold_start_info; @@ -609,7 +602,6 @@ pub fn poll_client + Send + 'static>( pool: pool.clone(), session_span, - session_rx: rx, conn_gauge, conn_id, @@ -620,7 +612,6 @@ pub fn poll_client + Send + 'static>( let inner = ClientInner { inner: client, - session: tx, pool: send_client, cancel, aux, @@ -649,7 +640,6 @@ pin_project! { // Used for reporting the current session the conn is attached to session_span: tracing::Span, - session_rx: tokio::sync::watch::Receiver, // Static connection state conn_gauge: NumDbConnectionsGuard<'static>, @@ -670,14 +660,29 @@ impl> Future for DbConnection { return Poll::Ready(()); } + // if there's no pool, then this client will be closed. + let Some(pool) = this.pool.upgrade() else { + info!("connection dropped"); + return Poll::Ready(()); + }; + + if let Some(init) = this.node.as_mut().initialized_mut() { + if let Some(entry) = pool.read().pools.get(this.db_user) { + if let Ok((session_id, _)) = init.take_removed(&entry.conns) { + *this.session_span = info_span!("", %session_id); + let _span = this.session_span.enter(); + info!("changed session"); + this.idle_timeout + .as_mut() + .reset(Instant::now() + *this.idle); + }; + } + } + if let Poll::Ready(client) = this.recv_client.poll_recv(cx) { // if the send_client is dropped, then the client is dropped let Some((span, client, conn_info)) = client else { - info!("connection dropped"); - return Poll::Ready(()); - }; - // if there's no pool, then this client will be closed. - let Some(pool) = this.pool.upgrade() else { + let _span = this.session_span.enter(); info!("connection dropped"); return Poll::Ready(()); }; @@ -688,24 +693,6 @@ impl> Future for DbConnection { } } - match this.session_rx.has_changed() { - Ok(true) => { - let session_id = *this.session_rx.borrow_and_update(); - *this.session_span = info_span!("", %session_id); - let _span = this.session_span.enter(); - info!("changed session"); - this.idle_timeout - .as_mut() - .reset(Instant::now() + *this.idle); - } - Err(_) => { - let _span = this.session_span.enter(); - info!("connection dropped"); - return Poll::Ready(()); - } - _ => {} - } - let _span = this.session_span.enter(); // 5 minute idle connection timeout @@ -729,13 +716,11 @@ impl> Future for DbConnection { ready!(this.connection.poll(cx)); // remove from connection pool - if let Some(pool) = this.pool.upgrade() { - if pool - .write() - .remove_client(this.db_user.clone(), *this.conn_id) - { - info!("closed connection removed"); - } + if pool + .write() + .remove_client(this.db_user.clone(), *this.conn_id) + { + info!("closed connection removed"); } Poll::Ready(()) @@ -744,7 +729,6 @@ impl> Future for DbConnection { struct ClientInner { inner: C, - session: tokio::sync::watch::Sender, pool: tokio::sync::mpsc::Sender<(tracing::Span, ClientInner, ConnInfo)>, cancel: CancellationToken, aux: MetricsAuxInfo,