mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
separate connection from poll client
This commit is contained in:
@@ -16,7 +16,7 @@ use crate::{
|
||||
proxy::connect_compute::ConnectMechanism,
|
||||
};
|
||||
|
||||
use super::conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool};
|
||||
use super::conn_pool::{poll_tokio_client, Client, ConnInfo, GlobalConnPool};
|
||||
|
||||
pub struct PoolingBackend {
|
||||
pub pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
|
||||
@@ -184,7 +184,7 @@ impl ConnectMechanism for TokioMechanism {
|
||||
drop(pause);
|
||||
|
||||
tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
|
||||
Ok(poll_client(
|
||||
Ok(poll_tokio_client(
|
||||
self.pool.clone(),
|
||||
ctx,
|
||||
self.conn_info.clone(),
|
||||
|
||||
@@ -461,12 +461,61 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn poll_client<C: ClientInnerExt>(
|
||||
pub fn poll_tokio_client(
|
||||
global_pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
|
||||
ctx: &mut RequestMonitoring,
|
||||
conn_info: ConnInfo,
|
||||
client: tokio_postgres::Client,
|
||||
mut connection: tokio_postgres::Connection<Socket, NoTlsStream>,
|
||||
conn_id: uuid::Uuid,
|
||||
aux: MetricsAuxInfo,
|
||||
) -> Client<tokio_postgres::Client> {
|
||||
let connection = std::future::poll_fn(move |cx| {
|
||||
loop {
|
||||
let message = ready!(connection.poll_message(cx));
|
||||
match message {
|
||||
Some(Ok(AsyncMessage::Notice(notice))) => {
|
||||
info!("notice: {}", notice);
|
||||
}
|
||||
Some(Ok(AsyncMessage::Notification(notif))) => {
|
||||
warn!(
|
||||
pid = notif.process_id(),
|
||||
channel = notif.channel(),
|
||||
"notification received"
|
||||
);
|
||||
}
|
||||
Some(Ok(_)) => {
|
||||
warn!("unknown message");
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
error!("connection error: {}", e);
|
||||
break;
|
||||
}
|
||||
None => {
|
||||
info!("connection closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Poll::Ready(())
|
||||
});
|
||||
poll_client(
|
||||
global_pool,
|
||||
ctx,
|
||||
conn_info,
|
||||
client,
|
||||
connection,
|
||||
conn_id,
|
||||
aux,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn poll_client<C: ClientInnerExt, I: Future<Output = ()> + Send + 'static>(
|
||||
global_pool: Arc<GlobalConnPool<C>>,
|
||||
ctx: &mut RequestMonitoring,
|
||||
conn_info: ConnInfo,
|
||||
client: C,
|
||||
mut connection: tokio_postgres::Connection<Socket, NoTlsStream>,
|
||||
connection: I,
|
||||
conn_id: uuid::Uuid,
|
||||
aux: MetricsAuxInfo,
|
||||
) -> Client<C> {
|
||||
@@ -476,8 +525,9 @@ pub fn poll_client<C: ClientInnerExt>(
|
||||
|
||||
let span = info_span!(parent: None, "connection", %conn_id);
|
||||
let cold_start_info = ctx.cold_start_info;
|
||||
span.in_scope(|| {
|
||||
info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
|
||||
let session_span = info_span!(parent: span.clone(), "", %session_id);
|
||||
session_span.in_scope(|| {
|
||||
info!(cold_start_info = cold_start_info.as_str(), %conn_info, "new connection");
|
||||
});
|
||||
let pool = match conn_info.endpoint_cache_key() {
|
||||
Some(endpoint) => Arc::downgrade(&global_pool.get_or_create_endpoint_pool(&endpoint)),
|
||||
@@ -493,16 +543,14 @@ pub fn poll_client<C: ClientInnerExt>(
|
||||
idle,
|
||||
db_user: conn_info.db_and_user(),
|
||||
pool: pool.clone(),
|
||||
session_id,
|
||||
session_span,
|
||||
session_rx: rx,
|
||||
conn_gauge,
|
||||
conn_id,
|
||||
connection,
|
||||
};
|
||||
|
||||
tokio::spawn(async move {
|
||||
db_conn.instrument(span).await;
|
||||
});
|
||||
tokio::spawn(db_conn.instrument(span));
|
||||
|
||||
let inner = ClientInner {
|
||||
inner: client,
|
||||
@@ -515,7 +563,7 @@ pub fn poll_client<C: ClientInnerExt>(
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
struct DbConnection<C: ClientInnerExt> {
|
||||
struct DbConnection<C: ClientInnerExt, Inner> {
|
||||
#[pin]
|
||||
cancelled: WaitForCancellationFutureOwned,
|
||||
|
||||
@@ -526,40 +574,47 @@ pin_project! {
|
||||
db_user: (DbName, RoleName),
|
||||
pool: Weak<RwLock<EndpointConnPool<C>>>,
|
||||
|
||||
session_id: uuid::Uuid,
|
||||
session_span: tracing::Span,
|
||||
session_rx: tokio::sync::watch::Receiver<uuid::Uuid>,
|
||||
|
||||
conn_gauge: NumDbConnectionsGuard<'static>,
|
||||
conn_id: uuid::Uuid,
|
||||
connection: tokio_postgres::Connection<Socket, NoTlsStream>,
|
||||
#[pin]
|
||||
connection: Inner,
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt> Future for DbConnection<C> {
|
||||
impl<C: ClientInnerExt, I: Future<Output = ()>> Future for DbConnection<C, I> {
|
||||
type Output = ();
|
||||
|
||||
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
|
||||
let mut this = self.project();
|
||||
if this.cancelled.as_mut().poll(cx).is_ready() {
|
||||
let _span = this.session_span.enter();
|
||||
info!("connection dropped");
|
||||
return Poll::Ready(());
|
||||
}
|
||||
|
||||
match this.session_rx.has_changed() {
|
||||
Ok(true) => {
|
||||
*this.session_id = *this.session_rx.borrow_and_update();
|
||||
info!(%this.session_id, "changed session");
|
||||
let session_id = *this.session_rx.borrow_and_update();
|
||||
*this.session_span = info_span!("", %session_id);
|
||||
let _span = this.session_span.enter();
|
||||
info!("changed session");
|
||||
this.idle_timeout
|
||||
.as_mut()
|
||||
.reset(Instant::now() + *this.idle);
|
||||
}
|
||||
Err(_) => {
|
||||
let _span = this.session_span.enter();
|
||||
info!("connection dropped");
|
||||
return Poll::Ready(());
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let _span = this.session_span.enter();
|
||||
|
||||
// 5 minute idle connection timeout
|
||||
if this.idle_timeout.as_mut().poll(cx).is_ready() {
|
||||
this.idle_timeout
|
||||
@@ -578,29 +633,7 @@ impl<C: ClientInnerExt> Future for DbConnection<C> {
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
let message = ready!(this.connection.poll_message(cx));
|
||||
|
||||
match message {
|
||||
Some(Ok(AsyncMessage::Notice(notice))) => {
|
||||
info!(session_id = %this.session_id, "notice: {}", notice);
|
||||
}
|
||||
Some(Ok(AsyncMessage::Notification(notif))) => {
|
||||
warn!(session_id = %this.session_id, pid = notif.process_id(), channel = notif.channel(), "notification received");
|
||||
}
|
||||
Some(Ok(_)) => {
|
||||
warn!(session_id = %this.session_id, "unknown message");
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
error!(session_id = %this.session_id, "connection error: {}", e);
|
||||
break;
|
||||
}
|
||||
None => {
|
||||
info!("connection closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
ready!(this.connection.poll(cx));
|
||||
|
||||
// remove from connection pool
|
||||
if let Some(pool) = this.pool.upgrade() {
|
||||
|
||||
Reference in New Issue
Block a user