From bf065aabdf2bd8d343a87fbd5070f8885ad2c7b6 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Tue, 10 Oct 2023 08:59:16 +0100 Subject: [PATCH 1/6] proxy: update locked error retry filter (#5376) ## Problem We don't want to retry customer quota exhaustion errors. ## Summary of changes Make sure both types of quota exhaustion errors are not retried --- proxy/src/console/provider.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/proxy/src/console/provider.rs b/proxy/src/console/provider.rs index 7d587ff1ec..8742534f5d 100644 --- a/proxy/src/console/provider.rs +++ b/proxy/src/console/provider.rs @@ -89,7 +89,10 @@ pub mod errors { Self::Console { status: http::StatusCode::LOCKED, ref text, - } => !text.contains("quota"), + } => { + !text.contains("written data quota exceeded") + && !text.contains("the limit for current plan reached") + } // retry server errors Self::Console { status, .. } if status.is_server_error() => true, _ => false, From acefee9a32128fcde514b840723093a36f88ad8c Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 10 Oct 2023 10:46:24 +0100 Subject: [PATCH 2/6] pageserver: flush deletion queue on detach (#5452) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem If a caller detaches a tenant and then attaches it again, pending deletions from the old attachment might not have happened yet. This is not a correctness problem, but it causes: - Risk of leaking some objects in S3 - Some warnings from the deletion queue when pending LSN updates and pending deletions don't pass validation. ## Summary of changes - Deletion queue now uses UnboundedChannel so that the push interfaces don't have to be async. - This was pulled out of https://github.com/neondatabase/neon/pull/5397, where it is also useful to be able to drive the queue from non-async contexts. - Why is it okay for this to be unbounded? The only way the unbounded-ness of the channel can become a problem is if writing out deletion lists can't keep up, but if the system were that overloaded then the code generating deletions (GC, compaction) would also be impacted. - DeletionQueueClient gets a new `flush_advisory` function, which is like flush_execute, but doesn't wait for completion: this is appropriate for use in contexts where we would like to encourage the deletion queue to flush, but don't need to block on it. - This function is also expected to be useful in next steps for seamless migration, where the option to flush to S3 while transitioning into AttachedStale will also include flushing deletion queue, but we wouldn't want to block on that flush. - The tenant_detach code in mgr.rs invokes flush_advisory after stopping the `Tenant` object. --------- Co-authored-by: Arpad Müller --- pageserver/src/deletion_queue.rs | 71 +++++++++++++------- pageserver/src/deletion_queue/list_writer.rs | 4 +- pageserver/src/http/routes.rs | 13 ++-- pageserver/src/tenant.rs | 10 +++ pageserver/src/tenant/mgr.rs | 32 +++++++-- 5 files changed, 96 insertions(+), 34 deletions(-) diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index cccc64685b..0bf851a8d7 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -153,7 +153,7 @@ impl FlushOp { #[derive(Clone, Debug)] pub struct DeletionQueueClient { - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::UnboundedSender, executor_tx: tokio::sync::mpsc::Sender, lsn_table: Arc>, @@ -416,7 +416,7 @@ pub enum DeletionQueueError { impl DeletionQueueClient { pub(crate) fn broken() -> Self { // Channels whose receivers are immediately dropped. - let (tx, _rx) = tokio::sync::mpsc::channel(1); + let (tx, _rx) = tokio::sync::mpsc::unbounded_channel(); let (executor_tx, _executor_rx) = tokio::sync::mpsc::channel(1); Self { tx, @@ -428,12 +428,12 @@ impl DeletionQueueClient { /// This is cancel-safe. If you drop the future before it completes, the message /// is not pushed, although in the context of the deletion queue it doesn't matter: once /// we decide to do a deletion the decision is always final. - async fn do_push( + fn do_push( &self, - queue: &tokio::sync::mpsc::Sender, + queue: &tokio::sync::mpsc::UnboundedSender, msg: T, ) -> Result<(), DeletionQueueError> { - match queue.send(msg).await { + match queue.send(msg) { Ok(_) => Ok(()), Err(e) => { // This shouldn't happen, we should shut down all tenants before @@ -445,7 +445,7 @@ impl DeletionQueueClient { } } - pub(crate) async fn recover( + pub(crate) fn recover( &self, attached_tenants: HashMap, ) -> Result<(), DeletionQueueError> { @@ -453,7 +453,6 @@ impl DeletionQueueClient { &self.tx, ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }), ) - .await } /// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside @@ -526,6 +525,21 @@ impl DeletionQueueClient { return self.flush_immediate().await; } + self.push_layers_sync(tenant_id, timeline_id, current_generation, layers) + } + + /// When a Tenant has a generation, push_layers is always synchronous because + /// the ListValidator channel is an unbounded channel. + /// + /// This can be merged into push_layers when we remove the Generation-less mode + /// support (``) + pub(crate) fn push_layers_sync( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + current_generation: Generation, + layers: Vec<(LayerFileName, Generation)>, + ) -> Result<(), DeletionQueueError> { metrics::DELETION_QUEUE .keys_submitted .inc_by(layers.len() as u64); @@ -539,17 +553,16 @@ impl DeletionQueueClient { objects: Vec::new(), }), ) - .await } /// This is cancel-safe. If you drop the future the flush may still happen in the background. async fn do_flush( &self, - queue: &tokio::sync::mpsc::Sender, + queue: &tokio::sync::mpsc::UnboundedSender, msg: T, rx: tokio::sync::oneshot::Receiver<()>, ) -> Result<(), DeletionQueueError> { - self.do_push(queue, msg).await?; + self.do_push(queue, msg)?; if rx.await.is_err() { // This shouldn't happen if tenants are shut down before deletion queue. If we // encounter a bug like this, then a flusher will incorrectly believe it has flushed @@ -570,6 +583,18 @@ impl DeletionQueueClient { .await } + /// Issue a flush without waiting for it to complete. This is useful on advisory flushes where + /// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant + /// detach where flushing is nice but not necessary. + /// + /// This function provides no guarantees of work being done. + pub fn flush_advisory(&self) { + let (flush_op, _) = FlushOp::new(); + + // Transmit the flush message, ignoring any result (such as a closed channel during shutdown). + drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op))); + } + // Wait until all previous deletions are executed pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> { debug!("flush_execute: flushing to deletion lists..."); @@ -586,9 +611,7 @@ impl DeletionQueueClient { // Flush any immediate-mode deletions (the above backend flush will only flush // the executor if deletions had flowed through the backend) debug!("flush_execute: flushing execution..."); - let (flush_op, rx) = FlushOp::new(); - self.do_flush(&self.executor_tx, DeleterMessage::Flush(flush_op), rx) - .await?; + self.flush_immediate().await?; debug!("flush_execute: finished flushing execution..."); Ok(()) } @@ -643,8 +666,10 @@ impl DeletionQueue { where C: ControlPlaneGenerationsApi + Send + Sync, { - // Deep channel: it consumes deletions from all timelines and we do not want to block them - let (tx, rx) = tokio::sync::mpsc::channel(16384); + // Unbounded channel: enables non-async functions to submit deletions. The actual length is + // constrained by how promptly the ListWriter wakes up and drains it, which should be frequent + // enough to avoid this taking pathologically large amount of memory. + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); // Shallow channel: it carries DeletionLists which each contain up to thousands of deletions let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16); @@ -957,7 +982,7 @@ mod test { // Basic test that the deletion queue processes the deletions we pass into it let ctx = setup("deletion_queue_smoke").expect("Failed test setup"); let client = ctx.deletion_queue.new_client(); - client.recover(HashMap::new()).await?; + client.recover(HashMap::new())?; let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(); let tenant_id = ctx.harness.tenant_id; @@ -1025,7 +1050,7 @@ mod test { async fn deletion_queue_validation() -> anyhow::Result<()> { let ctx = setup("deletion_queue_validation").expect("Failed test setup"); let client = ctx.deletion_queue.new_client(); - client.recover(HashMap::new()).await?; + client.recover(HashMap::new())?; // Generation that the control plane thinks is current let latest_generation = Generation::new(0xdeadbeef); @@ -1082,7 +1107,7 @@ mod test { // Basic test that the deletion queue processes the deletions we pass into it let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup"); let client = ctx.deletion_queue.new_client(); - client.recover(HashMap::new()).await?; + client.recover(HashMap::new())?; let tenant_id = ctx.harness.tenant_id; @@ -1145,9 +1170,7 @@ mod test { drop(client); ctx.restart().await; let client = ctx.deletion_queue.new_client(); - client - .recover(HashMap::from([(tenant_id, now_generation)])) - .await?; + client.recover(HashMap::from([(tenant_id, now_generation)]))?; info!("Flush-executing"); client.flush_execute().await?; @@ -1173,7 +1196,7 @@ pub(crate) mod mock { }; pub struct ConsumerState { - rx: tokio::sync::mpsc::Receiver, + rx: tokio::sync::mpsc::UnboundedReceiver, executor_rx: tokio::sync::mpsc::Receiver, } @@ -1250,7 +1273,7 @@ pub(crate) mod mock { } pub struct MockDeletionQueue { - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::UnboundedSender, executor_tx: tokio::sync::mpsc::Sender, executed: Arc, remote_storage: Option, @@ -1260,7 +1283,7 @@ pub(crate) mod mock { impl MockDeletionQueue { pub fn new(remote_storage: Option) -> Self { - let (tx, rx) = tokio::sync::mpsc::channel(16384); + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384); let executed = Arc::new(AtomicUsize::new(0)); diff --git a/pageserver/src/deletion_queue/list_writer.rs b/pageserver/src/deletion_queue/list_writer.rs index e846340373..21b8c356cd 100644 --- a/pageserver/src/deletion_queue/list_writer.rs +++ b/pageserver/src/deletion_queue/list_writer.rs @@ -85,7 +85,7 @@ pub(super) struct ListWriter { conf: &'static PageServerConf, // Incoming frontend requests to delete some keys - rx: tokio::sync::mpsc::Receiver, + rx: tokio::sync::mpsc::UnboundedReceiver, // Outbound requests to the backend to execute deletion lists we have composed. tx: tokio::sync::mpsc::Sender, @@ -111,7 +111,7 @@ impl ListWriter { pub(super) fn new( conf: &'static PageServerConf, - rx: tokio::sync::mpsc::Receiver, + rx: tokio::sync::mpsc::UnboundedReceiver, tx: tokio::sync::mpsc::Sender, cancel: CancellationToken, ) -> Self { diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index e0529aeafa..0597a977c0 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -575,9 +575,14 @@ async fn tenant_detach_handler( let state = get_state(&request); let conf = state.conf; - mgr::detach_tenant(conf, tenant_id, detach_ignored.unwrap_or(false)) - .instrument(info_span!("tenant_detach", %tenant_id)) - .await?; + mgr::detach_tenant( + conf, + tenant_id, + detach_ignored.unwrap_or(false), + &state.deletion_queue_client, + ) + .instrument(info_span!("tenant_detach", %tenant_id)) + .await?; json_response(StatusCode::OK, ()) } @@ -1034,7 +1039,7 @@ async fn put_tenant_location_config_handler( // The `Detached` state is special, it doesn't upsert a tenant, it removes // its local disk content and drops it from memory. if let LocationConfigMode::Detached = request_data.config.mode { - mgr::detach_tenant(conf, tenant_id, true) + mgr::detach_tenant(conf, tenant_id, true, &state.deletion_queue_client) .instrument(info_span!("tenant_detach", %tenant_id)) .await?; return json_response(StatusCode::OK, ()); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 57c1b5f070..264f8a1ee0 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -45,6 +45,7 @@ use std::sync::{Mutex, RwLock}; use std::time::{Duration, Instant}; use self::config::AttachedLocationConfig; +use self::config::AttachmentMode; use self::config::LocationConf; use self::config::TenantConf; use self::delete::DeleteTenantFlow; @@ -2076,6 +2077,15 @@ impl Tenant { } } } + + pub(crate) fn get_attach_mode(&self) -> AttachmentMode { + self.tenant_conf + .read() + .unwrap() + .location + .attach_mode + .clone() + } } /// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id), diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index a92fbccdea..35b3be6d61 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -24,7 +24,7 @@ use crate::control_plane_client::{ }; use crate::deletion_queue::DeletionQueueClient; use crate::task_mgr::{self, TaskKind}; -use crate::tenant::config::{LocationConf, LocationMode, TenantConfOpt}; +use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt}; use crate::tenant::delete::DeleteTenantFlow; use crate::tenant::{ create_tenant_files, AttachedTenantConf, CreateTenantFilesMode, Tenant, TenantState, @@ -206,8 +206,7 @@ async fn init_load_generations( if resources.remote_storage.is_some() { resources .deletion_queue_client - .recover(generations.clone()) - .await?; + .recover(generations.clone())?; } Ok(Some(generations)) @@ -695,6 +694,18 @@ pub(crate) async fn upsert_location( if let Some(tenant) = shutdown_tenant { let (_guard, progress) = utils::completion::channel(); + + match tenant.get_attach_mode() { + AttachmentMode::Single | AttachmentMode::Multi => { + // Before we leave our state as the presumed holder of the latest generation, + // flush any outstanding deletions to reduce the risk of leaking objects. + deletion_queue_client.flush_advisory() + } + AttachmentMode::Stale => { + // If we're stale there's not point trying to flush deletions + } + }; + info!("Shutting down attached tenant"); match tenant.shutdown(progress, false).await { Ok(()) => {} @@ -849,8 +860,16 @@ pub async fn detach_tenant( conf: &'static PageServerConf, tenant_id: TenantId, detach_ignored: bool, + deletion_queue_client: &DeletionQueueClient, ) -> Result<(), TenantStateError> { - let tmp_path = detach_tenant0(conf, &TENANTS, tenant_id, detach_ignored).await?; + let tmp_path = detach_tenant0( + conf, + &TENANTS, + tenant_id, + detach_ignored, + deletion_queue_client, + ) + .await?; // Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory. // After a tenant is detached, there are no more task_mgr tasks for that tenant_id. let task_tenant_id = None; @@ -875,6 +894,7 @@ async fn detach_tenant0( tenants: &tokio::sync::RwLock, tenant_id: TenantId, detach_ignored: bool, + deletion_queue_client: &DeletionQueueClient, ) -> Result { let tenant_dir_rename_operation = |tenant_id_to_clean| async move { let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean); @@ -886,6 +906,10 @@ async fn detach_tenant0( let removal_result = remove_tenant_from_memory(tenants, tenant_id, tenant_dir_rename_operation(tenant_id)).await; + // Flush pending deletions, so that they have a good chance of passing validation + // before this tenant is potentially re-attached elsewhere. + deletion_queue_client.flush_advisory(); + // Ignored tenants are not present in memory and will bail the removal from memory operation. // Before returning the error, check for ignored tenant removal case — we only need to clean its local files then. if detach_ignored && matches!(removal_result, Err(TenantStateError::NotFound(_))) { From aec9188d36fe736caeadc7c32b5b301fdcb8eafa Mon Sep 17 00:00:00 2001 From: khanova <32508607+khanova@users.noreply.github.com> Date: Tue, 10 Oct 2023 13:39:38 +0200 Subject: [PATCH 3/6] Added timeout for http requests (#5514) # Problem Proxy timeout for HTTP-requests ## Summary of changes If the HTTP-request exceeds 15s, it would be killed. Resolves: https://github.com/neondatabase/neon/issues/4847 --- proxy/src/http/sql_over_http.rs | 52 ++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/proxy/src/http/sql_over_http.rs b/proxy/src/http/sql_over_http.rs index 380e36d530..ffc788e873 100644 --- a/proxy/src/http/sql_over_http.rs +++ b/proxy/src/http/sql_over_http.rs @@ -47,6 +47,7 @@ enum Payload { const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024; // 10 MiB const MAX_REQUEST_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB +const HTTP_CONNECTION_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(15); static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output"); static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode"); @@ -189,27 +190,44 @@ pub async fn handle( conn_pool: Arc, session_id: uuid::Uuid, ) -> Result, ApiError> { - let result = handle_inner(request, sni_hostname, conn_pool, session_id).await; - + let result = tokio::time::timeout( + HTTP_CONNECTION_TIMEOUT, + handle_inner(request, sni_hostname, conn_pool, session_id), + ) + .await; let mut response = match result { - Ok(r) => r, - Err(e) => { - let message = format!("{:?}", e); - let code = match e.downcast_ref::() { - Some(e) => match e.code() { - Some(e) => serde_json::to_value(e.code()).unwrap(), + Ok(r) => match r { + Ok(r) => r, + Err(e) => { + let message = format!("{:?}", e); + let code = e.downcast_ref::().and_then(|e| { + e.code() + .map(|s| serde_json::to_value(s.code()).unwrap_or_default()) + }); + let code = match code { + Some(c) => c, None => Value::Null, - }, - None => Value::Null, - }; - error!( - ?code, - "sql-over-http per-client task finished with an error: {e:#}" + }; + error!( + ?code, + "sql-over-http per-client task finished with an error: {e:#}" + ); + // TODO: this shouldn't always be bad request. + json_response( + StatusCode::BAD_REQUEST, + json!({ "message": message, "code": code }), + )? + } + }, + Err(_) => { + let message = format!( + "HTTP-Connection timed out, execution time exeeded {} seconds", + HTTP_CONNECTION_TIMEOUT.as_secs() ); - // TODO: this shouldn't always be bad request. + error!(message); json_response( - StatusCode::BAD_REQUEST, - json!({ "message": message, "code": code }), + StatusCode::GATEWAY_TIMEOUT, + json!({ "message": message, "code": StatusCode::GATEWAY_TIMEOUT.as_u16() }), )? } }; From 5158de70f392ea1ff7224414d8a90c60ce7a5d16 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Tue, 10 Oct 2023 08:17:37 -0400 Subject: [PATCH 4/6] proxy: breakdown wake up failure metrics (#4933) ## Problem close https://github.com/neondatabase/neon/issues/4702 ## Summary of changes This PR adds a new metrics for wake up errors and breaks it down by most common reasons (mostly follows the `could_retry` implementation). --- proxy/src/proxy.rs | 52 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index cef3cea514..f1343ae014 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -7,6 +7,7 @@ use crate::{ compute::{self, PostgresConnection}, config::{ProxyConfig, TlsConfig}, console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api}, + http::StatusCode, metrics::{Ids, USAGE_METRICS}, protocol2::WithClientIp, stream::{PqStream, Stream}, @@ -75,6 +76,15 @@ static NUM_CONNECTION_FAILURES: Lazy = Lazy::new(|| { .unwrap() }); +static NUM_WAKEUP_FAILURES: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_connection_failures_breakdown", + "Number of wake-up failures (per kind).", + &["retry", "kind"], + ) + .unwrap() +}); + static NUM_BYTES_PROXIED_COUNTER: Lazy = Lazy::new(|| { register_int_counter_vec!( "proxy_io_bytes_per_client", @@ -397,6 +407,46 @@ impl ConnectMechanism for TcpMechanism<'_> { } } +const fn bool_to_str(x: bool) -> &'static str { + if x { + "true" + } else { + "false" + } +} + +fn report_error(e: &WakeComputeError, retry: bool) { + use crate::console::errors::ApiError; + let retry = bool_to_str(retry); + let kind = match e { + WakeComputeError::BadComputeAddress(_) => "bad_compute_address", + WakeComputeError::ApiError(ApiError::Transport(_)) => "api_transport_error", + WakeComputeError::ApiError(ApiError::Console { + status: StatusCode::LOCKED, + ref text, + }) if text.contains("written data quota exceeded") + || text.contains("the limit for current plan reached") => + { + "quota_exceeded" + } + WakeComputeError::ApiError(ApiError::Console { + status: StatusCode::LOCKED, + .. + }) => "api_console_locked", + WakeComputeError::ApiError(ApiError::Console { + status: StatusCode::BAD_REQUEST, + .. + }) => "api_console_bad_request", + WakeComputeError::ApiError(ApiError::Console { status, .. }) + if status.is_server_error() => + { + "api_console_other_server_error" + } + WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error", + }; + NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc(); +} + /// Try to connect to the compute node, retrying if necessary. /// This function might update `node_info`, so we take it by `&mut`. #[tracing::instrument(skip_all)] @@ -440,10 +490,12 @@ where match handle_try_wake(wake_res, num_retries) { Err(e) => { error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node"); + report_error(&e, false); return Err(e.into()); } // failed to wake up but we can continue to retry Ok(ControlFlow::Continue(e)) => { + report_error(&e, true); warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node"); } // successfully woke up a compute node and can break the wakeup loop From d4dc86f8e317af1369f568416e26fd1c47ae6956 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Tue, 10 Oct 2023 16:33:20 +0100 Subject: [PATCH 5/6] proxy: more connection metrics (#5464) ## Problem Hard to tell 1. How many clients are connected to proxy 2. How many requests clients are making 3. How many connections are made to a database 1 and 2 are different because of the properties of HTTP. We have 2 already tracked through `proxy_accepted_connections_total` and `proxy_closed_connections_total`, but nothing for 1 and 3 ## Summary of changes Adds 2 new counter gauges. * `proxy_opened_client_connections_total`,`proxy_closed_client_connections_total` - how many client connections are open to proxy * `proxy_opened_db_connections_total`,`proxy_closed_db_connections_total` - how many active connections are made through to a database. For TCP and Websockets, we expect all 3 of these quantities to be roughly the same, barring users connecting but with invalid details. For HTTP: * client_connections/connections can differ because the client connections can be reused. * connections/db_connections can differ because of connection pooling. --- proxy/src/http/conn_pool.rs | 57 ++++++++++++++----------- proxy/src/http/sql_over_http.rs | 9 ++++ proxy/src/http/websocket.rs | 75 ++++++++++++++++++++++++++------- proxy/src/proxy.rs | 70 +++++++++++++++++++++++++----- 4 files changed, 159 insertions(+), 52 deletions(-) diff --git a/proxy/src/http/conn_pool.rs b/proxy/src/http/conn_pool.rs index a7ef15d342..7258f1a2cf 100644 --- a/proxy/src/http/conn_pool.rs +++ b/proxy/src/http/conn_pool.rs @@ -20,6 +20,7 @@ use tokio_postgres::AsyncMessage; use crate::{ auth, console, metrics::{Ids, MetricCounter, USAGE_METRICS}, + proxy::{NUM_DB_CONNECTIONS_CLOSED_COUNTER, NUM_DB_CONNECTIONS_OPENED_COUNTER}, }; use crate::{compute, config}; @@ -418,36 +419,42 @@ async fn connect_to_compute_once( }; tokio::spawn( - poll_fn(move |cx| { - if matches!(rx.has_changed(), Ok(true)) { - session = *rx.borrow_and_update(); - info!(%session, "changed session"); + async move { + NUM_DB_CONNECTIONS_OPENED_COUNTER.with_label_values(&["http"]).inc(); + scopeguard::defer! { + NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc(); } + poll_fn(move |cx| { + if matches!(rx.has_changed(), Ok(true)) { + session = *rx.borrow_and_update(); + info!(%session, "changed session"); + } - loop { - let message = ready!(connection.poll_message(cx)); + loop { + let message = ready!(connection.poll_message(cx)); - match message { - Some(Ok(AsyncMessage::Notice(notice))) => { - info!(%session, "notice: {}", notice); - } - Some(Ok(AsyncMessage::Notification(notif))) => { - warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received"); - } - Some(Ok(_)) => { - warn!(%session, "unknown message"); - } - Some(Err(e)) => { - error!(%session, "connection error: {}", e); - return Poll::Ready(()) - } - None => { - info!("connection closed"); - return Poll::Ready(()) + match message { + Some(Ok(AsyncMessage::Notice(notice))) => { + info!(%session, "notice: {}", notice); + } + Some(Ok(AsyncMessage::Notification(notif))) => { + warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received"); + } + Some(Ok(_)) => { + warn!(%session, "unknown message"); + } + Some(Err(e)) => { + error!(%session, "connection error: {}", e); + return Poll::Ready(()) + } + None => { + info!("connection closed"); + return Poll::Ready(()) + } } } - } - }) + }).await + } .instrument(span) ); diff --git a/proxy/src/http/sql_over_http.rs b/proxy/src/http/sql_over_http.rs index ffc788e873..fbf19bcb50 100644 --- a/proxy/src/http/sql_over_http.rs +++ b/proxy/src/http/sql_over_http.rs @@ -24,6 +24,8 @@ use url::Url; use utils::http::error::ApiError; use utils::http::json::json_response; +use crate::proxy::{NUM_CONNECTIONS_ACCEPTED_COUNTER, NUM_CONNECTIONS_CLOSED_COUNTER}; + use super::conn_pool::ConnInfo; use super::conn_pool::GlobalConnPool; @@ -245,6 +247,13 @@ async fn handle_inner( conn_pool: Arc, session_id: uuid::Uuid, ) -> anyhow::Result> { + NUM_CONNECTIONS_ACCEPTED_COUNTER + .with_label_values(&["http"]) + .inc(); + scopeguard::defer! { + NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc(); + } + // // Determine the destination and connection params // diff --git a/proxy/src/http/websocket.rs b/proxy/src/http/websocket.rs index 994a7de764..ddebbccabc 100644 --- a/proxy/src/http/websocket.rs +++ b/proxy/src/http/websocket.rs @@ -3,7 +3,10 @@ use crate::{ config::ProxyConfig, error::io_error, protocol2::{ProxyProtocolAccept, WithClientIp}, - proxy::{handle_client, ClientMode}, + proxy::{ + handle_client, ClientMode, NUM_CLIENT_CONNECTION_CLOSED_COUNTER, + NUM_CLIENT_CONNECTION_OPENED_COUNTER, + }, }; use bytes::{Buf, Bytes}; use futures::{Sink, Stream, StreamExt}; @@ -275,23 +278,25 @@ pub async fn task_main( let conn_pool = conn_pool.clone(); async move { - Ok::<_, Infallible>(hyper::service::service_fn(move |req: Request| { - let sni_name = sni_name.clone(); - let conn_pool = conn_pool.clone(); + Ok::<_, Infallible>(MetricService::new(hyper::service::service_fn( + move |req: Request| { + let sni_name = sni_name.clone(); + let conn_pool = conn_pool.clone(); - async move { - let cancel_map = Arc::new(CancelMap::default()); - let session_id = uuid::Uuid::new_v4(); + async move { + let cancel_map = Arc::new(CancelMap::default()); + let session_id = uuid::Uuid::new_v4(); - ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name) - .instrument(info_span!( - "ws-client", - session = %session_id, - %peer_addr, - )) - .await - } - })) + ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name) + .instrument(info_span!( + "ws-client", + session = %session_id, + %peer_addr, + )) + .await + } + }, + ))) } }, ); @@ -303,3 +308,41 @@ pub async fn task_main( Ok(()) } + +struct MetricService { + inner: S, +} + +impl MetricService { + fn new(inner: S) -> MetricService { + NUM_CLIENT_CONNECTION_OPENED_COUNTER + .with_label_values(&["http"]) + .inc(); + MetricService { inner } + } +} + +impl Drop for MetricService { + fn drop(&mut self) { + NUM_CLIENT_CONNECTION_CLOSED_COUNTER + .with_label_values(&["http"]) + .inc(); + } +} + +impl hyper::service::Service> for MetricService +where + S: hyper::service::Service>, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + self.inner.call(req) + } +} diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index f1343ae014..9cfe98347b 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -39,19 +39,55 @@ const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2; const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; const ERR_PROTO_VIOLATION: &str = "protocol violation"; -static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy = Lazy::new(|| { +pub static NUM_DB_CONNECTIONS_OPENED_COUNTER: Lazy = Lazy::new(|| { register_int_counter_vec!( - "proxy_accepted_connections_total", - "Number of TCP client connections accepted.", + "proxy_opened_db_connections_total", + "Number of opened connections to a database.", &["protocol"], ) .unwrap() }); -static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy = Lazy::new(|| { +pub static NUM_DB_CONNECTIONS_CLOSED_COUNTER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_closed_db_connections_total", + "Number of closed connections to a database.", + &["protocol"], + ) + .unwrap() +}); + +pub static NUM_CLIENT_CONNECTION_OPENED_COUNTER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_opened_client_connections_total", + "Number of opened connections from a client.", + &["protocol"], + ) + .unwrap() +}); + +pub static NUM_CLIENT_CONNECTION_CLOSED_COUNTER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_closed_client_connections_total", + "Number of closed connections from a client.", + &["protocol"], + ) + .unwrap() +}); + +pub static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_accepted_connections_total", + "Number of client connections accepted.", + &["protocol"], + ) + .unwrap() +}); + +pub static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy = Lazy::new(|| { register_int_counter_vec!( "proxy_closed_connections_total", - "Number of TCP client connections closed.", + "Number of client connections closed.", &["protocol"], ) .unwrap() @@ -218,12 +254,16 @@ pub async fn handle_client( "handling interactive connection from client" ); - // The `closed` counter will increase when this future is destroyed. + let proto = mode.protocol_label(); + NUM_CLIENT_CONNECTION_OPENED_COUNTER + .with_label_values(&[proto]) + .inc(); NUM_CONNECTIONS_ACCEPTED_COUNTER - .with_label_values(&[mode.protocol_label()]) + .with_label_values(&[proto]) .inc(); scopeguard::defer! { - NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[mode.protocol_label()]).inc(); + NUM_CLIENT_CONNECTION_CLOSED_COUNTER.with_label_values(&[proto]).inc(); + NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc(); } let tls = config.tls_config.as_ref(); @@ -258,7 +298,7 @@ pub async fn handle_client( mode.allow_self_signed_compute(config), ); cancel_map - .with_session(|session| client.connect_to_db(session, mode.allow_cleartext())) + .with_session(|session| client.connect_to_db(session, mode)) .await } @@ -734,7 +774,7 @@ impl Client<'_, S> { async fn connect_to_db( self, session: cancellation::Session<'_>, - allow_cleartext: bool, + mode: ClientMode, ) -> anyhow::Result<()> { let Self { mut stream, @@ -750,7 +790,7 @@ impl Client<'_, S> { }; let auth_result = match creds - .authenticate(&extra, &mut stream, allow_cleartext) + .authenticate(&extra, &mut stream, mode.allow_cleartext()) .await { Ok(auth_result) => auth_result, @@ -776,6 +816,14 @@ impl Client<'_, S> { .or_else(|e| stream.throw_error(e)) .await?; + let proto = mode.protocol_label(); + NUM_DB_CONNECTIONS_OPENED_COUNTER + .with_label_values(&[proto]) + .inc(); + scopeguard::defer! { + NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc(); + } + prepare_client_connection(&node, reported_auth_ok, session, &mut stream).await?; // Before proxy passing, forward to compute whatever data is left in the // PqStream input buffer. Normally there is none, but our serverless npm From 685add2009fd6c3522afd754ea5899433c56c949 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Tue, 10 Oct 2023 17:24:57 +0300 Subject: [PATCH 6/6] Enable /metrics without auth. To enable auth faster. --- pageserver/src/http/routes.rs | 2 +- safekeeper/src/http/routes.rs | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 0597a977c0..c42bd956f3 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -77,7 +77,7 @@ impl State { disk_usage_eviction_state: Arc, deletion_queue_client: DeletionQueueClient, ) -> anyhow::Result { - let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"] + let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"] .iter() .map(|v| v.parse().unwrap()) .collect::>(); diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 6104b54f44..f4c3a4aa03 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -374,8 +374,12 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder if conf.http_auth.is_some() { router = router.middleware(auth_middleware(|request| { #[allow(clippy::mutable_key_type)] - static ALLOWLIST_ROUTES: Lazy> = - Lazy::new(|| ["/v1/status"].iter().map(|v| v.parse().unwrap()).collect()); + static ALLOWLIST_ROUTES: Lazy> = Lazy::new(|| { + ["/v1/status", "/metrics"] + .iter() + .map(|v| v.parse().unwrap()) + .collect() + }); if ALLOWLIST_ROUTES.contains(request.uri()) { None } else {