Compare commits

...

16 Commits

Author SHA1 Message Date
Heikki Linnakangas
081b0d1e80 Use explicit counter to detect when WAL redo process has been restarted.
More robust than relying on FDs.
2023-10-13 17:08:09 +03:00
Konstantin Knizhnik
e083c86c93 Move saving of stdin descriptor 2023-10-13 09:16:52 +03:00
Konstantin Knizhnik
3406676abd Check if walredo pipe was recreated by some other backend before klilling walredo process 2023-10-12 22:53:27 +03:00
khanova
21deb81acb Fix case for array of jsons (#5523)
## Problem

Currently proxy doesn't handle array of json parameters correctly.

## Summary of changes

Added one more level of quotes escaping for the array of jsons case.
Resolves: https://github.com/neondatabase/neon/issues/5515
2023-10-12 14:32:49 +02:00
khanova
dbb21d6592 Make http timeout configurable (#5532)
## Problem

Currently http timeout is hardcoded to 15 seconds.

## Summary of changes

Added an option to configure it via cli args.

Context: https://neondb.slack.com/archives/C04DGM6SMTM/p1696941726151899
2023-10-12 11:41:07 +02:00
Joonas Koivunen
ddceb9e6cd fix(branching): read last record lsn only after Tenant::gc_cs (#5535)
Fixes #5531, at least the latest error of not being able to create a
branch from the head under write and gc pressure.
2023-10-11 16:24:36 +01:00
John Spray
0fc3708de2 pageserver: use a backoff::retry in Deleter (#5534)
## Problem

The `Deleter` currently doesn't use a backoff::retry because it doesn't
need to: it is already inside a loop when doing the deletion, so can
just let the loop go around.

However, this is a problem for logging, because we log on errors, which
includes things like 503/429 cases that would usually be swallowed by a
backoff::retry in most places we use the RemoteStorage interface.

The underlying problem is that RemoteStorage doesn't have a proper error
type, and an anyhow::Error can't easily be interrogated for its original
S3 SdkError because downcast_ref requires a concrete type, but SdkError
is parametrized on response type.

## Summary of changes

Wrap remote deletions in Deleter in a backoff::retry to avoid logging
warnings on transient 429/503 conditions, and for symmetry with how
RemoteStorage is used in other places.
2023-10-11 15:25:08 +01:00
John Spray
e0c8ad48d4 remote_storage: log detail errors in delete_objects (#5530)
## Problem

When we got an error in the payload of a DeleteObjects response, we only
logged how many errors, not what they were.

## Summary of changes

Log up to 10 specific errors. We do not log all of them because that
would be up to 1000 log lines per request.
2023-10-11 13:22:00 +01:00
John Spray
39e144696f pageserver: clean up mgr.rs types that needn't be public (#5529)
## Problem

These types/functions are public and it prevents clippy from catching
unused things.

## Summary of changes

Move to `pub(crate)` and remove the error enum that becomes clearly
unused as a result.
2023-10-11 11:50:16 +00:00
Alexander Bayandin
653044f754 test_runners: increase some timeouts to make tests less flaky (#5521)
## Problem
- `test_heavy_write_workload` is flaky, and fails because of to
statement timeout
- `test_wal_lagging` is flaky and fails because of the default pytest
timeout (see https://github.com/neondatabase/neon/issues/5305)

## Summary of changes
- `test_heavy_write_workload`: increase statement timeout to 5 minutes
(from default 2 minutes)
- `test_wal_lagging`: increase pytest timeout to 600s (from default
300s)
2023-10-11 10:49:15 +01:00
Vadim Kharitonov
80dcdfa8bf Update pgvector to 0.5.1 (#5525) 2023-10-11 09:47:19 +01:00
Arseny Sher
685add2009 Enable /metrics without auth.
To enable auth faster.
2023-10-10 20:06:25 +03:00
Conrad Ludgate
d4dc86f8e3 proxy: more connection metrics (#5464)
## Problem

Hard to tell 
1. How many clients are connected to proxy
2. How many requests clients are making
3. How many connections are made to a database

1 and 2 are different because of the properties of HTTP.

We have 2 already tracked through `proxy_accepted_connections_total` and
`proxy_closed_connections_total`, but nothing for 1 and 3

## Summary of changes

Adds 2 new counter gauges.

*
`proxy_opened_client_connections_total`,`proxy_closed_client_connections_total`
- how many client connections are open to proxy
*
`proxy_opened_db_connections_total`,`proxy_closed_db_connections_total`
- how many active connections are made through to a database.

For TCP and Websockets, we expect all 3 of these quantities to be
roughly the same, barring users connecting but with invalid details.

For HTTP:
* client_connections/connections can differ because the client
connections can be reused.
* connections/db_connections can differ because of connection pooling.
2023-10-10 16:33:20 +01:00
Alex Chi Z
5158de70f3 proxy: breakdown wake up failure metrics (#4933)
## Problem

close https://github.com/neondatabase/neon/issues/4702

## Summary of changes

This PR adds a new metrics for wake up errors and breaks it down by most
common reasons (mostly follows the `could_retry` implementation).
2023-10-10 13:17:37 +01:00
khanova
aec9188d36 Added timeout for http requests (#5514)
# Problem
Proxy timeout for HTTP-requests

## Summary of changes
If the HTTP-request exceeds 15s, it would be killed.

Resolves: https://github.com/neondatabase/neon/issues/4847
2023-10-10 13:39:38 +02:00
John Spray
acefee9a32 pageserver: flush deletion queue on detach (#5452)
## Problem

If a caller detaches a tenant and then attaches it again, pending
deletions from the old attachment might not have happened yet. This is
not a correctness problem, but it causes:
- Risk of leaking some objects in S3
- Some warnings from the deletion queue when pending LSN updates and
pending deletions don't pass validation.

## Summary of changes

- Deletion queue now uses UnboundedChannel so that the push interfaces
don't have to be async.
- This was pulled out of https://github.com/neondatabase/neon/pull/5397,
where it is also useful to be able to drive the queue from non-async
contexts.
- Why is it okay for this to be unbounded? The only way the
unbounded-ness of the channel can become a problem is if writing out
deletion lists can't keep up, but if the system were that overloaded
then the code generating deletions (GC, compaction) would also be
impacted.
- DeletionQueueClient gets a new `flush_advisory` function, which is
like flush_execute, but doesn't wait for completion: this is appropriate
for use in contexts where we would like to encourage the deletion queue
to flush, but don't need to block on it.
- This function is also expected to be useful in next steps for seamless
migration, where the option to flush to S3 while transitioning into
AttachedStale will also include flushing deletion queue, but we wouldn't
want to block on that flush.
- The tenant_detach code in mgr.rs invokes flush_advisory after stopping
the `Tenant` object.

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2023-10-10 10:46:24 +01:00
20 changed files with 496 additions and 171 deletions

View File

@@ -224,8 +224,8 @@ RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -
FROM build-deps AS vector-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.5.0.tar.gz -O pgvector.tar.gz && \
echo "d8aa3504b215467ca528525a6de12c3f85f9891b091ce0e5864dd8a9b757f77b pgvector.tar.gz" | sha256sum --check && \
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.5.1.tar.gz -O pgvector.tar.gz && \
echo "cc7a8e034a96e30a819911ac79d32f6bc47bdd1aa2de4d7d4904e26b83209dc8 pgvector.tar.gz" | sha256sum --check && \
mkdir pgvector-src && cd pgvector-src && tar xvzf ../pgvector.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \

View File

@@ -4,7 +4,7 @@
//! allowing multiple api users to independently work with the same S3 bucket, if
//! their bucket prefixes are both specified and different.
use std::sync::Arc;
use std::{borrow::Cow, sync::Arc};
use anyhow::Context;
use aws_config::{
@@ -556,6 +556,20 @@ impl RemoteStorage for S3Bucket {
.deleted_objects_total
.inc_by(chunk.len() as u64);
if let Some(errors) = resp.errors {
// Log a bounded number of the errors within the response:
// these requests can carry 1000 keys so logging each one
// would be too verbose, especially as errors may lead us
// to retry repeatedly.
const LOG_UP_TO_N_ERRORS: usize = 10;
for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
tracing::warn!(
"DeleteObjects key {} failed: {}: {}",
e.key.as_ref().map(Cow::from).unwrap_or("".into()),
e.code.as_ref().map(Cow::from).unwrap_or("".into()),
e.message.as_ref().map(Cow::from).unwrap_or("".into())
);
}
return Err(anyhow::format_err!(
"Failed to delete {} objects",
errors.len()

View File

@@ -153,7 +153,7 @@ impl FlushOp {
#[derive(Clone, Debug)]
pub struct DeletionQueueClient {
tx: tokio::sync::mpsc::Sender<ListWriterQueueMessage>,
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
@@ -416,7 +416,7 @@ pub enum DeletionQueueError {
impl DeletionQueueClient {
pub(crate) fn broken() -> Self {
// Channels whose receivers are immediately dropped.
let (tx, _rx) = tokio::sync::mpsc::channel(1);
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
let (executor_tx, _executor_rx) = tokio::sync::mpsc::channel(1);
Self {
tx,
@@ -428,12 +428,12 @@ impl DeletionQueueClient {
/// This is cancel-safe. If you drop the future before it completes, the message
/// is not pushed, although in the context of the deletion queue it doesn't matter: once
/// we decide to do a deletion the decision is always final.
async fn do_push<T>(
fn do_push<T>(
&self,
queue: &tokio::sync::mpsc::Sender<T>,
queue: &tokio::sync::mpsc::UnboundedSender<T>,
msg: T,
) -> Result<(), DeletionQueueError> {
match queue.send(msg).await {
match queue.send(msg) {
Ok(_) => Ok(()),
Err(e) => {
// This shouldn't happen, we should shut down all tenants before
@@ -445,7 +445,7 @@ impl DeletionQueueClient {
}
}
pub(crate) async fn recover(
pub(crate) fn recover(
&self,
attached_tenants: HashMap<TenantId, Generation>,
) -> Result<(), DeletionQueueError> {
@@ -453,7 +453,6 @@ impl DeletionQueueClient {
&self.tx,
ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }),
)
.await
}
/// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside
@@ -526,6 +525,21 @@ impl DeletionQueueClient {
return self.flush_immediate().await;
}
self.push_layers_sync(tenant_id, timeline_id, current_generation, layers)
}
/// When a Tenant has a generation, push_layers is always synchronous because
/// the ListValidator channel is an unbounded channel.
///
/// This can be merged into push_layers when we remove the Generation-less mode
/// support (`<https://github.com/neondatabase/neon/issues/5395>`)
pub(crate) fn push_layers_sync(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
current_generation: Generation,
layers: Vec<(LayerFileName, Generation)>,
) -> Result<(), DeletionQueueError> {
metrics::DELETION_QUEUE
.keys_submitted
.inc_by(layers.len() as u64);
@@ -539,17 +553,16 @@ impl DeletionQueueClient {
objects: Vec::new(),
}),
)
.await
}
/// This is cancel-safe. If you drop the future the flush may still happen in the background.
async fn do_flush<T>(
&self,
queue: &tokio::sync::mpsc::Sender<T>,
queue: &tokio::sync::mpsc::UnboundedSender<T>,
msg: T,
rx: tokio::sync::oneshot::Receiver<()>,
) -> Result<(), DeletionQueueError> {
self.do_push(queue, msg).await?;
self.do_push(queue, msg)?;
if rx.await.is_err() {
// This shouldn't happen if tenants are shut down before deletion queue. If we
// encounter a bug like this, then a flusher will incorrectly believe it has flushed
@@ -570,6 +583,18 @@ impl DeletionQueueClient {
.await
}
/// Issue a flush without waiting for it to complete. This is useful on advisory flushes where
/// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant
/// detach where flushing is nice but not necessary.
///
/// This function provides no guarantees of work being done.
pub fn flush_advisory(&self) {
let (flush_op, _) = FlushOp::new();
// Transmit the flush message, ignoring any result (such as a closed channel during shutdown).
drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op)));
}
// Wait until all previous deletions are executed
pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
debug!("flush_execute: flushing to deletion lists...");
@@ -586,9 +611,7 @@ impl DeletionQueueClient {
// Flush any immediate-mode deletions (the above backend flush will only flush
// the executor if deletions had flowed through the backend)
debug!("flush_execute: flushing execution...");
let (flush_op, rx) = FlushOp::new();
self.do_flush(&self.executor_tx, DeleterMessage::Flush(flush_op), rx)
.await?;
self.flush_immediate().await?;
debug!("flush_execute: finished flushing execution...");
Ok(())
}
@@ -643,8 +666,10 @@ impl DeletionQueue {
where
C: ControlPlaneGenerationsApi + Send + Sync,
{
// Deep channel: it consumes deletions from all timelines and we do not want to block them
let (tx, rx) = tokio::sync::mpsc::channel(16384);
// Unbounded channel: enables non-async functions to submit deletions. The actual length is
// constrained by how promptly the ListWriter wakes up and drains it, which should be frequent
// enough to avoid this taking pathologically large amount of memory.
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
// Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
@@ -957,7 +982,7 @@ mod test {
// Basic test that the deletion queue processes the deletions we pass into it
let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new()).await?;
client.recover(HashMap::new())?;
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
let tenant_id = ctx.harness.tenant_id;
@@ -1025,7 +1050,7 @@ mod test {
async fn deletion_queue_validation() -> anyhow::Result<()> {
let ctx = setup("deletion_queue_validation").expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new()).await?;
client.recover(HashMap::new())?;
// Generation that the control plane thinks is current
let latest_generation = Generation::new(0xdeadbeef);
@@ -1082,7 +1107,7 @@ mod test {
// Basic test that the deletion queue processes the deletions we pass into it
let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new()).await?;
client.recover(HashMap::new())?;
let tenant_id = ctx.harness.tenant_id;
@@ -1145,9 +1170,7 @@ mod test {
drop(client);
ctx.restart().await;
let client = ctx.deletion_queue.new_client();
client
.recover(HashMap::from([(tenant_id, now_generation)]))
.await?;
client.recover(HashMap::from([(tenant_id, now_generation)]))?;
info!("Flush-executing");
client.flush_execute().await?;
@@ -1173,7 +1196,7 @@ pub(crate) mod mock {
};
pub struct ConsumerState {
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
}
@@ -1250,7 +1273,7 @@ pub(crate) mod mock {
}
pub struct MockDeletionQueue {
tx: tokio::sync::mpsc::Sender<ListWriterQueueMessage>,
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
executed: Arc<AtomicUsize>,
remote_storage: Option<GenericRemoteStorage>,
@@ -1260,7 +1283,7 @@ pub(crate) mod mock {
impl MockDeletionQueue {
pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel(16384);
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384);
let executed = Arc::new(AtomicUsize::new(0));

View File

@@ -13,6 +13,7 @@ use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use utils::backoff;
use crate::metrics;
@@ -63,7 +64,19 @@ impl Deleter {
Err(anyhow::anyhow!("failpoint hit"))
});
self.remote_storage.delete_objects(&self.accumulator).await
// A backoff::retry is used here for two reasons:
// - To provide a backoff rather than busy-polling the API on errors
// - To absorb transient 429/503 conditions without hitting our error
// logging path for issues deleting objects.
backoff::retry(
|| async { self.remote_storage.delete_objects(&self.accumulator).await },
|_| false,
3,
10,
"executing deletion batch",
backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Shutting down")),
)
.await
}
/// Block until everything in accumulator has been executed
@@ -88,7 +101,10 @@ impl Deleter {
self.accumulator.clear();
}
Err(e) => {
warn!("DeleteObjects request failed: {e:#}, will retry");
if self.cancel.is_cancelled() {
return Err(DeletionQueueError::ShuttingDown);
}
warn!("DeleteObjects request failed: {e:#}, will continue trying");
metrics::DELETION_QUEUE
.remote_errors
.with_label_values(&["execute"])

View File

@@ -85,7 +85,7 @@ pub(super) struct ListWriter {
conf: &'static PageServerConf,
// Incoming frontend requests to delete some keys
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
// Outbound requests to the backend to execute deletion lists we have composed.
tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
@@ -111,7 +111,7 @@ impl ListWriter {
pub(super) fn new(
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
cancel: CancellationToken,
) -> Self {

View File

@@ -77,7 +77,7 @@ impl State {
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
) -> anyhow::Result<Self> {
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
.iter()
.map(|v| v.parse().unwrap())
.collect::<Vec<_>>();
@@ -164,9 +164,6 @@ impl From<TenantStateError> for ApiError {
fn from(tse: TenantStateError) -> ApiError {
match tse {
TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
TenantStateError::NotActive(_) => {
ApiError::ResourceUnavailable("Tenant not yet active".into())
}
TenantStateError::IsStopping(_) => {
ApiError::ResourceUnavailable("Tenant is stopping".into())
}
@@ -575,9 +572,14 @@ async fn tenant_detach_handler(
let state = get_state(&request);
let conf = state.conf;
mgr::detach_tenant(conf, tenant_id, detach_ignored.unwrap_or(false))
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
mgr::detach_tenant(
conf,
tenant_id,
detach_ignored.unwrap_or(false),
&state.deletion_queue_client,
)
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
json_response(StatusCode::OK, ())
}
@@ -1034,7 +1036,7 @@ async fn put_tenant_location_config_handler(
// The `Detached` state is special, it doesn't upsert a tenant, it removes
// its local disk content and drops it from memory.
if let LocationConfigMode::Detached = request_data.config.mode {
mgr::detach_tenant(conf, tenant_id, true)
mgr::detach_tenant(conf, tenant_id, true, &state.deletion_queue_client)
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
return json_response(StatusCode::OK, ());

View File

@@ -45,6 +45,7 @@ use std::sync::{Mutex, RwLock};
use std::time::{Duration, Instant};
use self::config::AttachedLocationConfig;
use self::config::AttachmentMode;
use self::config::LocationConf;
use self::config::TenantConf;
use self::delete::DeleteTenantFlow;
@@ -208,7 +209,7 @@ pub struct Tenant {
/// The remote storage generation, used to protect S3 objects from split-brain.
/// Does not change over the lifetime of the [`Tenant`] object.
///
///
/// This duplicates the generation stored in LocationConf, but that structure is mutable:
/// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
generation: Generation,
@@ -2076,6 +2077,15 @@ impl Tenant {
}
}
}
pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
self.tenant_conf
.read()
.unwrap()
.location
.attach_mode
.clone()
}
}
/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
@@ -2745,6 +2755,11 @@ impl Tenant {
) -> Result<Arc<Timeline>, CreateTimelineError> {
let src_id = src_timeline.timeline_id;
// First acquire the GC lock so that another task cannot advance the GC
// cutoff in 'gc_info', and make 'start_lsn' invalid, while we are
// creating the branch.
let _gc_cs = self.gc_cs.lock().await;
// If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
let start_lsn = start_lsn.unwrap_or_else(|| {
let lsn = src_timeline.get_last_record_lsn();
@@ -2752,11 +2767,6 @@ impl Tenant {
lsn
});
// First acquire the GC lock so that another task cannot advance the GC
// cutoff in 'gc_info', and make 'start_lsn' invalid, while we are
// creating the branch.
let _gc_cs = self.gc_cs.lock().await;
// Create a placeholder for the new branch. This will error
// out if the new timeline ID is already in use.
let timeline_uninit_mark = {

View File

@@ -31,7 +31,7 @@ use super::{
const SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS: u32 = 3;
#[derive(Debug, thiserror::Error)]
pub enum DeleteTenantError {
pub(crate) enum DeleteTenantError {
#[error("GetTenant {0}")]
Get(#[from] GetTenantError),
@@ -376,7 +376,7 @@ impl DeleteTenantFlow {
Ok(())
}
pub async fn should_resume_deletion(
pub(crate) async fn should_resume_deletion(
conf: &'static PageServerConf,
remote_storage: Option<&GenericRemoteStorage>,
tenant: &Tenant,

View File

@@ -24,7 +24,7 @@ use crate::control_plane_client::{
};
use crate::deletion_queue::DeletionQueueClient;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::{LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::{
create_tenant_files, AttachedTenantConf, CreateTenantFilesMode, Tenant, TenantState,
@@ -50,7 +50,7 @@ use super::TenantSharedResources;
/// its lifetime, and we can preserve some important safety invariants like `Tenant` always
/// having a properly acquired generation (Secondary doesn't need a generation)
#[derive(Clone)]
pub enum TenantSlot {
pub(crate) enum TenantSlot {
Attached(Arc<Tenant>),
Secondary,
}
@@ -206,8 +206,7 @@ async fn init_load_generations(
if resources.remote_storage.is_some() {
resources
.deletion_queue_client
.recover(generations.clone())
.await?;
.recover(generations.clone())?;
}
Ok(Some(generations))
@@ -482,7 +481,7 @@ pub(crate) fn schedule_local_tenant_processing(
/// management API. For example, it could attach the tenant on a different pageserver.
/// We would then be in split-brain once this pageserver restarts.
#[instrument(skip_all)]
pub async fn shutdown_all_tenants() {
pub(crate) async fn shutdown_all_tenants() {
shutdown_all_tenants0(&TENANTS).await
}
@@ -594,7 +593,7 @@ async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock<TenantsMap>) {
// caller will log how long we took
}
pub async fn create_tenant(
pub(crate) async fn create_tenant(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: TenantId,
@@ -629,14 +628,14 @@ pub async fn create_tenant(
}
#[derive(Debug, thiserror::Error)]
pub enum SetNewTenantConfigError {
pub(crate) enum SetNewTenantConfigError {
#[error(transparent)]
GetTenant(#[from] GetTenantError),
#[error(transparent)]
Persist(anyhow::Error),
}
pub async fn set_new_tenant_config(
pub(crate) async fn set_new_tenant_config(
conf: &'static PageServerConf,
new_tenant_conf: TenantConfOpt,
tenant_id: TenantId,
@@ -695,6 +694,18 @@ pub(crate) async fn upsert_location(
if let Some(tenant) = shutdown_tenant {
let (_guard, progress) = utils::completion::channel();
match tenant.get_attach_mode() {
AttachmentMode::Single | AttachmentMode::Multi => {
// Before we leave our state as the presumed holder of the latest generation,
// flush any outstanding deletions to reduce the risk of leaking objects.
deletion_queue_client.flush_advisory()
}
AttachmentMode::Stale => {
// If we're stale there's not point trying to flush deletions
}
};
info!("Shutting down attached tenant");
match tenant.shutdown(progress, false).await {
Ok(()) => {}
@@ -765,7 +776,7 @@ pub(crate) async fn upsert_location(
}
#[derive(Debug, thiserror::Error)]
pub enum GetTenantError {
pub(crate) enum GetTenantError {
#[error("Tenant {0} not found")]
NotFound(TenantId),
#[error("Tenant {0} is not active")]
@@ -781,7 +792,7 @@ pub enum GetTenantError {
/// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants.
///
/// This method is cancel-safe.
pub async fn get_tenant(
pub(crate) async fn get_tenant(
tenant_id: TenantId,
active_only: bool,
) -> Result<Arc<Tenant>, GetTenantError> {
@@ -806,7 +817,7 @@ pub async fn get_tenant(
}
}
pub async fn delete_tenant(
pub(crate) async fn delete_tenant(
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenant_id: TenantId,
@@ -815,7 +826,7 @@ pub async fn delete_tenant(
}
#[derive(Debug, thiserror::Error)]
pub enum DeleteTimelineError {
pub(crate) enum DeleteTimelineError {
#[error("Tenant {0}")]
Tenant(#[from] GetTenantError),
@@ -823,7 +834,7 @@ pub enum DeleteTimelineError {
Timeline(#[from] crate::tenant::DeleteTimelineError),
}
pub async fn delete_timeline(
pub(crate) async fn delete_timeline(
tenant_id: TenantId,
timeline_id: TimelineId,
_ctx: &RequestContext,
@@ -834,23 +845,29 @@ pub async fn delete_timeline(
}
#[derive(Debug, thiserror::Error)]
pub enum TenantStateError {
pub(crate) enum TenantStateError {
#[error("Tenant {0} not found")]
NotFound(TenantId),
#[error("Tenant {0} is stopping")]
IsStopping(TenantId),
#[error("Tenant {0} is not active")]
NotActive(TenantId),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
pub async fn detach_tenant(
pub(crate) async fn detach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<(), TenantStateError> {
let tmp_path = detach_tenant0(conf, &TENANTS, tenant_id, detach_ignored).await?;
let tmp_path = detach_tenant0(
conf,
&TENANTS,
tenant_id,
detach_ignored,
deletion_queue_client,
)
.await?;
// Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
// After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
let task_tenant_id = None;
@@ -875,6 +892,7 @@ async fn detach_tenant0(
tenants: &tokio::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<Utf8PathBuf, TenantStateError> {
let tenant_dir_rename_operation = |tenant_id_to_clean| async move {
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
@@ -886,6 +904,10 @@ async fn detach_tenant0(
let removal_result =
remove_tenant_from_memory(tenants, tenant_id, tenant_dir_rename_operation(tenant_id)).await;
// Flush pending deletions, so that they have a good chance of passing validation
// before this tenant is potentially re-attached elsewhere.
deletion_queue_client.flush_advisory();
// Ignored tenants are not present in memory and will bail the removal from memory operation.
// Before returning the error, check for ignored tenant removal case — we only need to clean its local files then.
if detach_ignored && matches!(removal_result, Err(TenantStateError::NotFound(_))) {
@@ -902,7 +924,7 @@ async fn detach_tenant0(
removal_result
}
pub async fn load_tenant(
pub(crate) async fn load_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
generation: Generation,
@@ -939,7 +961,7 @@ pub async fn load_tenant(
Ok(())
}
pub async fn ignore_tenant(
pub(crate) async fn ignore_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
) -> Result<(), TenantStateError> {
@@ -967,7 +989,7 @@ async fn ignore_tenant0(
}
#[derive(Debug, thiserror::Error)]
pub enum TenantMapListError {
pub(crate) enum TenantMapListError {
#[error("tenant map is still initiailizing")]
Initializing,
}
@@ -975,7 +997,7 @@ pub enum TenantMapListError {
///
/// Get list of tenants, for the mgmt API
///
pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
let tenants = TENANTS.read().await;
let m = match &*tenants {
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
@@ -993,7 +1015,7 @@ pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapLis
///
/// Downloading all the tenant data is performed in the background, this merely
/// spawns the background task and returns quickly.
pub async fn attach_tenant(
pub(crate) async fn attach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
generation: Generation,
@@ -1030,7 +1052,7 @@ pub async fn attach_tenant(
}
#[derive(Debug, thiserror::Error)]
pub enum TenantMapInsertError {
pub(crate) enum TenantMapInsertError {
#[error("tenant map is still initializing")]
StillInitializing,
#[error("tenant map is shutting down")]
@@ -1193,7 +1215,7 @@ use {
utils::http::error::ApiError,
};
pub async fn immediate_gc(
pub(crate) async fn immediate_gc(
tenant_id: TenantId,
timeline_id: TimelineId,
gc_req: TimelineGcRequest,

View File

@@ -30,6 +30,7 @@ use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::prelude::CommandExt;
use std::process::Stdio;
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Mutex, MutexGuard};
use std::time::Duration;
use std::time::Instant;
@@ -39,7 +40,7 @@ use utils::crashsafe::path_with_suffix_extension;
use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock};
#[cfg(feature = "testing")]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::AtomicUsize;
use crate::metrics::{
WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
@@ -93,6 +94,7 @@ pub trait WalRedoManager: Send + Sync {
}
struct ProcessInput {
restart_no: u64,
child: NoLeakChild,
stdin: ChildStdin,
stderr_fd: RawFd,
@@ -101,6 +103,7 @@ struct ProcessInput {
}
struct ProcessOutput {
restart_no: u64,
stdout: ChildStdout,
pending_responses: VecDeque<Option<Bytes>>,
n_processed_responses: usize,
@@ -120,6 +123,7 @@ pub struct PostgresRedoManager {
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize,
restart_counter: AtomicU64,
stdout: Mutex<Option<ProcessOutput>>,
stdin: Mutex<Option<ProcessInput>>,
stderr: Mutex<Option<ChildStderr>>,
@@ -228,6 +232,7 @@ impl PostgresRedoManager {
pub fn new(conf: &'static PageServerConf, tenant_id: TenantId) -> PostgresRedoManager {
// The actual process is launched lazily, on first request.
PostgresRedoManager {
restart_counter: AtomicU64::new(0),
tenant_id,
conf,
#[cfg(feature = "testing")]
@@ -273,6 +278,7 @@ impl PostgresRedoManager {
if proc.is_none() {
self.launch(&mut proc, pg_version)?;
}
let restart_no = proc.as_ref().unwrap().restart_no;
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
// Relational WAL records are applied using wal-redo-postgres
@@ -322,18 +328,12 @@ impl PostgresRedoManager {
// self.stdin only holds stdin & stderr as_raw_fd().
// Dropping it as part of take() doesn't close them.
// The owning objects (ChildStdout and ChildStderr) are stored in
// self.stdout and self.stderr, respsectively.
// We intentionally keep them open here to avoid a race between
// currently running `apply_wal_records()` and a `launch()` call
// after we return here.
// The currently running `apply_wal_records()` must not read from
// the newly launched process.
// By keeping self.stdout and self.stderr open here, `launch()` will
// get other file descriptors for the new child's stdout and stderr,
// and hence the current `apply_wal_records()` calls will observe
// `output.stdout.as_raw_fd() != stdout_fd` .
// self.stdout and self.stderr, respectively.
// They will be closed when the new process is launched.
if let Some(proc) = self.stdin.lock().unwrap().take() {
proc.child.kill_and_wait();
if proc.restart_no == restart_no {
proc.child.kill_and_wait();
}
}
} else if n_attempts != 0 {
info!(n_attempts, "retried walredo succeeded");
@@ -730,7 +730,9 @@ impl PostgresRedoManager {
// all fallible operations post-spawn are complete, so get rid of the guard
let child = scopeguard::ScopeGuard::into_inner(child);
let restart_no = self.restart_counter.fetch_add(1, Ordering::SeqCst);
**input = Some(ProcessInput {
restart_no,
child,
stdout_fd: stdout.as_raw_fd(),
stderr_fd: stderr.as_raw_fd(),
@@ -739,6 +741,7 @@ impl PostgresRedoManager {
});
*self.stdout.lock().unwrap() = Some(ProcessOutput {
restart_no,
stdout,
pending_responses: VecDeque::new(),
n_processed_responses: 0,
@@ -810,13 +813,13 @@ impl PostgresRedoManager {
) -> Result<Bytes, std::io::Error> {
let proc = input.as_mut().unwrap();
let mut nwrite = 0usize;
let stdout_fd = proc.stdout_fd;
let restart_no = proc.restart_no;
// Prepare for calling poll()
let mut pollfds = [
PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT),
PollFd::new(proc.stderr_fd, PollFlags::POLLIN),
PollFd::new(stdout_fd, PollFlags::POLLIN),
PollFd::new(proc.stdout_fd, PollFlags::POLLIN),
];
// We do two things simultaneously: send the old base image and WAL records to
@@ -891,13 +894,10 @@ impl PostgresRedoManager {
let mut output_guard = self.stdout.lock().unwrap();
let output = output_guard.as_mut().unwrap();
if output.stdout.as_raw_fd() != stdout_fd {
// If stdout file descriptor is changed then it means that walredo process is crashed and restarted.
// As far as ProcessInput and ProcessOutout are protected by different mutexes,
// it can happen that we send request to one process and waiting response from another.
// To prevent such situation we compare stdout file descriptors.
// As far as old stdout pipe is destroyed only after new one is created,
// it can not reuse the same file descriptor, so this check is safe.
if output.restart_no != restart_no {
// If restart_no changed, the walredo process crashed and was restarted
// between dropping the 'input' lock and acquiring 'output'. In that case,
// 'output' belongs to different process than where we sent the request.
//
// Cross-read this with the comment in apply_batch_postgres if result.is_err().
// That's where we kill the child process.

View File

@@ -1,5 +1,6 @@
use futures::future::Either;
use proxy::auth;
use proxy::config::HttpConfig;
use proxy::console;
use proxy::http;
use proxy::metrics;
@@ -79,6 +80,9 @@ struct ProxyCliArgs {
/// Allow self-signed certificates for compute nodes (for testing)
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
allow_self_signed_compute: bool,
/// timeout for http connections
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
sql_over_http_timeout: tokio::time::Duration,
}
#[tokio::main]
@@ -220,12 +224,15 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
auth::BackendType::Link(Cow::Owned(url))
}
};
let http_config = HttpConfig {
sql_over_http_timeout: args.sql_over_http_timeout,
};
let config = Box::leak(Box::new(ProxyConfig {
tls_config,
auth_backend,
metric_collection,
allow_self_signed_compute: args.allow_self_signed_compute,
http_config,
}));
Ok(config)

View File

@@ -13,6 +13,7 @@ pub struct ProxyConfig {
pub auth_backend: auth::BackendType<'static, ()>,
pub metric_collection: Option<MetricCollectionConfig>,
pub allow_self_signed_compute: bool,
pub http_config: HttpConfig,
}
#[derive(Debug)]
@@ -26,6 +27,10 @@ pub struct TlsConfig {
pub common_names: Option<HashSet<String>>,
}
pub struct HttpConfig {
pub sql_over_http_timeout: tokio::time::Duration,
}
impl TlsConfig {
pub fn to_server_config(&self) -> Arc<rustls::ServerConfig> {
self.config.clone()

View File

@@ -20,6 +20,7 @@ use tokio_postgres::AsyncMessage;
use crate::{
auth, console,
metrics::{Ids, MetricCounter, USAGE_METRICS},
proxy::{NUM_DB_CONNECTIONS_CLOSED_COUNTER, NUM_DB_CONNECTIONS_OPENED_COUNTER},
};
use crate::{compute, config};
@@ -418,36 +419,42 @@ async fn connect_to_compute_once(
};
tokio::spawn(
poll_fn(move |cx| {
if matches!(rx.has_changed(), Ok(true)) {
session = *rx.borrow_and_update();
info!(%session, "changed session");
async move {
NUM_DB_CONNECTIONS_OPENED_COUNTER.with_label_values(&["http"]).inc();
scopeguard::defer! {
NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc();
}
poll_fn(move |cx| {
if matches!(rx.has_changed(), Ok(true)) {
session = *rx.borrow_and_update();
info!(%session, "changed session");
}
loop {
let message = ready!(connection.poll_message(cx));
loop {
let message = ready!(connection.poll_message(cx));
match message {
Some(Ok(AsyncMessage::Notice(notice))) => {
info!(%session, "notice: {}", notice);
}
Some(Ok(AsyncMessage::Notification(notif))) => {
warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
}
Some(Ok(_)) => {
warn!(%session, "unknown message");
}
Some(Err(e)) => {
error!(%session, "connection error: {}", e);
return Poll::Ready(())
}
None => {
info!("connection closed");
return Poll::Ready(())
match message {
Some(Ok(AsyncMessage::Notice(notice))) => {
info!(%session, "notice: {}", notice);
}
Some(Ok(AsyncMessage::Notification(notif))) => {
warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
}
Some(Ok(_)) => {
warn!(%session, "unknown message");
}
Some(Err(e)) => {
error!(%session, "connection error: {}", e);
return Poll::Ready(())
}
None => {
info!("connection closed");
return Poll::Ready(())
}
}
}
}
})
}).await
}
.instrument(span)
);

View File

@@ -24,6 +24,9 @@ use url::Url;
use utils::http::error::ApiError;
use utils::http::json::json_response;
use crate::config::HttpConfig;
use crate::proxy::{NUM_CONNECTIONS_ACCEPTED_COUNTER, NUM_CONNECTIONS_CLOSED_COUNTER};
use super::conn_pool::ConnInfo;
use super::conn_pool::GlobalConnPool;
@@ -99,9 +102,9 @@ fn json_array_to_pg_array(value: &Value) -> Result<Option<String>, serde_json::E
// convert to text with escaping
Value::Bool(_) => serde_json::to_string(value).map(Some),
Value::Number(_) => serde_json::to_string(value).map(Some),
Value::Object(_) => serde_json::to_string(value).map(Some),
// here string needs to be escaped, as it is part of the array
Value::Object(_) => json_array_to_pg_array(&Value::String(serde_json::to_string(value)?)),
Value::String(_) => serde_json::to_string(value).map(Some),
// recurse into array
@@ -188,28 +191,46 @@ pub async fn handle(
sni_hostname: Option<String>,
conn_pool: Arc<GlobalConnPool>,
session_id: uuid::Uuid,
config: &'static HttpConfig,
) -> Result<Response<Body>, ApiError> {
let result = handle_inner(request, sni_hostname, conn_pool, session_id).await;
let result = tokio::time::timeout(
config.sql_over_http_timeout,
handle_inner(request, sni_hostname, conn_pool, session_id),
)
.await;
let mut response = match result {
Ok(r) => r,
Err(e) => {
let message = format!("{:?}", e);
let code = match e.downcast_ref::<tokio_postgres::Error>() {
Some(e) => match e.code() {
Some(e) => serde_json::to_value(e.code()).unwrap(),
Ok(r) => match r {
Ok(r) => r,
Err(e) => {
let message = format!("{:?}", e);
let code = e.downcast_ref::<tokio_postgres::Error>().and_then(|e| {
e.code()
.map(|s| serde_json::to_value(s.code()).unwrap_or_default())
});
let code = match code {
Some(c) => c,
None => Value::Null,
},
None => Value::Null,
};
error!(
?code,
"sql-over-http per-client task finished with an error: {e:#}"
};
error!(
?code,
"sql-over-http per-client task finished with an error: {e:#}"
);
// TODO: this shouldn't always be bad request.
json_response(
StatusCode::BAD_REQUEST,
json!({ "message": message, "code": code }),
)?
}
},
Err(_) => {
let message = format!(
"HTTP-Connection timed out, execution time exeeded {} seconds",
config.sql_over_http_timeout.as_secs()
);
// TODO: this shouldn't always be bad request.
error!(message);
json_response(
StatusCode::BAD_REQUEST,
json!({ "message": message, "code": code }),
StatusCode::GATEWAY_TIMEOUT,
json!({ "message": message, "code": StatusCode::GATEWAY_TIMEOUT.as_u16() }),
)?
}
};
@@ -227,6 +248,13 @@ async fn handle_inner(
conn_pool: Arc<GlobalConnPool>,
session_id: uuid::Uuid,
) -> anyhow::Result<Response<Body>> {
NUM_CONNECTIONS_ACCEPTED_COUNTER
.with_label_values(&["http"])
.inc();
scopeguard::defer! {
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc();
}
//
// Determine the destination and connection params
//
@@ -585,7 +613,7 @@ fn _pg_array_parse(
}
}
}
'}' => {
'}' if !quote => {
level -= 1;
if level == 0 {
push_checked(&mut entry, &mut entries, elem_type)?;
@@ -669,6 +697,14 @@ mod tests {
"{{true,false},{NULL,42},{\"foo\",\"bar\\\"-\\\\\"}}".to_owned()
)]
);
// array of objects
let json = r#"[{"foo": 1},{"bar": 2}]"#;
let json: Value = serde_json::from_str(json).unwrap();
let pg_params = json_to_pg_text(vec![json]).unwrap();
assert_eq!(
pg_params,
vec![Some(r#"{"{\"foo\":1}","{\"bar\":2}"}"#.to_owned())]
);
}
#[test]
@@ -796,4 +832,23 @@ mod tests {
json!([[[1, 2, 3], [4, 5, 6]]])
);
}
#[test]
fn test_pg_array_parse_json() {
fn pt(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::JSONB).unwrap()
}
assert_eq!(pt(r#"{"{}"}"#), json!([{}]));
assert_eq!(
pt(r#"{"{\"foo\": 1, \"bar\": 2}"}"#),
json!([{"foo": 1, "bar": 2}])
);
assert_eq!(
pt(r#"{"{\"foo\": 1}", "{\"bar\": 2}"}"#),
json!([{"foo": 1}, {"bar": 2}])
);
assert_eq!(
pt(r#"{{"{\"foo\": 1}", "{\"bar\": 2}"}}"#),
json!([[{"foo": 1}, {"bar": 2}]])
);
}
}

View File

@@ -3,7 +3,10 @@ use crate::{
config::ProxyConfig,
error::io_error,
protocol2::{ProxyProtocolAccept, WithClientIp},
proxy::{handle_client, ClientMode},
proxy::{
handle_client, ClientMode, NUM_CLIENT_CONNECTION_CLOSED_COUNTER,
NUM_CLIENT_CONNECTION_OPENED_COUNTER,
},
};
use bytes::{Buf, Bytes};
use futures::{Sink, Stream, StreamExt};
@@ -202,7 +205,14 @@ async fn ws_handler(
// TODO: that deserves a refactor as now this function also handles http json client besides websockets.
// Right now I don't want to blow up sql-over-http patch with file renames and do that as a follow up instead.
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
sql_over_http::handle(request, sni_hostname, conn_pool, session_id).await
sql_over_http::handle(
request,
sni_hostname,
conn_pool,
session_id,
&config.http_config,
)
.await
} else if request.uri().path() == "/sql" && request.method() == Method::OPTIONS {
Response::builder()
.header("Allow", "OPTIONS, POST")
@@ -275,23 +285,25 @@ pub async fn task_main(
let conn_pool = conn_pool.clone();
async move {
Ok::<_, Infallible>(hyper::service::service_fn(move |req: Request<Body>| {
let sni_name = sni_name.clone();
let conn_pool = conn_pool.clone();
Ok::<_, Infallible>(MetricService::new(hyper::service::service_fn(
move |req: Request<Body>| {
let sni_name = sni_name.clone();
let conn_pool = conn_pool.clone();
async move {
let cancel_map = Arc::new(CancelMap::default());
let session_id = uuid::Uuid::new_v4();
async move {
let cancel_map = Arc::new(CancelMap::default());
let session_id = uuid::Uuid::new_v4();
ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
.instrument(info_span!(
"ws-client",
session = %session_id,
%peer_addr,
))
.await
}
}))
ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
.instrument(info_span!(
"ws-client",
session = %session_id,
%peer_addr,
))
.await
}
},
)))
}
},
);
@@ -303,3 +315,41 @@ pub async fn task_main(
Ok(())
}
struct MetricService<S> {
inner: S,
}
impl<S> MetricService<S> {
fn new(inner: S) -> MetricService<S> {
NUM_CLIENT_CONNECTION_OPENED_COUNTER
.with_label_values(&["http"])
.inc();
MetricService { inner }
}
}
impl<S> Drop for MetricService<S> {
fn drop(&mut self) {
NUM_CLIENT_CONNECTION_CLOSED_COUNTER
.with_label_values(&["http"])
.inc();
}
}
impl<S, ReqBody> hyper::service::Service<Request<ReqBody>> for MetricService<S>
where
S: hyper::service::Service<Request<ReqBody>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
self.inner.call(req)
}
}

View File

@@ -7,6 +7,7 @@ use crate::{
compute::{self, PostgresConnection},
config::{ProxyConfig, TlsConfig},
console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api},
http::StatusCode,
metrics::{Ids, USAGE_METRICS},
protocol2::WithClientIp,
stream::{PqStream, Stream},
@@ -38,19 +39,55 @@ const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2;
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
const ERR_PROTO_VIOLATION: &str = "protocol violation";
static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
pub static NUM_DB_CONNECTIONS_OPENED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_accepted_connections_total",
"Number of TCP client connections accepted.",
"proxy_opened_db_connections_total",
"Number of opened connections to a database.",
&["protocol"],
)
.unwrap()
});
static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
pub static NUM_DB_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_closed_db_connections_total",
"Number of closed connections to a database.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CLIENT_CONNECTION_OPENED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_opened_client_connections_total",
"Number of opened connections from a client.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CLIENT_CONNECTION_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_closed_client_connections_total",
"Number of closed connections from a client.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_accepted_connections_total",
"Number of client connections accepted.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_closed_connections_total",
"Number of TCP client connections closed.",
"Number of client connections closed.",
&["protocol"],
)
.unwrap()
@@ -75,6 +112,15 @@ static NUM_CONNECTION_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});
static NUM_WAKEUP_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_connection_failures_breakdown",
"Number of wake-up failures (per kind).",
&["retry", "kind"],
)
.unwrap()
});
static NUM_BYTES_PROXIED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_io_bytes_per_client",
@@ -208,12 +254,16 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
"handling interactive connection from client"
);
// The `closed` counter will increase when this future is destroyed.
let proto = mode.protocol_label();
NUM_CLIENT_CONNECTION_OPENED_COUNTER
.with_label_values(&[proto])
.inc();
NUM_CONNECTIONS_ACCEPTED_COUNTER
.with_label_values(&[mode.protocol_label()])
.with_label_values(&[proto])
.inc();
scopeguard::defer! {
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[mode.protocol_label()]).inc();
NUM_CLIENT_CONNECTION_CLOSED_COUNTER.with_label_values(&[proto]).inc();
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc();
}
let tls = config.tls_config.as_ref();
@@ -248,7 +298,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
mode.allow_self_signed_compute(config),
);
cancel_map
.with_session(|session| client.connect_to_db(session, mode.allow_cleartext()))
.with_session(|session| client.connect_to_db(session, mode))
.await
}
@@ -397,6 +447,46 @@ impl ConnectMechanism for TcpMechanism<'_> {
}
}
const fn bool_to_str(x: bool) -> &'static str {
if x {
"true"
} else {
"false"
}
}
fn report_error(e: &WakeComputeError, retry: bool) {
use crate::console::errors::ApiError;
let retry = bool_to_str(retry);
let kind = match e {
WakeComputeError::BadComputeAddress(_) => "bad_compute_address",
WakeComputeError::ApiError(ApiError::Transport(_)) => "api_transport_error",
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::LOCKED,
ref text,
}) if text.contains("written data quota exceeded")
|| text.contains("the limit for current plan reached") =>
{
"quota_exceeded"
}
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::LOCKED,
..
}) => "api_console_locked",
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::BAD_REQUEST,
..
}) => "api_console_bad_request",
WakeComputeError::ApiError(ApiError::Console { status, .. })
if status.is_server_error() =>
{
"api_console_other_server_error"
}
WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error",
};
NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc();
}
/// Try to connect to the compute node, retrying if necessary.
/// This function might update `node_info`, so we take it by `&mut`.
#[tracing::instrument(skip_all)]
@@ -440,10 +530,12 @@ where
match handle_try_wake(wake_res, num_retries) {
Err(e) => {
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
report_error(&e, false);
return Err(e.into());
}
// failed to wake up but we can continue to retry
Ok(ControlFlow::Continue(e)) => {
report_error(&e, true);
warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
}
// successfully woke up a compute node and can break the wakeup loop
@@ -682,7 +774,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
async fn connect_to_db(
self,
session: cancellation::Session<'_>,
allow_cleartext: bool,
mode: ClientMode,
) -> anyhow::Result<()> {
let Self {
mut stream,
@@ -698,7 +790,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
};
let auth_result = match creds
.authenticate(&extra, &mut stream, allow_cleartext)
.authenticate(&extra, &mut stream, mode.allow_cleartext())
.await
{
Ok(auth_result) => auth_result,
@@ -724,6 +816,14 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
.or_else(|e| stream.throw_error(e))
.await?;
let proto = mode.protocol_label();
NUM_DB_CONNECTIONS_OPENED_COUNTER
.with_label_values(&[proto])
.inc();
scopeguard::defer! {
NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc();
}
prepare_client_connection(&node, reported_auth_ok, session, &mut stream).await?;
// Before proxy passing, forward to compute whatever data is left in the
// PqStream input buffer. Normally there is none, but our serverless npm

View File

@@ -374,8 +374,12 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
if conf.http_auth.is_some() {
router = router.middleware(auth_middleware(|request| {
#[allow(clippy::mutable_key_type)]
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> =
Lazy::new(|| ["/v1/status"].iter().map(|v| v.parse().unwrap()).collect());
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> = Lazy::new(|| {
["/v1/status", "/metrics"]
.iter()
.map(|v| v.parse().unwrap())
.collect()
});
if ALLOWLIST_ROUTES.contains(request.uri()) {
None
} else {

View File

@@ -65,7 +65,7 @@ def start_heavy_write_workload(env: PgCompare, n_tables: int, scale: int, num_it
def start_single_table_workload(table_id: int):
for _ in range(num_iters):
with env.pg.connect().cursor() as cur:
with env.pg.connect(options="-cstatement_timeout=300s").cursor() as cur:
cur.execute(
f"INSERT INTO t{table_id} SELECT FROM generate_series(1,{new_rows_each_update})"
)

View File

@@ -188,7 +188,7 @@ def test_sql_over_http(static_proxy: NeonProxy):
headers={"Content-Type": "application/sql", "Neon-Connection-String": connstr},
verify=str(static_proxy.test_output_dir / "proxy.crt"),
)
assert response.status_code == 200
assert response.status_code == 200, response.text
return response.json()
rows = q("select 42 as answer")["rows"]
@@ -206,6 +206,12 @@ def test_sql_over_http(static_proxy: NeonProxy):
rows = q("select $1::json->'a' as answer", [{"a": {"b": 42}}])["rows"]
assert rows == [{"answer": {"b": 42}}]
rows = q("select $1::jsonb[] as answer", [[{}]])["rows"]
assert rows == [{"answer": [{}]}]
rows = q("select $1::jsonb[] as answer", [[{"foo": 1}, {"bar": 2}]])["rows"]
assert rows == [{"answer": [{"foo": 1}, {"bar": 2}]}]
rows = q("select * from pg_class limit 1")["rows"]
assert len(rows) == 1

View File

@@ -6,6 +6,7 @@ from pathlib import Path
from typing import List, Optional
import asyncpg
import pytest
import toml
from fixtures.log_helper import getLogger
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, Safekeeper
@@ -597,7 +598,10 @@ async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint, test_output_dir: Pat
assert res == expected_sum
# do inserts while restarting postgres and messing with safekeeper addresses
# Do inserts while restarting postgres and messing with safekeeper addresses.
# The test takes more than default 5 minutes on Postgres 16,
# see https://github.com/neondatabase/neon/issues/5305
@pytest.mark.timeout(600)
def test_wal_lagging(neon_env_builder: NeonEnvBuilder, test_output_dir: Path):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()