From 05dd1ae9e038589c98168f8e817d8a31e027d12f Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Wed, 7 Aug 2024 20:14:45 +0300 Subject: [PATCH] fix: drain completed page_service connections (#8632) We've noticed increased memory usage with the latest release. Drain the joinset of `page_service` connection handlers to avoid leaking them until shutdown. An alternative would be to use a TaskTracker. TaskTracker was not discussed in original PR #8339 review, so not hot fixing it in here either. --- pageserver/src/page_service.rs | 42 +++++++++++----------- test_runner/regress/test_bad_connection.py | 11 +++++- 2 files changed, 32 insertions(+), 21 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 5344b83e0d..81294291a9 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -122,16 +122,19 @@ impl Listener { } } impl Connections { - pub async fn shutdown(self) { + pub(crate) async fn shutdown(self) { let Self { cancel, mut tasks } = self; cancel.cancel(); while let Some(res) = tasks.join_next().await { - // the logging done here mimics what was formerly done by task_mgr - match res { - Ok(Ok(())) => {} - Ok(Err(e)) => error!("error in page_service connection task: {:?}", e), - Err(e) => error!("page_service connection task panicked: {:?}", e), - } + Self::handle_connection_completion(res); + } + } + + fn handle_connection_completion(res: Result, tokio::task::JoinError>) { + match res { + Ok(Ok(())) => {} + Ok(Err(e)) => error!("error in page_service connection task: {:?}", e), + Err(e) => error!("page_service connection task panicked: {:?}", e), } } } @@ -155,20 +158,19 @@ pub async fn libpq_listener_main( let connections_cancel = CancellationToken::new(); let mut connection_handler_tasks = tokio::task::JoinSet::default(); - // Wait for a new connection to arrive, or for server shutdown. - while let Some(res) = tokio::select! { - biased; + loop { + let accepted = tokio::select! { + biased; + _ = listener_cancel.cancelled() => break, + next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => { + let res = next.expect("we dont poll while empty"); + Connections::handle_connection_completion(res); + continue; + } + accepted = listener.accept() => accepted, + }; - _ = listener_cancel.cancelled() => { - // We were requested to shut down. - None - } - - res = listener.accept() => { - Some(res) - } - } { - match res { + match accepted { Ok((socket, peer_addr)) => { // Connection established. Spawn a new task to handle it. debug!("accepted connection from {}", peer_addr); diff --git a/test_runner/regress/test_bad_connection.py b/test_runner/regress/test_bad_connection.py index 82a3a05c2b..392b73c1f7 100644 --- a/test_runner/regress/test_bad_connection.py +++ b/test_runner/regress/test_bad_connection.py @@ -10,7 +10,12 @@ from fixtures.neon_fixtures import NeonEnvBuilder @pytest.mark.timeout(600) def test_compute_pageserver_connection_stress(neon_env_builder: NeonEnvBuilder): env = neon_env_builder.init_start() - env.pageserver.allowed_errors.append(".*simulated connection error.*") + env.pageserver.allowed_errors.append(".*simulated connection error.*") # this is never hit + + # the real reason (Simulated Connection Error) is on the next line, and we cannot filter this out. + env.pageserver.allowed_errors.append( + ".*ERROR error in page_service connection task: Postgres query error" + ) # Enable failpoint before starting everything else up so that we exercise the retry # on fetching basebackup @@ -69,3 +74,7 @@ def test_compute_pageserver_connection_stress(neon_env_builder: NeonEnvBuilder): cur.fetchall() times_executed += 1 log.info(f"Workload executed {times_executed} times") + + # do a graceful shutdown which would had caught the allowed_errors before + # https://github.com/neondatabase/neon/pull/8632 + env.pageserver.stop()