Compare commits

...

7 Commits

Author SHA1 Message Date
Shany Pozin
face60d50b Merge pull request #5526 from neondatabase/releases/2023-10-11
Release 2023-10-11
2023-10-11 11:16:39 +03:00
Arseny Sher
685add2009 Enable /metrics without auth.
To enable auth faster.
2023-10-10 20:06:25 +03:00
Conrad Ludgate
d4dc86f8e3 proxy: more connection metrics (#5464)
## Problem

Hard to tell 
1. How many clients are connected to proxy
2. How many requests clients are making
3. How many connections are made to a database

1 and 2 are different because of the properties of HTTP.

We have 2 already tracked through `proxy_accepted_connections_total` and
`proxy_closed_connections_total`, but nothing for 1 and 3

## Summary of changes

Adds 2 new counter gauges.

*
`proxy_opened_client_connections_total`,`proxy_closed_client_connections_total`
- how many client connections are open to proxy
*
`proxy_opened_db_connections_total`,`proxy_closed_db_connections_total`
- how many active connections are made through to a database.

For TCP and Websockets, we expect all 3 of these quantities to be
roughly the same, barring users connecting but with invalid details.

For HTTP:
* client_connections/connections can differ because the client
connections can be reused.
* connections/db_connections can differ because of connection pooling.
2023-10-10 16:33:20 +01:00
Alex Chi Z
5158de70f3 proxy: breakdown wake up failure metrics (#4933)
## Problem

close https://github.com/neondatabase/neon/issues/4702

## Summary of changes

This PR adds a new metrics for wake up errors and breaks it down by most
common reasons (mostly follows the `could_retry` implementation).
2023-10-10 13:17:37 +01:00
khanova
aec9188d36 Added timeout for http requests (#5514)
# Problem
Proxy timeout for HTTP-requests

## Summary of changes
If the HTTP-request exceeds 15s, it would be killed.

Resolves: https://github.com/neondatabase/neon/issues/4847
2023-10-10 13:39:38 +02:00
John Spray
acefee9a32 pageserver: flush deletion queue on detach (#5452)
## Problem

If a caller detaches a tenant and then attaches it again, pending
deletions from the old attachment might not have happened yet. This is
not a correctness problem, but it causes:
- Risk of leaking some objects in S3
- Some warnings from the deletion queue when pending LSN updates and
pending deletions don't pass validation.

## Summary of changes

- Deletion queue now uses UnboundedChannel so that the push interfaces
don't have to be async.
- This was pulled out of https://github.com/neondatabase/neon/pull/5397,
where it is also useful to be able to drive the queue from non-async
contexts.
- Why is it okay for this to be unbounded? The only way the
unbounded-ness of the channel can become a problem is if writing out
deletion lists can't keep up, but if the system were that overloaded
then the code generating deletions (GC, compaction) would also be
impacted.
- DeletionQueueClient gets a new `flush_advisory` function, which is
like flush_execute, but doesn't wait for completion: this is appropriate
for use in contexts where we would like to encourage the deletion queue
to flush, but don't need to block on it.
- This function is also expected to be useful in next steps for seamless
migration, where the option to flush to S3 while transitioning into
AttachedStale will also include flushing deletion queue, but we wouldn't
want to block on that flush.
- The tenant_detach code in mgr.rs invokes flush_advisory after stopping
the `Tenant` object.

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2023-10-10 10:46:24 +01:00
Conrad Ludgate
bf065aabdf proxy: update locked error retry filter (#5376)
## Problem

We don't want to retry customer quota exhaustion errors.

## Summary of changes

Make sure both types of quota exhaustion errors are not retried
2023-10-10 08:59:16 +01:00
11 changed files with 353 additions and 107 deletions

View File

@@ -153,7 +153,7 @@ impl FlushOp {
#[derive(Clone, Debug)]
pub struct DeletionQueueClient {
tx: tokio::sync::mpsc::Sender<ListWriterQueueMessage>,
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
@@ -416,7 +416,7 @@ pub enum DeletionQueueError {
impl DeletionQueueClient {
pub(crate) fn broken() -> Self {
// Channels whose receivers are immediately dropped.
let (tx, _rx) = tokio::sync::mpsc::channel(1);
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
let (executor_tx, _executor_rx) = tokio::sync::mpsc::channel(1);
Self {
tx,
@@ -428,12 +428,12 @@ impl DeletionQueueClient {
/// This is cancel-safe. If you drop the future before it completes, the message
/// is not pushed, although in the context of the deletion queue it doesn't matter: once
/// we decide to do a deletion the decision is always final.
async fn do_push<T>(
fn do_push<T>(
&self,
queue: &tokio::sync::mpsc::Sender<T>,
queue: &tokio::sync::mpsc::UnboundedSender<T>,
msg: T,
) -> Result<(), DeletionQueueError> {
match queue.send(msg).await {
match queue.send(msg) {
Ok(_) => Ok(()),
Err(e) => {
// This shouldn't happen, we should shut down all tenants before
@@ -445,7 +445,7 @@ impl DeletionQueueClient {
}
}
pub(crate) async fn recover(
pub(crate) fn recover(
&self,
attached_tenants: HashMap<TenantId, Generation>,
) -> Result<(), DeletionQueueError> {
@@ -453,7 +453,6 @@ impl DeletionQueueClient {
&self.tx,
ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }),
)
.await
}
/// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside
@@ -526,6 +525,21 @@ impl DeletionQueueClient {
return self.flush_immediate().await;
}
self.push_layers_sync(tenant_id, timeline_id, current_generation, layers)
}
/// When a Tenant has a generation, push_layers is always synchronous because
/// the ListValidator channel is an unbounded channel.
///
/// This can be merged into push_layers when we remove the Generation-less mode
/// support (`<https://github.com/neondatabase/neon/issues/5395>`)
pub(crate) fn push_layers_sync(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
current_generation: Generation,
layers: Vec<(LayerFileName, Generation)>,
) -> Result<(), DeletionQueueError> {
metrics::DELETION_QUEUE
.keys_submitted
.inc_by(layers.len() as u64);
@@ -539,17 +553,16 @@ impl DeletionQueueClient {
objects: Vec::new(),
}),
)
.await
}
/// This is cancel-safe. If you drop the future the flush may still happen in the background.
async fn do_flush<T>(
&self,
queue: &tokio::sync::mpsc::Sender<T>,
queue: &tokio::sync::mpsc::UnboundedSender<T>,
msg: T,
rx: tokio::sync::oneshot::Receiver<()>,
) -> Result<(), DeletionQueueError> {
self.do_push(queue, msg).await?;
self.do_push(queue, msg)?;
if rx.await.is_err() {
// This shouldn't happen if tenants are shut down before deletion queue. If we
// encounter a bug like this, then a flusher will incorrectly believe it has flushed
@@ -570,6 +583,18 @@ impl DeletionQueueClient {
.await
}
/// Issue a flush without waiting for it to complete. This is useful on advisory flushes where
/// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant
/// detach where flushing is nice but not necessary.
///
/// This function provides no guarantees of work being done.
pub fn flush_advisory(&self) {
let (flush_op, _) = FlushOp::new();
// Transmit the flush message, ignoring any result (such as a closed channel during shutdown).
drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op)));
}
// Wait until all previous deletions are executed
pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
debug!("flush_execute: flushing to deletion lists...");
@@ -586,9 +611,7 @@ impl DeletionQueueClient {
// Flush any immediate-mode deletions (the above backend flush will only flush
// the executor if deletions had flowed through the backend)
debug!("flush_execute: flushing execution...");
let (flush_op, rx) = FlushOp::new();
self.do_flush(&self.executor_tx, DeleterMessage::Flush(flush_op), rx)
.await?;
self.flush_immediate().await?;
debug!("flush_execute: finished flushing execution...");
Ok(())
}
@@ -643,8 +666,10 @@ impl DeletionQueue {
where
C: ControlPlaneGenerationsApi + Send + Sync,
{
// Deep channel: it consumes deletions from all timelines and we do not want to block them
let (tx, rx) = tokio::sync::mpsc::channel(16384);
// Unbounded channel: enables non-async functions to submit deletions. The actual length is
// constrained by how promptly the ListWriter wakes up and drains it, which should be frequent
// enough to avoid this taking pathologically large amount of memory.
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
// Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
@@ -957,7 +982,7 @@ mod test {
// Basic test that the deletion queue processes the deletions we pass into it
let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new()).await?;
client.recover(HashMap::new())?;
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
let tenant_id = ctx.harness.tenant_id;
@@ -1025,7 +1050,7 @@ mod test {
async fn deletion_queue_validation() -> anyhow::Result<()> {
let ctx = setup("deletion_queue_validation").expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new()).await?;
client.recover(HashMap::new())?;
// Generation that the control plane thinks is current
let latest_generation = Generation::new(0xdeadbeef);
@@ -1082,7 +1107,7 @@ mod test {
// Basic test that the deletion queue processes the deletions we pass into it
let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new()).await?;
client.recover(HashMap::new())?;
let tenant_id = ctx.harness.tenant_id;
@@ -1145,9 +1170,7 @@ mod test {
drop(client);
ctx.restart().await;
let client = ctx.deletion_queue.new_client();
client
.recover(HashMap::from([(tenant_id, now_generation)]))
.await?;
client.recover(HashMap::from([(tenant_id, now_generation)]))?;
info!("Flush-executing");
client.flush_execute().await?;
@@ -1173,7 +1196,7 @@ pub(crate) mod mock {
};
pub struct ConsumerState {
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
}
@@ -1250,7 +1273,7 @@ pub(crate) mod mock {
}
pub struct MockDeletionQueue {
tx: tokio::sync::mpsc::Sender<ListWriterQueueMessage>,
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
executed: Arc<AtomicUsize>,
remote_storage: Option<GenericRemoteStorage>,
@@ -1260,7 +1283,7 @@ pub(crate) mod mock {
impl MockDeletionQueue {
pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel(16384);
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384);
let executed = Arc::new(AtomicUsize::new(0));

View File

@@ -85,7 +85,7 @@ pub(super) struct ListWriter {
conf: &'static PageServerConf,
// Incoming frontend requests to delete some keys
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
// Outbound requests to the backend to execute deletion lists we have composed.
tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
@@ -111,7 +111,7 @@ impl ListWriter {
pub(super) fn new(
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
cancel: CancellationToken,
) -> Self {

View File

@@ -77,7 +77,7 @@ impl State {
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
) -> anyhow::Result<Self> {
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
.iter()
.map(|v| v.parse().unwrap())
.collect::<Vec<_>>();
@@ -575,9 +575,14 @@ async fn tenant_detach_handler(
let state = get_state(&request);
let conf = state.conf;
mgr::detach_tenant(conf, tenant_id, detach_ignored.unwrap_or(false))
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
mgr::detach_tenant(
conf,
tenant_id,
detach_ignored.unwrap_or(false),
&state.deletion_queue_client,
)
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
json_response(StatusCode::OK, ())
}
@@ -1034,7 +1039,7 @@ async fn put_tenant_location_config_handler(
// The `Detached` state is special, it doesn't upsert a tenant, it removes
// its local disk content and drops it from memory.
if let LocationConfigMode::Detached = request_data.config.mode {
mgr::detach_tenant(conf, tenant_id, true)
mgr::detach_tenant(conf, tenant_id, true, &state.deletion_queue_client)
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
return json_response(StatusCode::OK, ());

View File

@@ -45,6 +45,7 @@ use std::sync::{Mutex, RwLock};
use std::time::{Duration, Instant};
use self::config::AttachedLocationConfig;
use self::config::AttachmentMode;
use self::config::LocationConf;
use self::config::TenantConf;
use self::delete::DeleteTenantFlow;
@@ -2076,6 +2077,15 @@ impl Tenant {
}
}
}
pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
self.tenant_conf
.read()
.unwrap()
.location
.attach_mode
.clone()
}
}
/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),

View File

@@ -24,7 +24,7 @@ use crate::control_plane_client::{
};
use crate::deletion_queue::DeletionQueueClient;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::{LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::{
create_tenant_files, AttachedTenantConf, CreateTenantFilesMode, Tenant, TenantState,
@@ -206,8 +206,7 @@ async fn init_load_generations(
if resources.remote_storage.is_some() {
resources
.deletion_queue_client
.recover(generations.clone())
.await?;
.recover(generations.clone())?;
}
Ok(Some(generations))
@@ -695,6 +694,18 @@ pub(crate) async fn upsert_location(
if let Some(tenant) = shutdown_tenant {
let (_guard, progress) = utils::completion::channel();
match tenant.get_attach_mode() {
AttachmentMode::Single | AttachmentMode::Multi => {
// Before we leave our state as the presumed holder of the latest generation,
// flush any outstanding deletions to reduce the risk of leaking objects.
deletion_queue_client.flush_advisory()
}
AttachmentMode::Stale => {
// If we're stale there's not point trying to flush deletions
}
};
info!("Shutting down attached tenant");
match tenant.shutdown(progress, false).await {
Ok(()) => {}
@@ -849,8 +860,16 @@ pub async fn detach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<(), TenantStateError> {
let tmp_path = detach_tenant0(conf, &TENANTS, tenant_id, detach_ignored).await?;
let tmp_path = detach_tenant0(
conf,
&TENANTS,
tenant_id,
detach_ignored,
deletion_queue_client,
)
.await?;
// Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
// After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
let task_tenant_id = None;
@@ -875,6 +894,7 @@ async fn detach_tenant0(
tenants: &tokio::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<Utf8PathBuf, TenantStateError> {
let tenant_dir_rename_operation = |tenant_id_to_clean| async move {
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
@@ -886,6 +906,10 @@ async fn detach_tenant0(
let removal_result =
remove_tenant_from_memory(tenants, tenant_id, tenant_dir_rename_operation(tenant_id)).await;
// Flush pending deletions, so that they have a good chance of passing validation
// before this tenant is potentially re-attached elsewhere.
deletion_queue_client.flush_advisory();
// Ignored tenants are not present in memory and will bail the removal from memory operation.
// Before returning the error, check for ignored tenant removal case — we only need to clean its local files then.
if detach_ignored && matches!(removal_result, Err(TenantStateError::NotFound(_))) {

View File

@@ -89,7 +89,10 @@ pub mod errors {
Self::Console {
status: http::StatusCode::LOCKED,
ref text,
} => !text.contains("quota"),
} => {
!text.contains("written data quota exceeded")
&& !text.contains("the limit for current plan reached")
}
// retry server errors
Self::Console { status, .. } if status.is_server_error() => true,
_ => false,

View File

@@ -20,6 +20,7 @@ use tokio_postgres::AsyncMessage;
use crate::{
auth, console,
metrics::{Ids, MetricCounter, USAGE_METRICS},
proxy::{NUM_DB_CONNECTIONS_CLOSED_COUNTER, NUM_DB_CONNECTIONS_OPENED_COUNTER},
};
use crate::{compute, config};
@@ -418,36 +419,42 @@ async fn connect_to_compute_once(
};
tokio::spawn(
poll_fn(move |cx| {
if matches!(rx.has_changed(), Ok(true)) {
session = *rx.borrow_and_update();
info!(%session, "changed session");
async move {
NUM_DB_CONNECTIONS_OPENED_COUNTER.with_label_values(&["http"]).inc();
scopeguard::defer! {
NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc();
}
poll_fn(move |cx| {
if matches!(rx.has_changed(), Ok(true)) {
session = *rx.borrow_and_update();
info!(%session, "changed session");
}
loop {
let message = ready!(connection.poll_message(cx));
loop {
let message = ready!(connection.poll_message(cx));
match message {
Some(Ok(AsyncMessage::Notice(notice))) => {
info!(%session, "notice: {}", notice);
}
Some(Ok(AsyncMessage::Notification(notif))) => {
warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
}
Some(Ok(_)) => {
warn!(%session, "unknown message");
}
Some(Err(e)) => {
error!(%session, "connection error: {}", e);
return Poll::Ready(())
}
None => {
info!("connection closed");
return Poll::Ready(())
match message {
Some(Ok(AsyncMessage::Notice(notice))) => {
info!(%session, "notice: {}", notice);
}
Some(Ok(AsyncMessage::Notification(notif))) => {
warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
}
Some(Ok(_)) => {
warn!(%session, "unknown message");
}
Some(Err(e)) => {
error!(%session, "connection error: {}", e);
return Poll::Ready(())
}
None => {
info!("connection closed");
return Poll::Ready(())
}
}
}
}
})
}).await
}
.instrument(span)
);

View File

@@ -24,6 +24,8 @@ use url::Url;
use utils::http::error::ApiError;
use utils::http::json::json_response;
use crate::proxy::{NUM_CONNECTIONS_ACCEPTED_COUNTER, NUM_CONNECTIONS_CLOSED_COUNTER};
use super::conn_pool::ConnInfo;
use super::conn_pool::GlobalConnPool;
@@ -47,6 +49,7 @@ enum Payload {
const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024; // 10 MiB
const MAX_REQUEST_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB
const HTTP_CONNECTION_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(15);
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
@@ -189,27 +192,44 @@ pub async fn handle(
conn_pool: Arc<GlobalConnPool>,
session_id: uuid::Uuid,
) -> Result<Response<Body>, ApiError> {
let result = handle_inner(request, sni_hostname, conn_pool, session_id).await;
let result = tokio::time::timeout(
HTTP_CONNECTION_TIMEOUT,
handle_inner(request, sni_hostname, conn_pool, session_id),
)
.await;
let mut response = match result {
Ok(r) => r,
Err(e) => {
let message = format!("{:?}", e);
let code = match e.downcast_ref::<tokio_postgres::Error>() {
Some(e) => match e.code() {
Some(e) => serde_json::to_value(e.code()).unwrap(),
Ok(r) => match r {
Ok(r) => r,
Err(e) => {
let message = format!("{:?}", e);
let code = e.downcast_ref::<tokio_postgres::Error>().and_then(|e| {
e.code()
.map(|s| serde_json::to_value(s.code()).unwrap_or_default())
});
let code = match code {
Some(c) => c,
None => Value::Null,
},
None => Value::Null,
};
error!(
?code,
"sql-over-http per-client task finished with an error: {e:#}"
};
error!(
?code,
"sql-over-http per-client task finished with an error: {e:#}"
);
// TODO: this shouldn't always be bad request.
json_response(
StatusCode::BAD_REQUEST,
json!({ "message": message, "code": code }),
)?
}
},
Err(_) => {
let message = format!(
"HTTP-Connection timed out, execution time exeeded {} seconds",
HTTP_CONNECTION_TIMEOUT.as_secs()
);
// TODO: this shouldn't always be bad request.
error!(message);
json_response(
StatusCode::BAD_REQUEST,
json!({ "message": message, "code": code }),
StatusCode::GATEWAY_TIMEOUT,
json!({ "message": message, "code": StatusCode::GATEWAY_TIMEOUT.as_u16() }),
)?
}
};
@@ -227,6 +247,13 @@ async fn handle_inner(
conn_pool: Arc<GlobalConnPool>,
session_id: uuid::Uuid,
) -> anyhow::Result<Response<Body>> {
NUM_CONNECTIONS_ACCEPTED_COUNTER
.with_label_values(&["http"])
.inc();
scopeguard::defer! {
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc();
}
//
// Determine the destination and connection params
//

View File

@@ -3,7 +3,10 @@ use crate::{
config::ProxyConfig,
error::io_error,
protocol2::{ProxyProtocolAccept, WithClientIp},
proxy::{handle_client, ClientMode},
proxy::{
handle_client, ClientMode, NUM_CLIENT_CONNECTION_CLOSED_COUNTER,
NUM_CLIENT_CONNECTION_OPENED_COUNTER,
},
};
use bytes::{Buf, Bytes};
use futures::{Sink, Stream, StreamExt};
@@ -275,23 +278,25 @@ pub async fn task_main(
let conn_pool = conn_pool.clone();
async move {
Ok::<_, Infallible>(hyper::service::service_fn(move |req: Request<Body>| {
let sni_name = sni_name.clone();
let conn_pool = conn_pool.clone();
Ok::<_, Infallible>(MetricService::new(hyper::service::service_fn(
move |req: Request<Body>| {
let sni_name = sni_name.clone();
let conn_pool = conn_pool.clone();
async move {
let cancel_map = Arc::new(CancelMap::default());
let session_id = uuid::Uuid::new_v4();
async move {
let cancel_map = Arc::new(CancelMap::default());
let session_id = uuid::Uuid::new_v4();
ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
.instrument(info_span!(
"ws-client",
session = %session_id,
%peer_addr,
))
.await
}
}))
ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
.instrument(info_span!(
"ws-client",
session = %session_id,
%peer_addr,
))
.await
}
},
)))
}
},
);
@@ -303,3 +308,41 @@ pub async fn task_main(
Ok(())
}
struct MetricService<S> {
inner: S,
}
impl<S> MetricService<S> {
fn new(inner: S) -> MetricService<S> {
NUM_CLIENT_CONNECTION_OPENED_COUNTER
.with_label_values(&["http"])
.inc();
MetricService { inner }
}
}
impl<S> Drop for MetricService<S> {
fn drop(&mut self) {
NUM_CLIENT_CONNECTION_CLOSED_COUNTER
.with_label_values(&["http"])
.inc();
}
}
impl<S, ReqBody> hyper::service::Service<Request<ReqBody>> for MetricService<S>
where
S: hyper::service::Service<Request<ReqBody>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
self.inner.call(req)
}
}

View File

@@ -7,6 +7,7 @@ use crate::{
compute::{self, PostgresConnection},
config::{ProxyConfig, TlsConfig},
console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api},
http::StatusCode,
metrics::{Ids, USAGE_METRICS},
protocol2::WithClientIp,
stream::{PqStream, Stream},
@@ -38,19 +39,55 @@ const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2;
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
const ERR_PROTO_VIOLATION: &str = "protocol violation";
static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
pub static NUM_DB_CONNECTIONS_OPENED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_accepted_connections_total",
"Number of TCP client connections accepted.",
"proxy_opened_db_connections_total",
"Number of opened connections to a database.",
&["protocol"],
)
.unwrap()
});
static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
pub static NUM_DB_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_closed_db_connections_total",
"Number of closed connections to a database.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CLIENT_CONNECTION_OPENED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_opened_client_connections_total",
"Number of opened connections from a client.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CLIENT_CONNECTION_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_closed_client_connections_total",
"Number of closed connections from a client.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_accepted_connections_total",
"Number of client connections accepted.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_closed_connections_total",
"Number of TCP client connections closed.",
"Number of client connections closed.",
&["protocol"],
)
.unwrap()
@@ -75,6 +112,15 @@ static NUM_CONNECTION_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});
static NUM_WAKEUP_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_connection_failures_breakdown",
"Number of wake-up failures (per kind).",
&["retry", "kind"],
)
.unwrap()
});
static NUM_BYTES_PROXIED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_io_bytes_per_client",
@@ -208,12 +254,16 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
"handling interactive connection from client"
);
// The `closed` counter will increase when this future is destroyed.
let proto = mode.protocol_label();
NUM_CLIENT_CONNECTION_OPENED_COUNTER
.with_label_values(&[proto])
.inc();
NUM_CONNECTIONS_ACCEPTED_COUNTER
.with_label_values(&[mode.protocol_label()])
.with_label_values(&[proto])
.inc();
scopeguard::defer! {
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[mode.protocol_label()]).inc();
NUM_CLIENT_CONNECTION_CLOSED_COUNTER.with_label_values(&[proto]).inc();
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc();
}
let tls = config.tls_config.as_ref();
@@ -248,7 +298,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
mode.allow_self_signed_compute(config),
);
cancel_map
.with_session(|session| client.connect_to_db(session, mode.allow_cleartext()))
.with_session(|session| client.connect_to_db(session, mode))
.await
}
@@ -397,6 +447,46 @@ impl ConnectMechanism for TcpMechanism<'_> {
}
}
const fn bool_to_str(x: bool) -> &'static str {
if x {
"true"
} else {
"false"
}
}
fn report_error(e: &WakeComputeError, retry: bool) {
use crate::console::errors::ApiError;
let retry = bool_to_str(retry);
let kind = match e {
WakeComputeError::BadComputeAddress(_) => "bad_compute_address",
WakeComputeError::ApiError(ApiError::Transport(_)) => "api_transport_error",
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::LOCKED,
ref text,
}) if text.contains("written data quota exceeded")
|| text.contains("the limit for current plan reached") =>
{
"quota_exceeded"
}
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::LOCKED,
..
}) => "api_console_locked",
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::BAD_REQUEST,
..
}) => "api_console_bad_request",
WakeComputeError::ApiError(ApiError::Console { status, .. })
if status.is_server_error() =>
{
"api_console_other_server_error"
}
WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error",
};
NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc();
}
/// Try to connect to the compute node, retrying if necessary.
/// This function might update `node_info`, so we take it by `&mut`.
#[tracing::instrument(skip_all)]
@@ -440,10 +530,12 @@ where
match handle_try_wake(wake_res, num_retries) {
Err(e) => {
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
report_error(&e, false);
return Err(e.into());
}
// failed to wake up but we can continue to retry
Ok(ControlFlow::Continue(e)) => {
report_error(&e, true);
warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
}
// successfully woke up a compute node and can break the wakeup loop
@@ -682,7 +774,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
async fn connect_to_db(
self,
session: cancellation::Session<'_>,
allow_cleartext: bool,
mode: ClientMode,
) -> anyhow::Result<()> {
let Self {
mut stream,
@@ -698,7 +790,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
};
let auth_result = match creds
.authenticate(&extra, &mut stream, allow_cleartext)
.authenticate(&extra, &mut stream, mode.allow_cleartext())
.await
{
Ok(auth_result) => auth_result,
@@ -724,6 +816,14 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
.or_else(|e| stream.throw_error(e))
.await?;
let proto = mode.protocol_label();
NUM_DB_CONNECTIONS_OPENED_COUNTER
.with_label_values(&[proto])
.inc();
scopeguard::defer! {
NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc();
}
prepare_client_connection(&node, reported_auth_ok, session, &mut stream).await?;
// Before proxy passing, forward to compute whatever data is left in the
// PqStream input buffer. Normally there is none, but our serverless npm

View File

@@ -374,8 +374,12 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
if conf.http_auth.is_some() {
router = router.middleware(auth_middleware(|request| {
#[allow(clippy::mutable_key_type)]
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> =
Lazy::new(|| ["/v1/status"].iter().map(|v| v.parse().unwrap()).collect());
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> = Lazy::new(|| {
["/v1/status", "/metrics"]
.iter()
.map(|v| v.parse().unwrap())
.collect()
});
if ALLOWLIST_ROUTES.contains(request.uri()) {
None
} else {