diff --git a/pageserver/client_grpc/Cargo.toml b/pageserver/client_grpc/Cargo.toml
index 4c619a0bf3..7c8efc1812 100644
--- a/pageserver/client_grpc/Cargo.toml
+++ b/pageserver/client_grpc/Cargo.toml
@@ -10,8 +10,14 @@
 http.workspace = true
 thiserror.workspace = true
 tonic.workspace = true
 tracing.workspace = true
-tokio = { version = "1.43.1", features = ["macros", "net", "io-util", "rt", "rt-multi-thread"] }
+tokio = { version = "1.43.1", features = ["full", "macros", "net", "io-util", "rt", "rt-multi-thread"] }
 uuid = { version = "1", features = ["v4"] }
+tower = { version = "0.4", features = ["timeout"] }
+rand = "0.8"
+tokio-util = { version = "0.7", features = ["compat"] }
+hyper-util = "0.1.9"
+hyper = "1.6.0"
+
 pageserver_page_api.workspace = true
 utils.workspace = true
diff --git a/pageserver/client_grpc/src/client_cache.rs b/pageserver/client_grpc/src/client_cache.rs
index 739814ab26..d4e82f98dc 100644
--- a/pageserver/client_grpc/src/client_cache.rs
+++ b/pageserver/client_grpc/src/client_cache.rs
@@ -10,6 +10,16 @@ use tokio::{
 use tonic::transport::{Channel, Endpoint};
 use uuid;
+use std::io::{self, Error, ErrorKind};
+use std::{pin::Pin, task::{Context, Poll}};
+use futures::future;
+use rand::{Rng, rngs::StdRng, SeedableRng};
+use tower::service_fn;
+use http::Uri;
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+use hyper_util::rt::TokioIo;
+use tokio::net::TcpStream;
+use bytes::BytesMut;
 
 /// A pooled gRPC client with capacity tracking and error handling.
 pub struct ConnectionPool {
@@ -21,6 +31,12 @@
     error_threshold: usize,
     connect_timeout: Duration,
     connect_backoff: Duration,
+    // Maximum random delay (ms) injected into connects, reads, and writes; 0 disables injection
+    // drop_rate/hang_rate: probabilities [0.0, 1.0] of simulated connect failures and hangs
+    max_delay_ms: u64,
+    drop_rate: f64,
+    hang_rate: f64,
+
     // The maximum duration a connection can be idle before being removed
     max_idle_duration: Duration,
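The two rate fields are Bernoulli probabilities sampled once per connection attempt via `rand`'s `gen_bool`. A minimal, self-contained sketch of those semantics (values hypothetical; `gen_bool` panics outside `[0.0, 1.0]`, which is why the pagebench flags further down divide percentages by 100):

    use rand::Rng;

    fn main() {
        let drop_rate = 0.25; // hypothetical: 25% of connection attempts fail
        let mut rng = rand::thread_rng();
        let drops = (0..10_000).filter(|_| rng.gen_bool(drop_rate)).count();
        println!("{drops} simulated drops out of 10000 attempts");
    }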
@@ -59,6 +75,139 @@ pub struct PooledClient {
     pool: Arc<ConnectionPool>,
     id: uuid::Uuid,
 }
+/// Wraps a `TcpStream`, buffers incoming data, and injects a random delay per fresh read/write.
+pub struct TokioTcp {
+    tcp: TcpStream,
+    /// Maximum randomized delay in milliseconds
+    delay_ms: u64,
+
+    /// Next deadline instant for delay
+    deadline: Instant,
+    /// Internal buffer of previously-read data
+    buffer: BytesMut,
+}
+
+impl TokioTcp {
+    /// Create a new wrapper with the given maximum delay (ms)
+    pub fn new(stream: TcpStream, delay_ms: u64) -> Self {
+        let initial = if delay_ms > 0 {
+            rand::thread_rng().gen_range(0..delay_ms)
+        } else {
+            0
+        };
+        let deadline = Instant::now() + Duration::from_millis(initial);
+        TokioTcp {
+            tcp: stream,
+            delay_ms,
+            deadline,
+            buffer: BytesMut::new(),
+        }
+    }
+}
+
+impl AsyncRead for TokioTcp {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut ReadBuf<'_>,
+    ) -> Poll<io::Result<()>> {
+        // Safe because TokioTcp is Unpin
+        let this = self.get_mut();
+
+        // 1) Drain any buffered data
+        if !this.buffer.is_empty() {
+            let to_copy = this.buffer.len().min(buf.remaining());
+            buf.put_slice(&this.buffer.split_to(to_copy));
+            return Poll::Ready(Ok(()));
+        }
+
+        // 2) If we're still before the deadline, schedule a wake and return Pending
+        let now = Instant::now();
+        if this.delay_ms > 0 && now < this.deadline {
+            let waker = cx.waker().clone();
+            let wait = this.deadline - now;
+            tokio::spawn(async move {
+                sleep(wait).await;
+                waker.wake_by_ref();
+            });
+            return Poll::Pending;
+        }
+
+        // 3) Past deadline: compute the next random deadline
+        if this.delay_ms > 0 {
+            let next_ms = rand::thread_rng().gen_range(0..=this.delay_ms);
+            this.deadline = Instant::now() + Duration::from_millis(next_ms);
+        }
+
+        // 4) Perform the actual read into a temporary buffer
+        let mut tmp = [0u8; 4096];
+        let mut rb = ReadBuf::new(&mut tmp);
+        match Pin::new(&mut this.tcp).poll_read(cx, &mut rb) {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(Ok(())) => {
+                let filled = rb.filled();
+                if filled.is_empty() {
+                    // EOF or zero bytes
+                    Poll::Ready(Ok(()))
+                } else {
+                    this.buffer.extend_from_slice(filled);
+                    let to_copy = this.buffer.len().min(buf.remaining());
+                    buf.put_slice(&this.buffer.split_to(to_copy));
+                    Poll::Ready(Ok(()))
+                }
+            }
+            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
+        }
+    }
+}
+
+impl AsyncWrite for TokioTcp {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        data: &[u8],
+    ) -> Poll<io::Result<usize>> {
+        let this = self.get_mut();
+
+        // 1) If before the deadline, schedule a wake and return Pending
+        let now = Instant::now();
+        if this.delay_ms > 0 && now < this.deadline {
+            let waker = cx.waker().clone();
+            let wait = this.deadline - now;
+            tokio::spawn(async move {
+                sleep(wait).await;
+                waker.wake_by_ref();
+            });
+            return Poll::Pending;
+        }
+
+        // 2) Past deadline: compute the next random deadline
+        if this.delay_ms > 0 {
+            let next_ms = rand::thread_rng().gen_range(0..=this.delay_ms);
+            this.deadline = Instant::now() + Duration::from_millis(next_ms);
+        }
+
+        // 3) Actual write
+        Pin::new(&mut this.tcp).poll_write(cx, data)
+    }
+
+    fn poll_flush(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<io::Result<()>> {
+        let this = self.get_mut();
+        Pin::new(&mut this.tcp).poll_flush(cx)
+    }
+
+    fn poll_shutdown(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<io::Result<()>> {
+        let this = self.get_mut();
+        Pin::new(&mut this.tcp).poll_shutdown(cx)
+    }
+}
 
 impl ConnectionPool {
     /// Create a new pool and spawn the background task that handles requests.
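To see the wrapper in isolation, here is a minimal, hypothetical unit-test-style sketch (not part of the diff; it assumes it lives inside client_cache.rs where `TokioTcp` is visible) that wraps the client half of a loopback connection with a 50 ms delay budget:

    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::net::{TcpListener, TcpStream};

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let listener = TcpListener::bind("127.0.0.1:0").await?;
        let addr = listener.local_addr()?;
        tokio::spawn(async move {
            let (mut sock, _) = listener.accept().await.unwrap();
            sock.write_all(b"hello").await.unwrap();
        });
        // Wrap the client stream; each fresh read now sees up to 50ms of injected delay.
        let mut stream = TokioTcp::new(TcpStream::connect(addr).await?, 50);
        let mut buf = [0u8; 5];
        stream.read_exact(&mut buf).await?;
        assert_eq!(&buf, b"hello");
        Ok(())
    }

Because the deadline is re-randomized after every read or write that gets past it, long-lived streams see jitter on every wakeup rather than a single up-front stall.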
@@ -69,6 +218,9 @@ impl ConnectionPool {
         connect_timeout: Duration,
         connect_backoff: Duration,
         max_idle_duration: Duration,
+        max_delay_ms: u64,
+        drop_rate: f64,
+        hang_rate: f64,
     ) -> Arc<Self> {
         let (request_tx, mut request_rx) = mpsc::channel(100);
         let (watch_tx, watch_rx) = watch::channel(false);
@@ -87,6 +239,9 @@
             connect_backoff: connect_backoff,
             max_idle_duration: max_idle_duration,
             request_tx: request_tx,
+            max_delay_ms: max_delay_ms,
+            drop_rate: drop_rate,
+            hang_rate: hang_rate,
         });
 
         //
@@ -170,6 +325,58 @@ impl ConnectionPool {
     }
 
     async fn create_connection(&self) -> () {
+
+        let max_delay_ms = self.max_delay_ms;
+        let drop_rate = self.drop_rate;
+        let hang_rate = self.hang_rate;
+
+        // This is a custom connector that inserts delays and errors, for
+        // testing purposes. It would normally be disabled by the config.
+        let connector = service_fn(move |uri: Uri| {
+            let max_delay = max_delay_ms;
+            let drop_rate = drop_rate;
+            let hang_rate = hang_rate;
+            async move {
+                let mut rng = StdRng::from_entropy();
+                // Simulate an indefinite hang
+                if hang_rate > 0.0 && rng.gen_bool(hang_rate) {
+                    // Never completes, to exercise the connect timeout
+                    return future::pending::<Result<TokioIo<TokioTcp>, std::io::Error>>().await;
+                }
+
+                if max_delay > 0 {
+                    // Random delay before connecting
+                    let delay = rng.gen_range(0..max_delay);
+                    tokio::time::sleep(Duration::from_millis(delay)).await;
+                }
+                // Random drop (connect error)
+                if drop_rate > 0.0 && rng.gen_bool(drop_rate) {
+                    return Err(std::io::Error::new(
+                        std::io::ErrorKind::Other,
+                        "simulated connect drop",
+                    ));
+                }
+
+                // Otherwise perform the real TCP connect
+                let addr = match (uri.host(), uri.port()) {
+                    // host + explicit port
+                    (Some(host), Some(port)) => format!("{}:{}", host, port.as_str()),
+                    // host only (no port)
+                    (Some(host), None) => host.to_string(),
+                    // neither: error out
+                    _ => return Err(Error::new(ErrorKind::InvalidInput, "no host or port")),
+                };
+
+                let tcp = TcpStream::connect(addr).await?;
+                let tcpwrapper = TokioTcp::new(tcp, max_delay_ms);
+                Ok(TokioIo::new(tcpwrapper))
+            }
+        });
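+        // `service_fn` wraps the closure above in a `tower::Service<Uri>`:
+        // tonic invokes it with the endpoint URI each time this channel dials
+        // a new TCP connection, so every (re)connect re-samples the hang,
+        // drop, and delay probabilities independently.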
+
         // Wait to be signalled to create a connection.
         let mut recv = self.cc_watch_tx.subscribe();
         if !*self.cc_watch_rx.borrow() {
@@ -207,13 +414,15 @@ impl ConnectionPool {
             Endpoint::from_shared(self.endpoint.clone())
                 .expect("invalid endpoint")
                 .timeout(self.connect_timeout)
-                .connect(),
+                .connect_with_connector(connector)
         )
         .await;
+
         match attempt {
             Ok(Ok(channel)) => {
                 {
+
                     let mut inner = self.inner.lock().await;
                     let id = uuid::Uuid::new_v4();
                     inner.entries.insert(
diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs
index 72df0818c8..666073e1bc 100644
--- a/pageserver/client_grpc/src/lib.rs
+++ b/pageserver/client_grpc/src/lib.rs
@@ -19,7 +19,10 @@ use pageserver_page_api::proto;
 use pageserver_page_api::proto::PageServiceClient;
 use utils::shard::ShardIndex;
 
+use std::fmt::Debug;
 mod client_cache;
+// info-level logging is used below to report injected get_page failures
+use tracing::info;
 
 #[derive(Error, Debug)]
 pub enum PageserverClientError {
@@ -55,6 +58,9 @@ pub struct ClientCacheOptions {
     pub connect_timeout: Duration,
     pub connect_backoff: Duration,
     pub max_idle_duration: Duration,
+    pub max_delay_ms: u64,
+    pub drop_rate: f64,
+    pub hang_rate: f64,
 }
 
 impl PageserverClient {
@@ -71,6 +77,9 @@
             connect_timeout: Duration::from_secs(5),
             connect_backoff: Duration::from_secs(1),
             max_idle_duration: Duration::from_secs(60),
+            max_delay_ms: 0,
+            drop_rate: 0.0,
+            hang_rate: 0.0,
         };
         Self::new_with_config(
             tenant_id,
@@ -165,6 +174,7 @@
         match response {
             Err(status) => {
+                info!("get_page error: {:?}", status);
                 pooled_client.finish(Err(status.clone())).await; // Pass error to finish
                 return Err(PageserverClientError::RequestError(status));
             }
@@ -297,6 +307,9 @@
             self.client_cache_options.connect_timeout,
             self.client_cache_options.connect_backoff,
             self.client_cache_options.max_idle_duration,
+            self.client_cache_options.max_delay_ms,
+            self.client_cache_options.drop_rate,
+            self.client_cache_options.hang_rate,
         );
         let mut write_pool = self.channels.write().unwrap();
         write_pool.insert(shard, new_pool.clone());
diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
index 3888b095f6..efb47d0425 100644
--- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
+++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs
@@ -88,6 +88,15 @@ pub(crate) struct Args {
     #[clap(long, default_value = "60000")]
     pool_max_idle_duration: NonZeroUsize,
 
+    #[clap(long, default_value = "0")]
+    max_delay_ms: usize,
+
+    #[clap(long, default_value = "0")]
+    percent_drops: usize,
+
+    #[clap(long, default_value = "0")]
+    percent_hangs: usize,
+
     targets: Option<Vec<TenantTimelineId>>,
 }
@@ -485,6 +494,9 @@ async fn client_grpc(
         connect_timeout: Duration::from_millis(args.pool_connect_timeout.get() as u64),
         connect_backoff: Duration::from_millis(args.pool_connect_backoff.get() as u64),
         max_idle_duration: Duration::from_millis(args.pool_max_idle_duration.get() as u64),
+        max_delay_ms: args.max_delay_ms as u64,
+        drop_rate: (args.percent_drops as f64) / 100.0,
+        hang_rate: (args.percent_hangs as f64) / 100.0,
     };
     let client = pageserver_client_grpc::PageserverClient::new_with_config(
         &worker_id.timeline.tenant_id.to_string(),
@@ -553,7 +565,7 @@ async fn client_grpc(
         }
 
         let (start, result) = inflight.next().await.unwrap();
-        result.expect("getpage request should succeed");
+        result.expect("getpage request should succeed");
         let end = Instant::now();
         shared_state.live_stats.request_done();
         ticks_processed += 1;
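For completeness, a test harness would opt into fault injection through the public `ClientCacheOptions`; the pagebench flags above map onto the two rates as `percent / 100.0`, and all three knobs default to off. A minimal sketch with hypothetical values:

    use std::time::Duration;
    use pageserver_client_grpc::ClientCacheOptions;

    fn main() {
        let opts = ClientCacheOptions {
            connect_timeout: Duration::from_secs(5),
            connect_backoff: Duration::from_secs(1),
            max_idle_duration: Duration::from_secs(60),
            max_delay_ms: 100, // up to 100ms of injected delay per connect/read/write
            drop_rate: 0.05,   // 5% of connection attempts fail (as --percent-drops 5)
            hang_rate: 0.01,   // 1% of connection attempts hang until the connect timeout
        };
        let _ = opts; // would be passed through PageserverClient::new_with_config
    }

The flag names assume clap's default kebab-case derivation, i.e. `--max-delay-ms`, `--percent-drops`, and `--percent-hangs`.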