From 49b6ee6c5714eec83f1c8f1b74cb867881209b88 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Tue, 20 May 2025 17:17:53 +0100 Subject: [PATCH] fix and add timeout test --- proxy/src/proxy/copy_bidirectional.rs | 55 ++++++++++++++++++++------- 1 file changed, 42 insertions(+), 13 deletions(-) diff --git a/proxy/src/proxy/copy_bidirectional.rs b/proxy/src/proxy/copy_bidirectional.rs index 78057fd52f..227628ca75 100644 --- a/proxy/src/proxy/copy_bidirectional.rs +++ b/proxy/src/proxy/copy_bidirectional.rs @@ -68,16 +68,6 @@ where } if copy1.is_ready() { - info!("Compute is done, terminate client"); - - // we will never write anymore data to the compute. - client_to_compute.filled = 0..0; - - // make sure to shutdown the compute conn. - client_to_compute.need_flush = true; - } - - if copy2.is_ready() { info!("Client is done, terminate compute"); // we will never write anymore data to the client. @@ -87,6 +77,16 @@ where compute_to_client.need_flush = true; } + if copy2.is_ready() { + info!("Compute is done, terminate client"); + + // we will never write anymore data to the compute. + client_to_compute.filled = 0..0; + + // make sure to shutdown the compute conn. + client_to_compute.need_flush = true; + } + Poll::Ready(Ok(())) }) .await?; @@ -101,9 +101,11 @@ where let res1 = client_to_compute.poll_empty(cx, compute.as_mut())?; let res2 = compute_to_client.poll_empty(cx, client.as_mut())?; - ready!(res1); - ready!(res2); - Poll::Ready(Ok(())) + if res1.is_ready() && res2.is_ready() { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } }); // We assume most peers will have enough buffer space so this issue doesn't arise, but we apply @@ -362,4 +364,31 @@ mod tests { copy.await.unwrap(); } + + #[tokio::test(start_paused = true)] + async fn test_timeout() { + let (mut client, mut client_proxy) = tokio::io::duplex(32); // Create a mock duplex stream + let (mut proxy_compute, mut compute) = tokio::io::duplex(16); // Create a mock duplex stream + + // Try to send 24 bytes to compute, but compute only has space for 16 bytes. + // Writes will not succeed. + client.write_all(b"Neon Serverless Postgres").await.unwrap(); + client.shutdown().await.unwrap(); + + let copy = tokio::spawn(async move { + copy_bidirectional_client_compute(&mut client_proxy, &mut proxy_compute, |_, _| {}) + .await + .unwrap_err() + }); + + tokio::time::advance(DISCONNECT_TIMEOUT).await; + + let res = copy.await.unwrap(); + assert!(matches!(res, ErrorSource::Timeout(_))); + + // Assert correct transferred amounts + let mut compute_recv = String::new(); + compute.read_to_string(&mut compute_recv).await.unwrap(); + assert_eq!(compute_recv, "Neon Serverless "); + } }