mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 12:30:38 +00:00
fix: drain completed page_service connections (#8632)
We've noticed increased memory usage with the latest release. Drain the `JoinSet` of `page_service` connection handlers as they complete, so that finished handlers are not leaked until shutdown. An alternative would be to use a `TaskTracker`, but that was not discussed in the review of the original PR #8339, so it is not introduced in this hotfix either.
This commit is contained in:
@@ -122,16 +122,19 @@ impl Listener {
|
||||
}
|
||||
}
|
||||
impl Connections {
|
||||
pub async fn shutdown(self) {
|
||||
pub(crate) async fn shutdown(self) {
|
||||
let Self { cancel, mut tasks } = self;
|
||||
cancel.cancel();
|
||||
while let Some(res) = tasks.join_next().await {
|
||||
// the logging done here mimics what was formerly done by task_mgr
|
||||
match res {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
|
||||
Err(e) => error!("page_service connection task panicked: {:?}", e),
|
||||
}
|
||||
Self::handle_connection_completion(res);
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
|
||||
match res {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
|
||||
Err(e) => error!("page_service connection task panicked: {:?}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -155,20 +158,19 @@ pub async fn libpq_listener_main(
|
||||
let connections_cancel = CancellationToken::new();
|
||||
let mut connection_handler_tasks = tokio::task::JoinSet::default();
|
||||
|
||||
// Wait for a new connection to arrive, or for server shutdown.
|
||||
while let Some(res) = tokio::select! {
|
||||
biased;
|
||||
loop {
|
||||
let accepted = tokio::select! {
|
||||
biased;
|
||||
_ = listener_cancel.cancelled() => break,
|
||||
next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
|
||||
let res = next.expect("we dont poll while empty");
|
||||
Connections::handle_connection_completion(res);
|
||||
continue;
|
||||
}
|
||||
accepted = listener.accept() => accepted,
|
||||
};
|
||||
|
||||
_ = listener_cancel.cancelled() => {
|
||||
// We were requested to shut down.
|
||||
None
|
||||
}
|
||||
|
||||
res = listener.accept() => {
|
||||
Some(res)
|
||||
}
|
||||
} {
|
||||
match res {
|
||||
match accepted {
|
||||
Ok((socket, peer_addr)) => {
|
||||
// Connection established. Spawn a new task to handle it.
|
||||
debug!("accepted connection from {}", peer_addr);
|
||||
|
||||
@@ -10,7 +10,12 @@ from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
@pytest.mark.timeout(600)
|
||||
def test_compute_pageserver_connection_stress(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
env.pageserver.allowed_errors.append(".*simulated connection error.*")
|
||||
env.pageserver.allowed_errors.append(".*simulated connection error.*") # this is never hit
|
||||
|
||||
# the real reason (Simulated Connection Error) is on the next line, and we cannot filter this out.
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*ERROR error in page_service connection task: Postgres query error"
|
||||
)
|
||||
|
||||
# Enable failpoint before starting everything else up so that we exercise the retry
|
||||
# on fetching basebackup
|
||||
@@ -69,3 +74,7 @@ def test_compute_pageserver_connection_stress(neon_env_builder: NeonEnvBuilder):
|
||||
cur.fetchall()
|
||||
times_executed += 1
|
||||
log.info(f"Workload executed {times_executed} times")
|
||||
|
||||
# do a graceful shutdown, which would have caught the allowed_errors before
|
||||
# https://github.com/neondatabase/neon/pull/8632
|
||||
env.pageserver.stop()
|
||||
|
||||
Reference in New Issue
Block a user