diff --git a/pageserver/client_grpc/Cargo.toml b/pageserver/client_grpc/Cargo.toml index 7c8efc1812..bc84e14333 100644 --- a/pageserver/client_grpc/Cargo.toml +++ b/pageserver/client_grpc/Cargo.toml @@ -17,6 +17,7 @@ rand = "0.8" tokio-util = { version = "0.7", features = ["compat"] } hyper-util = "0.1.9" hyper = "1.6.0" +metrics.workspace = true pageserver_page_api.workspace = true diff --git a/pageserver/client_grpc/src/client_cache.rs b/pageserver/client_grpc/src/client_cache.rs index 1b82103bec..c3046f1c85 100644 --- a/pageserver/client_grpc/src/client_cache.rs +++ b/pageserver/client_grpc/src/client_cache.rs @@ -1,14 +1,37 @@ use std::{ collections::HashMap, - sync::Arc, + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, time::{Duration, Instant}, }; use tokio::{ - sync::{Mutex, Notify, mpsc, watch}, + sync::{Mutex, mpsc, watch, Semaphore, OwnedSemaphorePermit}, time::sleep, + net::TcpStream, + io::{AsyncRead, AsyncWrite, ReadBuf}, }; use tonic::transport::{Channel, Endpoint}; +use uuid; +use std::io::{self, Error, ErrorKind}; + +use std::{ + pin::Pin, + task::{Context, Poll} +}; + +use futures::future; +use rand::{ + Rng, + rngs::StdRng, + SeedableRng +}; + +use tower::service_fn; +use http::Uri; +use hyper_util::rt::TokioIo; use bytes::BytesMut; use futures::future; use http::Uri; @@ -24,6 +47,14 @@ use tokio::net::TcpStream; use tower::service_fn; use uuid; +use metrics; +use metrics::proto::MetricFamily; +use metrics::{Encoder, TextEncoder}; + +// use info +use tracing::info; + +use tokio_util::sync::CancellationToken; /// A pooled gRPC client with capacity tracking and error handling. pub struct ConnectionPool { inner: Mutex, @@ -42,18 +73,10 @@ pub struct ConnectionPool { // The maximum duration a connection can be idle before being removed max_idle_duration: Duration, + channel_semaphore: Arc, - // This notify is signaled when a connection is released or created. - notify: Notify, - - // When it is time to create a new connection for the pool, we signal - // a watch and a connection creation async wakes up and does the work. - cc_watch_tx: watch::Sender, - cc_watch_rx: watch::Receiver, - - // To acquire a connection from the pool, send a request - // to this mpsc, and wait for a response. - request_tx: mpsc::Sender>, + shutdown_token: CancellationToken, + aggregate_metrics: Option>, } struct Inner { @@ -62,6 +85,8 @@ struct Inner { // This is updated when a connection is dropped, or we fail // to create a new connection. last_connect_failure: Option, + waiters: usize, + in_progress: usize, } struct ConnectionEntry { @@ -77,6 +102,7 @@ pub struct PooledClient { pub channel: Channel, pool: Arc, id: uuid::Uuid, + permit: OwnedSemaphorePermit, } /// Wraps a `TcpStream`, buffers incoming data, and injects a random delay per fresh read/write. pub struct TokioTcp { @@ -206,7 +232,6 @@ impl AsyncWrite for TokioTcp { } impl ConnectionPool { - /// Create a new pool and spawn the background task that handles requests. 
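// A minimal, self-contained sketch (not part of the patch) of the capacity
// model the new PooledClient uses above: the handle owns an
// OwnedSemaphorePermit, so dropping the handle automatically returns one
// slot to the pool. Names (TinyPool, PoolHandle, conn_id) are illustrative.
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

struct PoolHandle {
    conn_id: u64,                  // stand-in for the real tonic Channel + uuid
    _permit: OwnedSemaphorePermit, // dropped => the slot becomes available again
}

struct TinyPool {
    capacity: Arc<Semaphore>,
}

impl TinyPool {
    fn new(slots: usize) -> Self {
        Self { capacity: Arc::new(Semaphore::new(slots)) }
    }

    async fn acquire(&self, conn_id: u64) -> PoolHandle {
        let permit = Arc::clone(&self.capacity)
            .acquire_owned()
            .await
            .expect("semaphore closed");
        PoolHandle { conn_id, _permit: permit }
    }
}

#[tokio::main]
async fn main() {
    let pool = TinyPool::new(2);
    let handle = pool.acquire(1).await;
    println!("got connection {}", handle.conn_id);
    drop(handle); // permit released here, no explicit return call needed
}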
pub fn new( endpoint: &String, max_consumers: usize, @@ -217,107 +242,165 @@ impl ConnectionPool { max_delay_ms: u64, drop_rate: f64, hang_rate: f64, + aggregate_metrics: Option>, ) -> Arc { - let (request_tx, mut request_rx) = mpsc::channel::>(100); - let (watch_tx, watch_rx) = watch::channel(false); + + let shutdown_token = CancellationToken::new(); let pool = Arc::new(Self { inner: Mutex::new(Inner { entries: HashMap::new(), last_connect_failure: None, + waiters: 0, + in_progress: 0, }), - notify: Notify::new(), - cc_watch_tx: watch_tx, - cc_watch_rx: watch_rx, + channel_semaphore: Arc::new(Semaphore::new(0)), endpoint: endpoint.clone(), max_consumers: max_consumers, error_threshold: error_threshold, connect_timeout: connect_timeout, connect_backoff: connect_backoff, max_idle_duration: max_idle_duration, - request_tx: request_tx, max_delay_ms: max_delay_ms, drop_rate: drop_rate, hang_rate: hang_rate, + shutdown_token: shutdown_token.clone(), + aggregate_metrics: aggregate_metrics.clone(), }); - // - // Background task to handle requests and create connections. - // - // TODO: These should be canceled when the ConnectionPool is dropped - // - - let bg_cc_pool = Arc::clone(&pool); - tokio::spawn(async move { - loop { - bg_cc_pool.create_connection().await; - } - }); - - let bg_pool = Arc::clone(&pool); - tokio::spawn(async move { - while let Some(responder) = request_rx.recv().await { - // TODO: This call should time out and return an error - let (id, channel) = bg_pool.acquire_connection().await; - let client = PooledClient { - channel, - pool: Arc::clone(&bg_pool), - id, - }; - let _ = responder.send(client).await; - } - }); - - // Background task to sweep idle connections + // Cancelable background task to sweep idle connections + let sweeper_token = shutdown_token.clone(); let sweeper_pool = Arc::clone(&pool); tokio::spawn(async move { loop { - sweeper_pool.sweep_idle_connections().await; - sleep(Duration::from_secs(5)).await; // Run every 5 seconds + tokio::select! { + _ = sweeper_token.cancelled() => break, + _ = async { + sweeper_pool.sweep_idle_connections().await; + sleep(Duration::from_secs(5)).await; + } => {} + } } }); pool } - // Sweep and remove idle connections - async fn sweep_idle_connections(&self) { - let mut inner = self.inner.lock().await; - let now = Instant::now(); - inner.entries.retain(|_id, entry| { - if entry.active_consumers == 0 - && now.duration_since(entry.last_used) > self.max_idle_duration - { - // Remove idle connection - return false; + pub async fn shutdown(self: Arc) { + self.shutdown_token.cancel(); + + loop { + let all_idle = { + let inner = self.inner.lock().await; + inner.entries.values().all(|e| e.active_consumers == 0) + }; + if all_idle { + break; } - true - }); + sleep(Duration::from_millis(100)).await; + } + + // 4. Remove all entries + let mut inner = self.inner.lock().await; + inner.entries.clear(); } - async fn acquire_connection(&self) -> (uuid::Uuid, Channel) { - loop { - // Reuse an existing healthy connection if available - { - let mut inner = self.inner.lock().await; - // TODO: Use a heap, although the number of connections is small - if let Some((&id, entry)) = inner - .entries - .iter_mut() - .filter(|(_, e)| e.active_consumers < self.max_consumers) - .filter(|(_, e)| e.consecutive_errors < self.error_threshold) - .max_by_key(|(_, e)| e.active_consumers) + /// Sweep and remove idle connections safely, burning their permits. 
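// A standalone sketch of the cancelable-sweeper pattern introduced above,
// assuming only tokio and tokio-util: the background loop exits as soon as
// the pool's CancellationToken is cancelled instead of running forever.
use std::time::Duration;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;

async fn run_sweeper(token: CancellationToken) {
    loop {
        tokio::select! {
            _ = token.cancelled() => break,
            _ = async {
                // The real task calls sweep_idle_connections() here.
                sleep(Duration::from_secs(5)).await;
            } => {}
        }
    }
}

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let sweeper = tokio::spawn(run_sweeper(token.clone()));

    // Later, during shutdown: cancel and wait for the task to exit.
    token.cancel();
    sweeper.await.unwrap();
}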
+ async fn sweep_idle_connections(self: &Arc) { + let mut to_forget = Vec::new(); + let now = Instant::now(); + + // Remove idle entries and collect permits to forget + { + let mut inner = self.inner.lock().await; + inner.entries.retain(|_, entry| { + if entry.active_consumers == 0 + && now.duration_since(entry.last_used) > self.max_idle_duration { - entry.active_consumers += 1; - return (id, entry.channel.clone()); + let semaphore = Arc::clone(&self.channel_semaphore); + if let Ok(permits) = semaphore.try_acquire_many_owned(self.max_consumers as u32) { + to_forget.push(permits); + return false; // remove this entry + } } - // There is no usable connection, so notify the connection creation async to make one. (It is - // possible that a consumer will release a connection while the new one is being created, in - // which case we will use it right away, but the new connection will be created anyway.) - let _ = self.cc_watch_tx.send(true); + true + }); + } + + // Permanently consume those permits + for permit in to_forget { + permit.forget(); + } + } + + // If we have a permit already, get a connection out of the hash table + async fn get_conn_with_permit(self: Arc, permit: OwnedSemaphorePermit) -> PooledClient { + let mut inner = self.inner.lock().await; + // TODO: Use a heap, although the number of connections is small + if let Some((&id, entry)) = inner + .entries + .iter_mut() + .filter(|(_, e)| e.active_consumers < self.max_consumers) + .filter(|(_, e)| e.consecutive_errors < self.error_threshold) + .max_by_key(|(_, e)| e.active_consumers) + { + entry.active_consumers += 1; + let client = PooledClient { + channel: entry.channel.clone(), + pool: Arc::clone(&self), + id, + permit, + }; + return client; + } else { + panic!("Corrupt state: no available connections with permit acquired."); + } + } + + pub async fn get_client(self: Arc) -> Result { + + if self.shutdown_token.is_cancelled() { + return Err(tonic::Status::unavailable("Pool is shutting down")); + } + + // Try to get the semaphore. If it fails, we are out of connections, so + // request that a new connection be created. + let mut semaphore = Arc::clone(&self.channel_semaphore); + match semaphore.try_acquire_owned() { + Ok(permit_) => { + let pool_conn = self.get_conn_with_permit(permit_).await; + return Ok(pool_conn); + } + Err(_) => { + + match self.aggregate_metrics { + Some(ref metrics) => { + metrics.retry_counters.with_label_values(&["sema_acquire_failed"]).inc(); + } + None => {} + } + + { + let mut inner = self.inner.lock().await; + inner.waiters += 1; + if inner.waiters > (inner.in_progress * self.max_consumers) { + let self_clone = Arc::clone(&self); + tokio::task::spawn(async move { + self_clone.create_connection().await; + }); + inner.in_progress += 1; + } + } + // Wait for a connection to become available, either because it + // was created or because a connection was returned to the pool. + semaphore = Arc::clone(&self.channel_semaphore); + let conn_permit = semaphore.acquire_owned().await.unwrap(); + { + let mut inner = self.inner.lock().await; + inner.waiters -= 1; + } + let pool_conn = self.get_conn_with_permit(conn_permit).await; + return Ok(pool_conn); } - // Wait for a new connection, or for one of the consumers to release a connection - // TODO: Put this notify in a timeout - self.notify.notified().await; } } @@ -329,7 +412,6 @@ impl ConnectionPool { // This is a custom connector that inserts delays and errors, for // testing purposes. It would normally be disabled by the config. 
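// A sketch of the "burn the permits" idea used by sweep_idle_connections
// above: if all of a connection's permits can be taken at once, nobody is
// using it, so the entry can be dropped and its capacity retired for good
// with OwnedSemaphorePermit::forget(). The permit count is illustrative.
use std::sync::Arc;
use tokio::sync::Semaphore;

fn main() {
    let per_conn_permits: u32 = 4;
    let sem = Arc::new(Semaphore::new(per_conn_permits as usize));

    match Arc::clone(&sem).try_acquire_many_owned(per_conn_permits) {
        Ok(permits) => {
            // Connection is idle: no consumer holds any of its permits.
            permits.forget(); // capacity is never returned to the semaphore
            assert_eq!(sem.available_permits(), 0);
            println!("idle connection retired");
        }
        Err(_) => println!("connection still in use; keep it"),
    }
}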
let connector = service_fn(move |uri: Uri| { - let max_delay = max_delay_ms; let drop_rate = drop_rate; let hang_rate = hang_rate; async move { @@ -340,11 +422,6 @@ impl ConnectionPool { return future::pending::, std::io::Error>>().await; } - if max_delay > 0 { - // Random delay before connecting - let delay = rng.gen_range(0..max_delay); - tokio::time::sleep(Duration::from_millis(delay)).await; - } // Random drop (connect error) if drop_rate > 0.0 && rng.gen_bool(drop_rate) { return Err(std::io::Error::new( @@ -363,30 +440,23 @@ impl ConnectionPool { _ => return Err(Error::new(ErrorKind::InvalidInput, "no host or port")), }; - //let addr = uri.authority().unwrap().as_str(); let tcp = TcpStream::connect(addr).await?; let tcpwrapper = TokioTcp::new(tcp, max_delay_ms); Ok(TokioIo::new(tcpwrapper)) } }); - // Wait to be signalled to create a connection. - let mut recv = self.cc_watch_tx.subscribe(); - if !*self.cc_watch_rx.borrow() { - while recv.changed().await.is_ok() { - if *self.cc_watch_rx.borrow() { - break; - } - } - } - + let mut backoff_delay = self.connect_backoff; loop { + // Back off. + // Loop because failure can occur while we are sleeping, so wait + // until the failure stopped for at least one backoff period. loop { if let Some(delay) = { let inner = self.inner.lock().await; inner.last_connect_failure.and_then(|at| { - (at.elapsed() < self.connect_backoff) - .then(|| self.connect_backoff - at.elapsed()) + (at.elapsed() < backoff_delay) + .then(|| backoff_delay - at.elapsed()) }) } { sleep(delay).await; @@ -402,6 +472,14 @@ impl ConnectionPool { // on this connection. (Requests made later on this channel will time out // with the same timeout.) // + + match self.aggregate_metrics { + Some(ref metrics) => { + metrics.retry_counters.with_label_values(&["connection_attempt"]).inc(); + } + None => {} + } + let attempt = tokio::time::timeout( self.connect_timeout, Endpoint::from_shared(self.endpoint.clone()) @@ -414,6 +492,12 @@ impl ConnectionPool { match attempt { Ok(Ok(channel)) => { { + match self.aggregate_metrics { + Some(ref metrics) => { + metrics.retry_counters.with_label_values(&["connection_success"]).inc(); + } + None => {} + } let mut inner = self.inner.lock().await; let id = uuid::Uuid::new_v4(); inner.entries.insert( @@ -426,31 +510,34 @@ impl ConnectionPool { last_used: Instant::now(), }, ); - self.notify.notify_one(); - let _ = self.cc_watch_tx.send(false); + self.channel_semaphore.add_permits(self.max_consumers); + // decrement in progress connections + inner.in_progress -= 1; return; }; } Ok(Err(_)) | Err(_) => { + match self.aggregate_metrics { + Some(ref metrics) => { + metrics.retry_counters.with_label_values(&["connect_failed"]).inc(); + } + None => {} + } let mut inner = self.inner.lock().await; inner.last_connect_failure = Some(Instant::now()); + // Add some jitter so that every connection doesn't retry at once + let jitter = rand::thread_rng().gen_range(0..=backoff_delay.as_millis() as u64); + backoff_delay = Duration::from_millis(backoff_delay.as_millis() as u64 + jitter); + + // Do not delay longer than one minute + if backoff_delay > Duration::from_secs(60) { + backoff_delay = Duration::from_secs(60); + } } } } } - /// Get a client we can use to send gRPC messages. 
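// A small sketch of the capped backoff-with-jitter that create_connection
// applies above after a failed connect: add a random amount (up to the
// current delay) and clamp the result to one minute. The starting delay
// and iteration count are illustrative.
use rand::Rng;
use std::time::Duration;

fn next_backoff(current: Duration) -> Duration {
    let base_ms = current.as_millis() as u64;
    let jitter = rand::thread_rng().gen_range(0..=base_ms);
    Duration::from_millis(base_ms + jitter).min(Duration::from_secs(60))
}

fn main() {
    let mut delay = Duration::from_millis(100);
    for attempt in 1..=8 {
        println!("attempt {attempt}: backing off for {delay:?}");
        delay = next_backoff(delay);
    }
}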
- pub async fn get_client(&self) -> PooledClient { - let (resp_tx, mut resp_rx) = mpsc::channel(1); - self.request_tx - .send(resp_tx) - .await - .expect("ConnectionPool task has shut down"); - resp_rx - .recv() - .await - .expect("ConnectionPool task has shut down") - } /// Return client to the pool, indicating success or error. pub async fn return_client(&self, id: uuid::Uuid, success: bool) { @@ -458,7 +545,6 @@ impl ConnectionPool { let mut new_failure = false; if let Some(entry) = inner.entries.get_mut(&id) { entry.last_used = Instant::now(); - // TODO: This should be a debug_assert if entry.active_consumers <= 0 { panic!("A consumer completed when active_consumers was zero!") } @@ -488,10 +574,9 @@ impl ConnectionPool { if remove == 0 { inner.entries.remove(&id); } - } else { - self.notify.notify_one(); } } + // The semaphore permit is released when the pooled client is dropped. } } diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs index 994135179d..2a15e59786 100644 --- a/pageserver/client_grpc/src/lib.rs +++ b/pageserver/client_grpc/src/lib.rs @@ -22,6 +22,8 @@ use utils::shard::ShardIndex; use std::fmt::Debug; mod client_cache; +use metrics::{IntCounter, IntCounterVec, core::Collector}; + #[derive(Error, Debug)] pub enum PageserverClientError { #[error("could not connect to service: {0}")] @@ -38,6 +40,42 @@ pub enum PageserverClientError { Other(String), } +#[derive(Clone, Debug)] +pub struct PageserverClientAggregateMetrics { + pub request_counters: IntCounterVec, + pub retry_counters: IntCounterVec, +} +impl PageserverClientAggregateMetrics { + pub fn new() -> Self { + + let request_counters = IntCounterVec::new( + metrics::core::Opts::new( + "backend_requests_total", + "Number of requests from backends.", + ), + &["request_kind"], + ).unwrap(); + + let retry_counters = IntCounterVec::new( + metrics::core::Opts::new( + "backend_requests_retries_total", + "Number of retried requests from backends.", + ), + &["request_kind"], + ).unwrap(); + Self { + request_counters, + retry_counters, + } + } + + pub fn collect (&self) -> Vec { + let mut metrics = Vec::new(); + metrics.append(&mut self.request_counters.collect()); + metrics.append(&mut self.retry_counters.collect()); + metrics + } +} pub struct PageserverClient { _tenant_id: String, _timeline_id: String, @@ -51,6 +89,8 @@ pub struct PageserverClient { auth_interceptor: AuthInterceptor, client_cache_options: ClientCacheOptions, + + aggregate_metrics: Option>, } pub struct ClientCacheOptions { @@ -82,7 +122,7 @@ impl PageserverClient { drop_rate: 0.0, hang_rate: 0.0, }; - Self::new_with_config(tenant_id, timeline_id, auth_token, shard_map, options) + Self::new_with_config(tenant_id, timeline_id, auth_token, shard_map, options, None) } pub fn new_with_config( tenant_id: &str, @@ -90,7 +130,9 @@ impl PageserverClient { auth_token: &Option, shard_map: HashMap, options: ClientCacheOptions, + metrics: Option>, ) -> Self { + Self { _tenant_id: tenant_id.to_string(), _timeline_id: timeline_id.to_string(), @@ -99,6 +141,7 @@ impl PageserverClient { channels: RwLock::new(HashMap::new()), auth_interceptor: AuthInterceptor::new(tenant_id, timeline_id, auth_token.as_deref()), client_cache_options: options, + aggregate_metrics: metrics, } } pub async fn process_check_rel_exists_request( @@ -185,6 +228,13 @@ impl PageserverClient { )); }; + match self.aggregate_metrics { + Some(ref metrics) => { + metrics.request_counters.with_label_values(&["get_page"]).inc(); + } + None => {} + } + match response { Err(status) 
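// A self-contained sketch of the aggregate-metrics pattern added in lib.rs
// above, written against the upstream `prometheus` crate; the workspace
// `metrics` crate used in the patch exposes a very similar API (that mapping
// is an assumption). Counter names mirror the patch.
use prometheus::{core::Collector, proto::MetricFamily, IntCounterVec, Opts};

struct AggregateMetrics {
    request_counters: IntCounterVec,
    retry_counters: IntCounterVec,
}

impl AggregateMetrics {
    fn new() -> Self {
        let request_counters = IntCounterVec::new(
            Opts::new("backend_requests_total", "Number of requests from backends."),
            &["request_kind"],
        )
        .unwrap();
        let retry_counters = IntCounterVec::new(
            Opts::new(
                "backend_requests_retries_total",
                "Number of retried requests from backends.",
            ),
            &["request_kind"],
        )
        .unwrap();
        Self { request_counters, retry_counters }
    }

    fn collect(&self) -> Vec<MetricFamily> {
        let mut out = self.request_counters.collect();
        out.extend(self.retry_counters.collect());
        out
    }
}

fn main() {
    let m = AggregateMetrics::new();
    m.request_counters.with_label_values(&["get_page"]).inc();
    m.retry_counters.with_label_values(&["connect_failed"]).inc();
    for family in m.collect() {
        println!("{}: {} series", family.get_name(), family.get_metric().len());
    }
}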
=> { pooled_client.finish(Err(status.clone())).await; // Pass error to finish @@ -305,7 +355,7 @@ impl PageserverClient { let usable_pool: Arc; match reused_pool { Some(pool) => { - let pooled_client = pool.get_client().await; + let pooled_client = pool.get_client().await.unwrap(); return pooled_client; } None => { @@ -323,6 +373,7 @@ impl PageserverClient { self.client_cache_options.max_delay_ms, self.client_cache_options.drop_rate, self.client_cache_options.hang_rate, + self.aggregate_metrics.clone(), ); let mut write_pool = self.channels.write().unwrap(); write_pool.insert(shard, new_pool.clone()); @@ -330,7 +381,7 @@ impl PageserverClient { } } - let pooled_client = usable_pool.get_client().await; + let pooled_client = usable_pool.get_client().await.unwrap(); return pooled_client; } } diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml index 629248f062..4469e1b755 100644 --- a/pageserver/pagebench/Cargo.toml +++ b/pageserver/pagebench/Cargo.toml @@ -22,6 +22,10 @@ tracing.workspace = true tokio.workspace = true tokio-stream.workspace = true tokio-util.workspace = true +axum.workspace = true +http.workspace = true + +metrics.workspace = true pageserver_client.workspace = true pageserver_client_grpc.workspace = true diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 6d4175867b..071c952e0d 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -23,6 +23,19 @@ use utils::id::TenantTimelineId; use utils::lsn::Lsn; use utils::shard::ShardIndex; +use axum::Router; +use axum::body::Body; +use axum::extract::State; +use axum::response::Response; + + +use http::StatusCode; +use http::header::CONTENT_TYPE; + +use metrics; +use metrics::proto::MetricFamily; +use metrics::{Encoder, TextEncoder}; + use crate::util::tokio_thread_local_stats::AllThreadLocalStats; use crate::util::{request_stats, tokio_thread_local_stats}; @@ -157,6 +170,36 @@ pub(crate) fn main(args: Args) -> anyhow::Result<()> { main_impl(args, thread_local_stats) }) } +async fn get_metrics(State(state): State>) -> Response { + use metrics::core::Collector; + let metrics = state.collect(); + + info!("metrics: {metrics:?}"); + // When we call TextEncoder::encode() below, it will immediately return an + // error if a metric family has no metrics, so we need to preemptively + // filter out metric families with no metrics. + let metrics = metrics + .into_iter() + .filter(|m| !m.get_metric().is_empty()) + .collect::>(); + + let encoder = TextEncoder::new(); + let mut buffer = vec![]; + + if let Err(e) = encoder.encode(&metrics, &mut buffer) { + Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .header(CONTENT_TYPE, "application/text") + .body(Body::from(e.to_string())) + .unwrap() + } else { + Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, encoder.format_type()) + .body(Body::from(buffer)) + .unwrap() + } +} async fn main_impl( args: Args, @@ -164,6 +207,24 @@ async fn main_impl( ) -> anyhow::Result<()> { let args: &'static Args = Box::leak(Box::new(args)); + // Vector of pageserver clients + let client_metrics = Arc::new(pageserver_client_grpc::PageserverClientAggregateMetrics::new()); + + use axum::routing::get; + let app = Router::new() + .route("/metrics", get(get_metrics)) + .with_state(client_metrics.clone()); + + // TODO: make configurable. Or listen on unix domain socket? 
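// A minimal sketch of the /metrics endpoint wiring shown above, using axum
// and the upstream `prometheus` crate with its default registry (the patch
// instead encodes the client's aggregate metrics passed in via State; the
// bind address below matches the patch's hard-coded 127.0.0.1:9090).
use axum::{http::StatusCode, routing::get, Router};
use prometheus::{Encoder, TextEncoder};

async fn metrics_handler() -> (StatusCode, String) {
    let families = prometheus::gather();
    let mut buf = Vec::new();
    match TextEncoder::new().encode(&families, &mut buf) {
        Ok(()) => (StatusCode::OK, String::from_utf8_lossy(&buf).into_owned()),
        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
    }
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/metrics", get(metrics_handler));
    let listener = tokio::net::TcpListener::bind("127.0.0.1:9090").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}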
+ let listener = tokio::net::TcpListener::bind("127.0.0.1:9090") + .await + .unwrap(); + + tokio::spawn(async { + tracing::info!("metrics listener spawned"); + axum::serve(listener, app).await.unwrap() + }); + let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new( reqwest::Client::new(), // TODO: support ssl_ca_file for https APIs in pagebench. args.mgmt_api_endpoint.clone(), @@ -322,6 +383,8 @@ async fn main_impl( let rps_period = args .per_client_rate .map(|rps_limit| Duration::from_secs_f64(1.0 / (rps_limit as f64))); + let new_metrics = client_metrics.clone(); + let make_worker: &dyn Fn(WorkerId) -> Pin>> = &|worker_id| { let ss = shared_state.clone(); let cancel = cancel.clone(); @@ -334,11 +397,12 @@ async fn main_impl( rand::distributions::weighted::WeightedIndex::new(ranges.iter().map(|v| v.len())) .unwrap(); + let new_value = new_metrics.clone(); Box::pin(async move { if args.grpc_stream { client_grpc_stream(args, worker_id, ss, cancel, rps_period, ranges, weights).await } else if args.grpc { - client_grpc(args, worker_id, ss, cancel, rps_period, ranges, weights).await + client_grpc(args, worker_id, new_value, ss, cancel, rps_period, ranges, weights).await } else { client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await } @@ -485,6 +549,7 @@ async fn client_libpq( async fn client_grpc( args: &Args, worker_id: WorkerId, + client_metrics: Arc, shared_state: Arc, cancel: CancellationToken, rps_period: Option, @@ -511,9 +576,13 @@ async fn client_grpc( &None, shard_map, options, + Some(client_metrics.clone()), ); + let client = Arc::new(client); + + shared_state.start_work_barrier.wait().await; let client_start = Instant::now(); let mut ticks_processed = 0;
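// A sketch of the worker wiring at the end of the patch: one Arc to the
// shared counters is cloned per spawned worker, so every worker increments
// the same underlying metrics. The worker body and counter name are
// placeholders for the real get_page benchmark loop.
use prometheus::{IntCounterVec, Opts};
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let counters = Arc::new(
        IntCounterVec::new(
            Opts::new("worker_requests_total", "Requests issued by workers."),
            &["request_kind"],
        )
        .unwrap(),
    );

    let mut workers = Vec::new();
    for worker_id in 0..4u32 {
        let counters = Arc::clone(&counters);
        workers.push(tokio::spawn(async move {
            // A real worker would issue GetPage requests here.
            counters.with_label_values(&["get_page"]).inc();
            println!("worker {worker_id} done");
        }));
    }
    for w in workers {
        w.await.unwrap();
    }
    println!("total = {}", counters.with_label_values(&["get_page"]).get());
}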