mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 01:42:55 +00:00
remove some code
This commit is contained in:
@@ -192,8 +192,7 @@ impl ConnectMechanism for TokioMechanism {
|
||||
connection,
|
||||
self.conn_id,
|
||||
node_info.aux.clone(),
|
||||
)
|
||||
.await)
|
||||
))
|
||||
}
|
||||
|
||||
fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
|
||||
|
||||
@@ -140,7 +140,7 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
|
||||
|
||||
fn put(
|
||||
pool: &RwLock<Self>,
|
||||
mut node: Pin<&mut Node<ConnTypes<C>>>,
|
||||
node: Pin<&mut Node<ConnTypes<C>>>,
|
||||
db_user: &(DbName, RoleName),
|
||||
client: ClientInner<C>,
|
||||
conn_info: ConnInfo,
|
||||
@@ -168,11 +168,6 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
|
||||
if pool.total_conns < pool.max_conns {
|
||||
let pool_entries = pool.pools.entry(db_user.clone()).or_default();
|
||||
|
||||
if let Some(node) = node.as_mut().initialized_mut() {
|
||||
if node.take_removed(&pool_entries.conns).is_err() {
|
||||
panic!("client is already in the pool")
|
||||
};
|
||||
}
|
||||
pool_entries.conns.cursor_front_mut().insert_after(
|
||||
node,
|
||||
ConnPoolEntry {
|
||||
@@ -457,7 +452,6 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
|
||||
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
|
||||
"pool: reusing connection '{conn_info}'"
|
||||
);
|
||||
client.session.send(ctx.session_id)?;
|
||||
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
|
||||
ctx.latency_timer.success();
|
||||
return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
|
||||
@@ -520,7 +514,7 @@ type ConnTypes<C> = dyn pin_list::Types<
|
||||
Unprotected = (),
|
||||
>;
|
||||
|
||||
pub async fn poll_tokio_client(
|
||||
pub fn poll_tokio_client(
|
||||
global_pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
|
||||
ctx: &mut RequestMonitoring,
|
||||
conn_info: ConnInfo,
|
||||
@@ -580,7 +574,6 @@ pub fn poll_client<C: ClientInnerExt, I: Future<Output = ()> + Send + 'static>(
|
||||
) -> Client<C> {
|
||||
let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol);
|
||||
let session_id = ctx.session_id;
|
||||
let (tx, rx) = tokio::sync::watch::channel(session_id);
|
||||
|
||||
let span = info_span!(parent: None, "connection", %conn_id);
|
||||
let cold_start_info = ctx.cold_start_info;
|
||||
@@ -609,7 +602,6 @@ pub fn poll_client<C: ClientInnerExt, I: Future<Output = ()> + Send + 'static>(
|
||||
pool: pool.clone(),
|
||||
|
||||
session_span,
|
||||
session_rx: rx,
|
||||
|
||||
conn_gauge,
|
||||
conn_id,
|
||||
@@ -620,7 +612,6 @@ pub fn poll_client<C: ClientInnerExt, I: Future<Output = ()> + Send + 'static>(
|
||||
|
||||
let inner = ClientInner {
|
||||
inner: client,
|
||||
session: tx,
|
||||
pool: send_client,
|
||||
cancel,
|
||||
aux,
|
||||
@@ -649,7 +640,6 @@ pin_project! {
|
||||
|
||||
// Used for reporting the current session the conn is attached to
|
||||
session_span: tracing::Span,
|
||||
session_rx: tokio::sync::watch::Receiver<uuid::Uuid>,
|
||||
|
||||
// Static connection state
|
||||
conn_gauge: NumDbConnectionsGuard<'static>,
|
||||
@@ -670,14 +660,29 @@ impl<C: ClientInnerExt, I: Future<Output = ()>> Future for DbConnection<C, I> {
|
||||
return Poll::Ready(());
|
||||
}
|
||||
|
||||
// if there's no pool, then this client will be closed.
|
||||
let Some(pool) = this.pool.upgrade() else {
|
||||
info!("connection dropped");
|
||||
return Poll::Ready(());
|
||||
};
|
||||
|
||||
if let Some(init) = this.node.as_mut().initialized_mut() {
|
||||
if let Some(entry) = pool.read().pools.get(this.db_user) {
|
||||
if let Ok((session_id, _)) = init.take_removed(&entry.conns) {
|
||||
*this.session_span = info_span!("", %session_id);
|
||||
let _span = this.session_span.enter();
|
||||
info!("changed session");
|
||||
this.idle_timeout
|
||||
.as_mut()
|
||||
.reset(Instant::now() + *this.idle);
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
if let Poll::Ready(client) = this.recv_client.poll_recv(cx) {
|
||||
// if the send_client is dropped, then the client is dropped
|
||||
let Some((span, client, conn_info)) = client else {
|
||||
info!("connection dropped");
|
||||
return Poll::Ready(());
|
||||
};
|
||||
// if there's no pool, then this client will be closed.
|
||||
let Some(pool) = this.pool.upgrade() else {
|
||||
let _span = this.session_span.enter();
|
||||
info!("connection dropped");
|
||||
return Poll::Ready(());
|
||||
};
|
||||
@@ -688,24 +693,6 @@ impl<C: ClientInnerExt, I: Future<Output = ()>> Future for DbConnection<C, I> {
|
||||
}
|
||||
}
|
||||
|
||||
match this.session_rx.has_changed() {
|
||||
Ok(true) => {
|
||||
let session_id = *this.session_rx.borrow_and_update();
|
||||
*this.session_span = info_span!("", %session_id);
|
||||
let _span = this.session_span.enter();
|
||||
info!("changed session");
|
||||
this.idle_timeout
|
||||
.as_mut()
|
||||
.reset(Instant::now() + *this.idle);
|
||||
}
|
||||
Err(_) => {
|
||||
let _span = this.session_span.enter();
|
||||
info!("connection dropped");
|
||||
return Poll::Ready(());
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let _span = this.session_span.enter();
|
||||
|
||||
// 5 minute idle connection timeout
|
||||
@@ -729,13 +716,11 @@ impl<C: ClientInnerExt, I: Future<Output = ()>> Future for DbConnection<C, I> {
|
||||
ready!(this.connection.poll(cx));
|
||||
|
||||
// remove from connection pool
|
||||
if let Some(pool) = this.pool.upgrade() {
|
||||
if pool
|
||||
.write()
|
||||
.remove_client(this.db_user.clone(), *this.conn_id)
|
||||
{
|
||||
info!("closed connection removed");
|
||||
}
|
||||
if pool
|
||||
.write()
|
||||
.remove_client(this.db_user.clone(), *this.conn_id)
|
||||
{
|
||||
info!("closed connection removed");
|
||||
}
|
||||
|
||||
Poll::Ready(())
|
||||
@@ -744,7 +729,6 @@ impl<C: ClientInnerExt, I: Future<Output = ()>> Future for DbConnection<C, I> {
|
||||
|
||||
struct ClientInner<C: ClientInnerExt> {
|
||||
inner: C,
|
||||
session: tokio::sync::watch::Sender<uuid::Uuid>,
|
||||
pool: tokio::sync::mpsc::Sender<(tracing::Span, ClientInner<C>, ConnInfo)>,
|
||||
cancel: CancellationToken,
|
||||
aux: MetricsAuxInfo,
|
||||
|
||||
Reference in New Issue
Block a user