fix and add timeout test

This commit is contained in:
Conrad Ludgate
2025-05-20 17:17:53 +01:00
parent 69d21d85bc
commit 49b6ee6c57

View File

@@ -68,16 +68,6 @@ where
}
if copy1.is_ready() {
info!("Compute is done, terminate client");
// we will never write anymore data to the compute.
client_to_compute.filled = 0..0;
// make sure to shutdown the compute conn.
client_to_compute.need_flush = true;
}
if copy2.is_ready() {
info!("Client is done, terminate compute");
// we will never write anymore data to the client.
@@ -87,6 +77,16 @@ where
compute_to_client.need_flush = true;
}
if copy2.is_ready() {
info!("Compute is done, terminate client");
// we will never write anymore data to the compute.
client_to_compute.filled = 0..0;
// make sure to shutdown the compute conn.
client_to_compute.need_flush = true;
}
Poll::Ready(Ok(()))
})
.await?;
@@ -101,9 +101,11 @@ where
let res1 = client_to_compute.poll_empty(cx, compute.as_mut())?;
let res2 = compute_to_client.poll_empty(cx, client.as_mut())?;
ready!(res1);
ready!(res2);
Poll::Ready(Ok(()))
if res1.is_ready() && res2.is_ready() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
});
// We assume most peers will have enough buffer space so this issue doesn't arise, but we apply
@@ -362,4 +364,31 @@ mod tests {
copy.await.unwrap();
}
#[tokio::test(start_paused = true)]
async fn test_timeout() {
let (mut client, mut client_proxy) = tokio::io::duplex(32); // Create a mock duplex stream
let (mut proxy_compute, mut compute) = tokio::io::duplex(16); // Create a mock duplex stream
// Try to send 24 bytes to compute, but compute only has space for 16 bytes.
// Writes will not succeed.
client.write_all(b"Neon Serverless Postgres").await.unwrap();
client.shutdown().await.unwrap();
let copy = tokio::spawn(async move {
copy_bidirectional_client_compute(&mut client_proxy, &mut proxy_compute, |_, _| {})
.await
.unwrap_err()
});
tokio::time::advance(DISCONNECT_TIMEOUT).await;
let res = copy.await.unwrap();
assert!(matches!(res, ErrorSource::Timeout(_)));
// Assert correct transferred amounts
let mut compute_recv = String::new();
compute.read_to_string(&mut compute_recv).await.unwrap();
assert_eq!(compute_recv, "Neon Serverless ");
}
}