mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
rely on channel cancellation signals on drop
This commit is contained in:
@@ -19,7 +19,6 @@ use tokio::sync::mpsc::error::TrySendError;
|
||||
use tokio::time::{Instant, Sleep};
|
||||
use tokio_postgres::tls::NoTlsStream;
|
||||
use tokio_postgres::{AsyncMessage, ReadyForQueryStatus, Socket};
|
||||
use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};
|
||||
|
||||
use crate::console::messages::{ColdStartInfo, MetricsAuxInfo};
|
||||
use crate::metrics::{HttpEndpointPoolsGuard, Metrics, NumDbConnectionsGuard};
|
||||
@@ -536,12 +535,9 @@ pub fn poll_client<C: ClientInnerExt, I: Future<Output = ()> + Send + 'static>(
|
||||
.map(|endpoint| global_pool.get_or_create_endpoint_pool(&endpoint));
|
||||
|
||||
let idle = global_pool.get_idle_timeout();
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let (send_client, recv_client) = tokio::sync::mpsc::channel(1);
|
||||
let db_conn = DbConnection {
|
||||
cancelled: cancel.clone().cancelled_owned(),
|
||||
|
||||
idle_timeout: tokio::time::sleep(idle),
|
||||
idle,
|
||||
|
||||
@@ -562,7 +558,6 @@ pub fn poll_client<C: ClientInnerExt, I: Future<Output = ()> + Send + 'static>(
|
||||
let inner = ClientInner {
|
||||
inner: client,
|
||||
pool: send_client,
|
||||
cancel,
|
||||
aux,
|
||||
conn_id,
|
||||
};
|
||||
@@ -571,10 +566,6 @@ pub fn poll_client<C: ClientInnerExt, I: Future<Output = ()> + Send + 'static>(
|
||||
|
||||
pin_project! {
|
||||
struct DbConnection<C: ClientInnerExt, Inner> {
|
||||
// Used to close the current conn if the client is dropped
|
||||
#[pin]
|
||||
cancelled: WaitForCancellationFutureOwned,
|
||||
|
||||
// Used to close the current conn if it's idle
|
||||
#[pin]
|
||||
idle_timeout: Sleep,
|
||||
@@ -614,12 +605,6 @@ impl<C: ClientInnerExt, I: Future<Output = ()>> Future for DbConnection<C, I> {
|
||||
|
||||
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
|
||||
let mut this = self.project();
|
||||
if this.cancelled.as_mut().poll(cx).is_ready() {
|
||||
let _span = this.session_span.enter();
|
||||
info!("connection dropped");
|
||||
return Poll::Ready(());
|
||||
}
|
||||
|
||||
// node is initiated via EndpointConnPool::put.
|
||||
// this is only called in the if statement below.
|
||||
// this can only occur if pool is set (and pool is never removed).
|
||||
@@ -689,18 +674,10 @@ impl<C: ClientInnerExt, I: Future<Output = ()>> Future for DbConnection<C, I> {
|
||||
struct ClientInner<C: ClientInnerExt> {
|
||||
inner: C,
|
||||
pool: tokio::sync::mpsc::Sender<(tracing::Span, ClientInner<C>, ConnInfo)>,
|
||||
cancel: CancellationToken,
|
||||
aux: MetricsAuxInfo,
|
||||
conn_id: uuid::Uuid,
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt> Drop for ClientInner<C> {
|
||||
fn drop(&mut self) {
|
||||
// on client drop, tell the conn to shut down
|
||||
self.cancel.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
pub trait ClientInnerExt: Sync + Send + 'static {
|
||||
fn is_closed(&self) -> bool;
|
||||
fn get_process_id(&self) -> i32;
|
||||
@@ -829,6 +806,7 @@ impl<C: ClientInnerExt> Drop for Client<C> {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use tokio::task::yield_now;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::{BranchId, EndpointId, ProjectId};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user