diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index cccc64685b..0bf851a8d7 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -153,7 +153,7 @@ impl FlushOp { #[derive(Clone, Debug)] pub struct DeletionQueueClient { - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::UnboundedSender, executor_tx: tokio::sync::mpsc::Sender, lsn_table: Arc>, @@ -416,7 +416,7 @@ pub enum DeletionQueueError { impl DeletionQueueClient { pub(crate) fn broken() -> Self { // Channels whose receivers are immediately dropped. - let (tx, _rx) = tokio::sync::mpsc::channel(1); + let (tx, _rx) = tokio::sync::mpsc::unbounded_channel(); let (executor_tx, _executor_rx) = tokio::sync::mpsc::channel(1); Self { tx, @@ -428,12 +428,12 @@ impl DeletionQueueClient { /// This is cancel-safe. If you drop the future before it completes, the message /// is not pushed, although in the context of the deletion queue it doesn't matter: once /// we decide to do a deletion the decision is always final. - async fn do_push( + fn do_push( &self, - queue: &tokio::sync::mpsc::Sender, + queue: &tokio::sync::mpsc::UnboundedSender, msg: T, ) -> Result<(), DeletionQueueError> { - match queue.send(msg).await { + match queue.send(msg) { Ok(_) => Ok(()), Err(e) => { // This shouldn't happen, we should shut down all tenants before @@ -445,7 +445,7 @@ impl DeletionQueueClient { } } - pub(crate) async fn recover( + pub(crate) fn recover( &self, attached_tenants: HashMap, ) -> Result<(), DeletionQueueError> { @@ -453,7 +453,6 @@ impl DeletionQueueClient { &self.tx, ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }), ) - .await } /// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside @@ -526,6 +525,21 @@ impl DeletionQueueClient { return self.flush_immediate().await; } + self.push_layers_sync(tenant_id, timeline_id, current_generation, layers) + } + + /// When a Tenant has a generation, push_layers is always synchronous because + /// the ListValidator channel is an unbounded channel. + /// + /// This can be merged into push_layers when we remove the Generation-less mode + /// support (``) + pub(crate) fn push_layers_sync( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + current_generation: Generation, + layers: Vec<(LayerFileName, Generation)>, + ) -> Result<(), DeletionQueueError> { metrics::DELETION_QUEUE .keys_submitted .inc_by(layers.len() as u64); @@ -539,17 +553,16 @@ impl DeletionQueueClient { objects: Vec::new(), }), ) - .await } /// This is cancel-safe. If you drop the future the flush may still happen in the background. async fn do_flush( &self, - queue: &tokio::sync::mpsc::Sender, + queue: &tokio::sync::mpsc::UnboundedSender, msg: T, rx: tokio::sync::oneshot::Receiver<()>, ) -> Result<(), DeletionQueueError> { - self.do_push(queue, msg).await?; + self.do_push(queue, msg)?; if rx.await.is_err() { // This shouldn't happen if tenants are shut down before deletion queue. If we // encounter a bug like this, then a flusher will incorrectly believe it has flushed @@ -570,6 +583,18 @@ impl DeletionQueueClient { .await } + /// Issue a flush without waiting for it to complete. This is useful on advisory flushes where + /// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant + /// detach where flushing is nice but not necessary. + /// + /// This function provides no guarantees of work being done. + pub fn flush_advisory(&self) { + let (flush_op, _) = FlushOp::new(); + + // Transmit the flush message, ignoring any result (such as a closed channel during shutdown). + drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op))); + } + // Wait until all previous deletions are executed pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> { debug!("flush_execute: flushing to deletion lists..."); @@ -586,9 +611,7 @@ impl DeletionQueueClient { // Flush any immediate-mode deletions (the above backend flush will only flush // the executor if deletions had flowed through the backend) debug!("flush_execute: flushing execution..."); - let (flush_op, rx) = FlushOp::new(); - self.do_flush(&self.executor_tx, DeleterMessage::Flush(flush_op), rx) - .await?; + self.flush_immediate().await?; debug!("flush_execute: finished flushing execution..."); Ok(()) } @@ -643,8 +666,10 @@ impl DeletionQueue { where C: ControlPlaneGenerationsApi + Send + Sync, { - // Deep channel: it consumes deletions from all timelines and we do not want to block them - let (tx, rx) = tokio::sync::mpsc::channel(16384); + // Unbounded channel: enables non-async functions to submit deletions. The actual length is + // constrained by how promptly the ListWriter wakes up and drains it, which should be frequent + // enough to avoid this taking pathologically large amount of memory. + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); // Shallow channel: it carries DeletionLists which each contain up to thousands of deletions let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16); @@ -957,7 +982,7 @@ mod test { // Basic test that the deletion queue processes the deletions we pass into it let ctx = setup("deletion_queue_smoke").expect("Failed test setup"); let client = ctx.deletion_queue.new_client(); - client.recover(HashMap::new()).await?; + client.recover(HashMap::new())?; let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(); let tenant_id = ctx.harness.tenant_id; @@ -1025,7 +1050,7 @@ mod test { async fn deletion_queue_validation() -> anyhow::Result<()> { let ctx = setup("deletion_queue_validation").expect("Failed test setup"); let client = ctx.deletion_queue.new_client(); - client.recover(HashMap::new()).await?; + client.recover(HashMap::new())?; // Generation that the control plane thinks is current let latest_generation = Generation::new(0xdeadbeef); @@ -1082,7 +1107,7 @@ mod test { // Basic test that the deletion queue processes the deletions we pass into it let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup"); let client = ctx.deletion_queue.new_client(); - client.recover(HashMap::new()).await?; + client.recover(HashMap::new())?; let tenant_id = ctx.harness.tenant_id; @@ -1145,9 +1170,7 @@ mod test { drop(client); ctx.restart().await; let client = ctx.deletion_queue.new_client(); - client - .recover(HashMap::from([(tenant_id, now_generation)])) - .await?; + client.recover(HashMap::from([(tenant_id, now_generation)]))?; info!("Flush-executing"); client.flush_execute().await?; @@ -1173,7 +1196,7 @@ pub(crate) mod mock { }; pub struct ConsumerState { - rx: tokio::sync::mpsc::Receiver, + rx: tokio::sync::mpsc::UnboundedReceiver, executor_rx: tokio::sync::mpsc::Receiver, } @@ -1250,7 +1273,7 @@ pub(crate) mod mock { } pub struct MockDeletionQueue { - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::UnboundedSender, executor_tx: tokio::sync::mpsc::Sender, executed: Arc, remote_storage: Option, @@ -1260,7 +1283,7 @@ pub(crate) mod mock { impl MockDeletionQueue { pub fn new(remote_storage: Option) -> Self { - let (tx, rx) = tokio::sync::mpsc::channel(16384); + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384); let executed = Arc::new(AtomicUsize::new(0)); diff --git a/pageserver/src/deletion_queue/list_writer.rs b/pageserver/src/deletion_queue/list_writer.rs index e846340373..21b8c356cd 100644 --- a/pageserver/src/deletion_queue/list_writer.rs +++ b/pageserver/src/deletion_queue/list_writer.rs @@ -85,7 +85,7 @@ pub(super) struct ListWriter { conf: &'static PageServerConf, // Incoming frontend requests to delete some keys - rx: tokio::sync::mpsc::Receiver, + rx: tokio::sync::mpsc::UnboundedReceiver, // Outbound requests to the backend to execute deletion lists we have composed. tx: tokio::sync::mpsc::Sender, @@ -111,7 +111,7 @@ impl ListWriter { pub(super) fn new( conf: &'static PageServerConf, - rx: tokio::sync::mpsc::Receiver, + rx: tokio::sync::mpsc::UnboundedReceiver, tx: tokio::sync::mpsc::Sender, cancel: CancellationToken, ) -> Self { diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index e0529aeafa..c42bd956f3 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -77,7 +77,7 @@ impl State { disk_usage_eviction_state: Arc, deletion_queue_client: DeletionQueueClient, ) -> anyhow::Result { - let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"] + let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"] .iter() .map(|v| v.parse().unwrap()) .collect::>(); @@ -575,9 +575,14 @@ async fn tenant_detach_handler( let state = get_state(&request); let conf = state.conf; - mgr::detach_tenant(conf, tenant_id, detach_ignored.unwrap_or(false)) - .instrument(info_span!("tenant_detach", %tenant_id)) - .await?; + mgr::detach_tenant( + conf, + tenant_id, + detach_ignored.unwrap_or(false), + &state.deletion_queue_client, + ) + .instrument(info_span!("tenant_detach", %tenant_id)) + .await?; json_response(StatusCode::OK, ()) } @@ -1034,7 +1039,7 @@ async fn put_tenant_location_config_handler( // The `Detached` state is special, it doesn't upsert a tenant, it removes // its local disk content and drops it from memory. if let LocationConfigMode::Detached = request_data.config.mode { - mgr::detach_tenant(conf, tenant_id, true) + mgr::detach_tenant(conf, tenant_id, true, &state.deletion_queue_client) .instrument(info_span!("tenant_detach", %tenant_id)) .await?; return json_response(StatusCode::OK, ()); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 57c1b5f070..264f8a1ee0 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -45,6 +45,7 @@ use std::sync::{Mutex, RwLock}; use std::time::{Duration, Instant}; use self::config::AttachedLocationConfig; +use self::config::AttachmentMode; use self::config::LocationConf; use self::config::TenantConf; use self::delete::DeleteTenantFlow; @@ -2076,6 +2077,15 @@ impl Tenant { } } } + + pub(crate) fn get_attach_mode(&self) -> AttachmentMode { + self.tenant_conf + .read() + .unwrap() + .location + .attach_mode + .clone() + } } /// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id), diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index a92fbccdea..35b3be6d61 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -24,7 +24,7 @@ use crate::control_plane_client::{ }; use crate::deletion_queue::DeletionQueueClient; use crate::task_mgr::{self, TaskKind}; -use crate::tenant::config::{LocationConf, LocationMode, TenantConfOpt}; +use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt}; use crate::tenant::delete::DeleteTenantFlow; use crate::tenant::{ create_tenant_files, AttachedTenantConf, CreateTenantFilesMode, Tenant, TenantState, @@ -206,8 +206,7 @@ async fn init_load_generations( if resources.remote_storage.is_some() { resources .deletion_queue_client - .recover(generations.clone()) - .await?; + .recover(generations.clone())?; } Ok(Some(generations)) @@ -695,6 +694,18 @@ pub(crate) async fn upsert_location( if let Some(tenant) = shutdown_tenant { let (_guard, progress) = utils::completion::channel(); + + match tenant.get_attach_mode() { + AttachmentMode::Single | AttachmentMode::Multi => { + // Before we leave our state as the presumed holder of the latest generation, + // flush any outstanding deletions to reduce the risk of leaking objects. + deletion_queue_client.flush_advisory() + } + AttachmentMode::Stale => { + // If we're stale there's not point trying to flush deletions + } + }; + info!("Shutting down attached tenant"); match tenant.shutdown(progress, false).await { Ok(()) => {} @@ -849,8 +860,16 @@ pub async fn detach_tenant( conf: &'static PageServerConf, tenant_id: TenantId, detach_ignored: bool, + deletion_queue_client: &DeletionQueueClient, ) -> Result<(), TenantStateError> { - let tmp_path = detach_tenant0(conf, &TENANTS, tenant_id, detach_ignored).await?; + let tmp_path = detach_tenant0( + conf, + &TENANTS, + tenant_id, + detach_ignored, + deletion_queue_client, + ) + .await?; // Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory. // After a tenant is detached, there are no more task_mgr tasks for that tenant_id. let task_tenant_id = None; @@ -875,6 +894,7 @@ async fn detach_tenant0( tenants: &tokio::sync::RwLock, tenant_id: TenantId, detach_ignored: bool, + deletion_queue_client: &DeletionQueueClient, ) -> Result { let tenant_dir_rename_operation = |tenant_id_to_clean| async move { let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean); @@ -886,6 +906,10 @@ async fn detach_tenant0( let removal_result = remove_tenant_from_memory(tenants, tenant_id, tenant_dir_rename_operation(tenant_id)).await; + // Flush pending deletions, so that they have a good chance of passing validation + // before this tenant is potentially re-attached elsewhere. + deletion_queue_client.flush_advisory(); + // Ignored tenants are not present in memory and will bail the removal from memory operation. // Before returning the error, check for ignored tenant removal case — we only need to clean its local files then. if detach_ignored && matches!(removal_result, Err(TenantStateError::NotFound(_))) { diff --git a/proxy/src/console/provider.rs b/proxy/src/console/provider.rs index 7d587ff1ec..8742534f5d 100644 --- a/proxy/src/console/provider.rs +++ b/proxy/src/console/provider.rs @@ -89,7 +89,10 @@ pub mod errors { Self::Console { status: http::StatusCode::LOCKED, ref text, - } => !text.contains("quota"), + } => { + !text.contains("written data quota exceeded") + && !text.contains("the limit for current plan reached") + } // retry server errors Self::Console { status, .. } if status.is_server_error() => true, _ => false, diff --git a/proxy/src/http/conn_pool.rs b/proxy/src/http/conn_pool.rs index a7ef15d342..7258f1a2cf 100644 --- a/proxy/src/http/conn_pool.rs +++ b/proxy/src/http/conn_pool.rs @@ -20,6 +20,7 @@ use tokio_postgres::AsyncMessage; use crate::{ auth, console, metrics::{Ids, MetricCounter, USAGE_METRICS}, + proxy::{NUM_DB_CONNECTIONS_CLOSED_COUNTER, NUM_DB_CONNECTIONS_OPENED_COUNTER}, }; use crate::{compute, config}; @@ -418,36 +419,42 @@ async fn connect_to_compute_once( }; tokio::spawn( - poll_fn(move |cx| { - if matches!(rx.has_changed(), Ok(true)) { - session = *rx.borrow_and_update(); - info!(%session, "changed session"); + async move { + NUM_DB_CONNECTIONS_OPENED_COUNTER.with_label_values(&["http"]).inc(); + scopeguard::defer! { + NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc(); } + poll_fn(move |cx| { + if matches!(rx.has_changed(), Ok(true)) { + session = *rx.borrow_and_update(); + info!(%session, "changed session"); + } - loop { - let message = ready!(connection.poll_message(cx)); + loop { + let message = ready!(connection.poll_message(cx)); - match message { - Some(Ok(AsyncMessage::Notice(notice))) => { - info!(%session, "notice: {}", notice); - } - Some(Ok(AsyncMessage::Notification(notif))) => { - warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received"); - } - Some(Ok(_)) => { - warn!(%session, "unknown message"); - } - Some(Err(e)) => { - error!(%session, "connection error: {}", e); - return Poll::Ready(()) - } - None => { - info!("connection closed"); - return Poll::Ready(()) + match message { + Some(Ok(AsyncMessage::Notice(notice))) => { + info!(%session, "notice: {}", notice); + } + Some(Ok(AsyncMessage::Notification(notif))) => { + warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received"); + } + Some(Ok(_)) => { + warn!(%session, "unknown message"); + } + Some(Err(e)) => { + error!(%session, "connection error: {}", e); + return Poll::Ready(()) + } + None => { + info!("connection closed"); + return Poll::Ready(()) + } } } - } - }) + }).await + } .instrument(span) ); diff --git a/proxy/src/http/sql_over_http.rs b/proxy/src/http/sql_over_http.rs index 380e36d530..fbf19bcb50 100644 --- a/proxy/src/http/sql_over_http.rs +++ b/proxy/src/http/sql_over_http.rs @@ -24,6 +24,8 @@ use url::Url; use utils::http::error::ApiError; use utils::http::json::json_response; +use crate::proxy::{NUM_CONNECTIONS_ACCEPTED_COUNTER, NUM_CONNECTIONS_CLOSED_COUNTER}; + use super::conn_pool::ConnInfo; use super::conn_pool::GlobalConnPool; @@ -47,6 +49,7 @@ enum Payload { const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024; // 10 MiB const MAX_REQUEST_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB +const HTTP_CONNECTION_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(15); static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output"); static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode"); @@ -189,27 +192,44 @@ pub async fn handle( conn_pool: Arc, session_id: uuid::Uuid, ) -> Result, ApiError> { - let result = handle_inner(request, sni_hostname, conn_pool, session_id).await; - + let result = tokio::time::timeout( + HTTP_CONNECTION_TIMEOUT, + handle_inner(request, sni_hostname, conn_pool, session_id), + ) + .await; let mut response = match result { - Ok(r) => r, - Err(e) => { - let message = format!("{:?}", e); - let code = match e.downcast_ref::() { - Some(e) => match e.code() { - Some(e) => serde_json::to_value(e.code()).unwrap(), + Ok(r) => match r { + Ok(r) => r, + Err(e) => { + let message = format!("{:?}", e); + let code = e.downcast_ref::().and_then(|e| { + e.code() + .map(|s| serde_json::to_value(s.code()).unwrap_or_default()) + }); + let code = match code { + Some(c) => c, None => Value::Null, - }, - None => Value::Null, - }; - error!( - ?code, - "sql-over-http per-client task finished with an error: {e:#}" + }; + error!( + ?code, + "sql-over-http per-client task finished with an error: {e:#}" + ); + // TODO: this shouldn't always be bad request. + json_response( + StatusCode::BAD_REQUEST, + json!({ "message": message, "code": code }), + )? + } + }, + Err(_) => { + let message = format!( + "HTTP-Connection timed out, execution time exeeded {} seconds", + HTTP_CONNECTION_TIMEOUT.as_secs() ); - // TODO: this shouldn't always be bad request. + error!(message); json_response( - StatusCode::BAD_REQUEST, - json!({ "message": message, "code": code }), + StatusCode::GATEWAY_TIMEOUT, + json!({ "message": message, "code": StatusCode::GATEWAY_TIMEOUT.as_u16() }), )? } }; @@ -227,6 +247,13 @@ async fn handle_inner( conn_pool: Arc, session_id: uuid::Uuid, ) -> anyhow::Result> { + NUM_CONNECTIONS_ACCEPTED_COUNTER + .with_label_values(&["http"]) + .inc(); + scopeguard::defer! { + NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc(); + } + // // Determine the destination and connection params // diff --git a/proxy/src/http/websocket.rs b/proxy/src/http/websocket.rs index 994a7de764..ddebbccabc 100644 --- a/proxy/src/http/websocket.rs +++ b/proxy/src/http/websocket.rs @@ -3,7 +3,10 @@ use crate::{ config::ProxyConfig, error::io_error, protocol2::{ProxyProtocolAccept, WithClientIp}, - proxy::{handle_client, ClientMode}, + proxy::{ + handle_client, ClientMode, NUM_CLIENT_CONNECTION_CLOSED_COUNTER, + NUM_CLIENT_CONNECTION_OPENED_COUNTER, + }, }; use bytes::{Buf, Bytes}; use futures::{Sink, Stream, StreamExt}; @@ -275,23 +278,25 @@ pub async fn task_main( let conn_pool = conn_pool.clone(); async move { - Ok::<_, Infallible>(hyper::service::service_fn(move |req: Request| { - let sni_name = sni_name.clone(); - let conn_pool = conn_pool.clone(); + Ok::<_, Infallible>(MetricService::new(hyper::service::service_fn( + move |req: Request| { + let sni_name = sni_name.clone(); + let conn_pool = conn_pool.clone(); - async move { - let cancel_map = Arc::new(CancelMap::default()); - let session_id = uuid::Uuid::new_v4(); + async move { + let cancel_map = Arc::new(CancelMap::default()); + let session_id = uuid::Uuid::new_v4(); - ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name) - .instrument(info_span!( - "ws-client", - session = %session_id, - %peer_addr, - )) - .await - } - })) + ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name) + .instrument(info_span!( + "ws-client", + session = %session_id, + %peer_addr, + )) + .await + } + }, + ))) } }, ); @@ -303,3 +308,41 @@ pub async fn task_main( Ok(()) } + +struct MetricService { + inner: S, +} + +impl MetricService { + fn new(inner: S) -> MetricService { + NUM_CLIENT_CONNECTION_OPENED_COUNTER + .with_label_values(&["http"]) + .inc(); + MetricService { inner } + } +} + +impl Drop for MetricService { + fn drop(&mut self) { + NUM_CLIENT_CONNECTION_CLOSED_COUNTER + .with_label_values(&["http"]) + .inc(); + } +} + +impl hyper::service::Service> for MetricService +where + S: hyper::service::Service>, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + self.inner.call(req) + } +} diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index cef3cea514..9cfe98347b 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -7,6 +7,7 @@ use crate::{ compute::{self, PostgresConnection}, config::{ProxyConfig, TlsConfig}, console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api}, + http::StatusCode, metrics::{Ids, USAGE_METRICS}, protocol2::WithClientIp, stream::{PqStream, Stream}, @@ -38,19 +39,55 @@ const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2; const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; const ERR_PROTO_VIOLATION: &str = "protocol violation"; -static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy = Lazy::new(|| { +pub static NUM_DB_CONNECTIONS_OPENED_COUNTER: Lazy = Lazy::new(|| { register_int_counter_vec!( - "proxy_accepted_connections_total", - "Number of TCP client connections accepted.", + "proxy_opened_db_connections_total", + "Number of opened connections to a database.", &["protocol"], ) .unwrap() }); -static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy = Lazy::new(|| { +pub static NUM_DB_CONNECTIONS_CLOSED_COUNTER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_closed_db_connections_total", + "Number of closed connections to a database.", + &["protocol"], + ) + .unwrap() +}); + +pub static NUM_CLIENT_CONNECTION_OPENED_COUNTER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_opened_client_connections_total", + "Number of opened connections from a client.", + &["protocol"], + ) + .unwrap() +}); + +pub static NUM_CLIENT_CONNECTION_CLOSED_COUNTER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_closed_client_connections_total", + "Number of closed connections from a client.", + &["protocol"], + ) + .unwrap() +}); + +pub static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_accepted_connections_total", + "Number of client connections accepted.", + &["protocol"], + ) + .unwrap() +}); + +pub static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy = Lazy::new(|| { register_int_counter_vec!( "proxy_closed_connections_total", - "Number of TCP client connections closed.", + "Number of client connections closed.", &["protocol"], ) .unwrap() @@ -75,6 +112,15 @@ static NUM_CONNECTION_FAILURES: Lazy = Lazy::new(|| { .unwrap() }); +static NUM_WAKEUP_FAILURES: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_connection_failures_breakdown", + "Number of wake-up failures (per kind).", + &["retry", "kind"], + ) + .unwrap() +}); + static NUM_BYTES_PROXIED_COUNTER: Lazy = Lazy::new(|| { register_int_counter_vec!( "proxy_io_bytes_per_client", @@ -208,12 +254,16 @@ pub async fn handle_client( "handling interactive connection from client" ); - // The `closed` counter will increase when this future is destroyed. + let proto = mode.protocol_label(); + NUM_CLIENT_CONNECTION_OPENED_COUNTER + .with_label_values(&[proto]) + .inc(); NUM_CONNECTIONS_ACCEPTED_COUNTER - .with_label_values(&[mode.protocol_label()]) + .with_label_values(&[proto]) .inc(); scopeguard::defer! { - NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[mode.protocol_label()]).inc(); + NUM_CLIENT_CONNECTION_CLOSED_COUNTER.with_label_values(&[proto]).inc(); + NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc(); } let tls = config.tls_config.as_ref(); @@ -248,7 +298,7 @@ pub async fn handle_client( mode.allow_self_signed_compute(config), ); cancel_map - .with_session(|session| client.connect_to_db(session, mode.allow_cleartext())) + .with_session(|session| client.connect_to_db(session, mode)) .await } @@ -397,6 +447,46 @@ impl ConnectMechanism for TcpMechanism<'_> { } } +const fn bool_to_str(x: bool) -> &'static str { + if x { + "true" + } else { + "false" + } +} + +fn report_error(e: &WakeComputeError, retry: bool) { + use crate::console::errors::ApiError; + let retry = bool_to_str(retry); + let kind = match e { + WakeComputeError::BadComputeAddress(_) => "bad_compute_address", + WakeComputeError::ApiError(ApiError::Transport(_)) => "api_transport_error", + WakeComputeError::ApiError(ApiError::Console { + status: StatusCode::LOCKED, + ref text, + }) if text.contains("written data quota exceeded") + || text.contains("the limit for current plan reached") => + { + "quota_exceeded" + } + WakeComputeError::ApiError(ApiError::Console { + status: StatusCode::LOCKED, + .. + }) => "api_console_locked", + WakeComputeError::ApiError(ApiError::Console { + status: StatusCode::BAD_REQUEST, + .. + }) => "api_console_bad_request", + WakeComputeError::ApiError(ApiError::Console { status, .. }) + if status.is_server_error() => + { + "api_console_other_server_error" + } + WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error", + }; + NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc(); +} + /// Try to connect to the compute node, retrying if necessary. /// This function might update `node_info`, so we take it by `&mut`. #[tracing::instrument(skip_all)] @@ -440,10 +530,12 @@ where match handle_try_wake(wake_res, num_retries) { Err(e) => { error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node"); + report_error(&e, false); return Err(e.into()); } // failed to wake up but we can continue to retry Ok(ControlFlow::Continue(e)) => { + report_error(&e, true); warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node"); } // successfully woke up a compute node and can break the wakeup loop @@ -682,7 +774,7 @@ impl Client<'_, S> { async fn connect_to_db( self, session: cancellation::Session<'_>, - allow_cleartext: bool, + mode: ClientMode, ) -> anyhow::Result<()> { let Self { mut stream, @@ -698,7 +790,7 @@ impl Client<'_, S> { }; let auth_result = match creds - .authenticate(&extra, &mut stream, allow_cleartext) + .authenticate(&extra, &mut stream, mode.allow_cleartext()) .await { Ok(auth_result) => auth_result, @@ -724,6 +816,14 @@ impl Client<'_, S> { .or_else(|e| stream.throw_error(e)) .await?; + let proto = mode.protocol_label(); + NUM_DB_CONNECTIONS_OPENED_COUNTER + .with_label_values(&[proto]) + .inc(); + scopeguard::defer! { + NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc(); + } + prepare_client_connection(&node, reported_auth_ok, session, &mut stream).await?; // Before proxy passing, forward to compute whatever data is left in the // PqStream input buffer. Normally there is none, but our serverless npm diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 6104b54f44..f4c3a4aa03 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -374,8 +374,12 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder if conf.http_auth.is_some() { router = router.middleware(auth_middleware(|request| { #[allow(clippy::mutable_key_type)] - static ALLOWLIST_ROUTES: Lazy> = - Lazy::new(|| ["/v1/status"].iter().map(|v| v.parse().unwrap()).collect()); + static ALLOWLIST_ROUTES: Lazy> = Lazy::new(|| { + ["/v1/status", "/metrics"] + .iter() + .map(|v| v.parse().unwrap()) + .collect() + }); if ALLOWLIST_ROUTES.contains(request.uri()) { None } else {