Compare commits

19 Commits

Author SHA1 Message Date
John Spray
ed3e3b6f61 pageserver: enable setting a target disk range 2023-10-25 14:39:12 +01:00
John Spray
098ef0956b pageserver: publish disk eviction status 2023-10-25 14:35:32 +01:00
John Spray
127837abb0 tests: de-flake test_eviction_across_generations (#5650)
## Problem

There was an edge case where the initial logical size calculation could be
downloading a layer that wasn't hit by the test's `SELECT`: the layer is
then on disk but still marked as remote in the pageserver's internal state,
so evicting it fails.


https://neon-github-public-dev.s3.amazonaws.com/reports/pr-5648/6630099807/index.html#categories/dee044ec96f666edb90a77c01099a941/e38e97a2735ffa8c/

## Summary of changes

Use the pageserver API to learn about layers, instead of inspecting the
local disk, so that we always agree with the pageserver about which layers
are local.
2023-10-25 10:55:45 +01:00
Conrad Ludgate
b2c96047d0 move wake compute after the auth quirks logic (#5642)
## Problem

https://github.com/neondatabase/neon/issues/5568#issuecomment-1777015606

## Summary of changes

Make `auth_quirks_creds` return the authentication information, and move
the `wake_compute` loop to run afterwards, inside `auth_quirks`.
2023-10-25 08:30:47 +01:00
Em Sharnoff
44202eeb3b Bump vm-builder v0.18.1 -> v0.18.2 (#5646)
The only applicable change was neondatabase/autoscaling#571, removing the
postgres_exporter flags `--auto-discover-databases` and
`--exclude-databases=...`.
2023-10-24 16:04:28 -07:00
Arpad Müller
4bef977c56 Use tuples instead of manual comparison chain (#5637)
Makes the code a little simpler.
2023-10-24 17:16:23 +00:00
John Spray
a0b862a8bd pageserver: schedule frozen layer uploads inside the layers lock (#5639)
## Problem

Compaction's source of truth for which layers exist is the LayerManager.
`flush_frozen_layer` updates the LayerManager before it has scheduled the
upload of the frozen layer.

Compaction can then "see" the new layer, decide to delete it, and schedule
uploads of replacement layers, all before `flush_frozen_layer` wakes up
again and schedules the upload. By the time the upload is scheduled, the
local layer file may be gone, in which case we end up with no such layer in
remote storage, but an entry still added to IndexPart pointing to the
missing layer.

## Summary of changes

Schedule layer uploads inside the `self.layers` lock, so that whenever a
frozen layer is present in LayerManager, it is also present in
RemoteTimelineClient's metadata.

Closes: #5635
2023-10-24 13:57:01 +01:00
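
For intuition, a minimal sketch of the invariant this establishes, with toy `LayerMap`/`UploadQueue` types standing in for the real LayerManager and RemoteTimelineClient (assumes tokio):

```
use std::collections::HashSet;
use tokio::sync::Mutex;

#[derive(Default)]
struct LayerMap {
    layers: HashSet<String>,
}

#[derive(Default)]
struct UploadQueue {
    scheduled: Vec<String>,
}

struct Timeline {
    // One lock guards both views, mirroring "inside the `self.layers` lock".
    layers: Mutex<(LayerMap, UploadQueue)>,
}

impl Timeline {
    async fn finish_flush_frozen_layer(&self, name: &str) {
        let mut guard = self.layers.lock().await;
        // Both mutations happen before the guard drops: compaction taking the
        // same lock sees either neither the layer nor its upload, or both.
        guard.0.layers.insert(name.to_string());
        guard.1.scheduled.push(name.to_string());
    }
}

#[tokio::main]
async fn main() {
    let tl = Timeline {
        layers: Mutex::new(Default::default()),
    };
    tl.finish_flush_frozen_layer("frozen-delta-layer").await;
    let guard = tl.layers.lock().await;
    assert_eq!(guard.0.layers.len(), guard.1.scheduled.len());
}
```
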
Conrad Ludgate
767ef29390 proxy: filter out more quota exceeded errors (#5640)
## Problem

Looking at the logs, I saw retries being performed for other quota-exceeded
errors as well.

## Summary of changes

Filter out the whole family of quota-exceeded errors.
2023-10-24 13:13:23 +01:00
Alexander Bayandin
a8a800af51 Run real Azure tests on CI (#5627)
## Problem
We do not run real Azure-related tests on CI.

## Summary of changes
- Set required env variables to run real Azure blob storage tests on CI
2023-10-24 12:12:11 +01:00
Arpad Müller
1e250cd90a Cleanup in azure_upload_download_works test (#5636)
The `azure_upload_download_works` test was not cleaning up after itself,
leaving behind the files it uploads. I found these files when looking at
the contents of the bucket in #5627.

We now delete the uploaded file at the end of the test, as the other tests
already do.

Follow-up of #5546
2023-10-23 19:08:56 +01:00
John Spray
eaaa18f6ed attachment_service: graceful SIGQUIT (#5626)
`attachment_service` doesn't explicitly handle signals, which causes a
backtrace when `neon_local` kills it with SIGQUIT.

Closes: https://github.com/neondatabase/neon/issues/5613
2023-10-23 17:30:25 +01:00
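
A minimal sketch of explicit signal handling on a tokio runtime (assumes tokio's `signal` feature; the real code uses the `utils::signals::ShutdownSignals` helper visible in the diff below):

```
use tokio::signal::unix::{signal, SignalKind};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Install handlers up front so SIGQUIT no longer hits the default
    // disposition that produces the backtrace.
    let mut quit = signal(SignalKind::quit())?;
    let mut term = signal(SignalKind::terminate())?;
    let mut int = signal(SignalKind::interrupt())?;

    tokio::select! {
        _ = quit.recv() => println!("Got SIGQUIT. Terminating"),
        _ = term.recv() => println!("Got SIGTERM. Terminating"),
        _ = int.recv() => println!("Got SIGINT. Terminating"),
    }
    // We're just a test helper: no graceful shutdown.
    std::process::exit(0);
}
```
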
John Spray
188f67e1df pageserver: forward compat: be tolerant of deletion marker in timelines/ (#5632)
## Problem

https://github.com/neondatabase/neon/pull/5580 will move the remote
deletion marker into the `timelines/` path.

This would cause old pageserver code to fail to load the tenant due to an
apparently invalid timeline ID, which would be a problem if we had to roll
back after deploying #5580.

## Summary of changes

If a `deleted` file is in `timelines/` just ignore it.
2023-10-23 17:51:38 +02:00
John Spray
7e805200bb pageserver: parallel load of configs (#5607)
## Problem

When the number of tenants is large, sequentially issuing the open/read
calls for their config files adds a ~1000ms delay during startup. That's
not a lot, but it's simple to fix.

## Summary of changes

Put all the config loads into spawn_blocking() tasks and run them in a
JoinSet. We can simplify this a bit later when we have full async disk
I/O.

---------

Co-authored-by: Shany Pozin <shany@neon.tech>
2023-10-23 15:32:34 +01:00
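
A sketch of the pattern, assuming tokio and anyhow (paths and the on-disk format are illustrative):

```
use tokio::task::JoinSet;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let paths: Vec<std::path::PathBuf> = (0..4)
        .map(|i| format!("/tmp/tenant-{i}/config").into())
        .collect();

    let mut join_set = JoinSet::new();
    for path in paths {
        // Each blocking open/read runs on tokio's blocking thread pool
        // instead of serially on the startup task.
        join_set.spawn_blocking(move || (path.clone(), std::fs::read_to_string(&path)));
    }

    while let Some(res) = join_set.join_next().await {
        let (path, contents) = res?; // propagate task panics as errors
        match contents {
            Ok(c) => println!("{}: {} bytes", path.display(), c.len()),
            Err(e) => eprintln!("{}: {e}", path.display()),
        }
    }
    Ok(())
}
```
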
Christian Schwarz
c6ca1d76d2 consumption_metrics: fix periodicity behavior & reporting (#5625)
Before this PR, the ticker was running with the default miss behavior,
`Delay`. For example, here is the startup output with 25k tenants:

```
2023-10-19T09:57:21.682466Z  INFO synthetic_size_worker: starting calculate_synthetic_size_worker
2023-10-19T10:50:44.678202Z  WARN synthetic_size_worker: task iteration took longer than the configured period elapsed=3202.995707156s period=10m task=ConsumptionMetricsSyntheticSizeWorker
2023-10-19T10:52:17.408056Z  WARN synthetic_size_worker: task iteration took longer than the configured period elapsed=2695.72556035s period=10m task=ConsumptionMetricsSyntheticSizeWorker
```

The first message's `elapsed` value is correct: it matches the delta
between the log line timestamps.

The second one, however, is logged only about 1.5 min later, yet reports an
`elapsed` much larger than 1.5 min.
This PR fixes the behavior by copying what `eviction_task.rs` does.
2023-10-23 16:31:38 +02:00
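
A sketch of the fixed loop shape, assuming tokio (the real code also races a cancellation token via `timeout_at`; this sketch uses a plain `sleep_until`): `elapsed` is measured from an `Instant` captured at the start of the iteration, not from whatever deadline a delayed ticker hands back.

```
use std::time::Duration;
use tokio::time::Instant;

async fn do_work() {
    tokio::time::sleep(Duration::from_millis(100)).await;
}

#[tokio::main]
async fn main() {
    let period = Duration::from_secs(1);
    for _ in 0..3 {
        let started_at = Instant::now();

        do_work().await;

        // Warn based on this iteration's own runtime, never on a stale
        // ticker deadline.
        let elapsed = started_at.elapsed();
        if elapsed > period {
            println!("iteration took longer than period: {elapsed:?}");
        }
        // Sleep out the remainder of the period (no-op if we overran it).
        tokio::time::sleep_until(started_at + period).await;
    }
}
```
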
Conrad Ludgate
94b4e76e13 proxy: latency connect outcome (#5588)
## Problem

I recently updated the latency timers to include cache misses and pool
misses, as well as the connection protocol. By moving the latency timer to
start before authentication, we now count many more failures, which has
messed up the latency dashboard.

## Summary of changes

Add another label to the LatencyTimer metrics for the outcome, and
explicitly report it on success.
2023-10-23 15:17:28 +01:00
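
As a sketch of the idea only, using the `prometheus` crate with made-up metric and label names (the real LatencyTimer plumbing differs):

```
use prometheus::{register_histogram_vec, HistogramVec};

fn main() {
    let latency: HistogramVec = register_histogram_vec!(
        "proxy_connect_latency_seconds", // hypothetical metric name
        "Time to establish a client connection",
        &["protocol", "cache_miss", "outcome"]
    )
    .unwrap();

    // Successes and failures land in separate series, so failed handshakes
    // no longer skew the success latency dashboard.
    latency
        .with_label_values(&["tcp", "false", "success"])
        .observe(0.042);
    latency
        .with_label_values(&["tcp", "false", "failed"])
        .observe(1.7);
}
```
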
khanova
b514da90cb Set up timeout for scram protocol execution (#5551)
## Problem
Context:
https://github.com/neondatabase/neon/issues/5511#issuecomment-1759649679

Some of our SCRAM protocol executions timed out only after 17 minutes.
## Summary of changes
Make the timeout for SCRAM execution meaningful and configurable.
2023-10-23 15:11:05 +01:00
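
A sketch of the mechanism, assuming tokio; the `authenticate()` future and the 15-second value are placeholders for the real SCRAM exchange and its now-configurable timeout:

```
use std::io::{Error, ErrorKind};
use std::time::Duration;

async fn authenticate() -> Result<(), Error> {
    // placeholder for the SCRAM message exchange with the client
    tokio::time::sleep(Duration::from_millis(10)).await;
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let scram_protocol_timeout = Duration::from_secs(15); // assumed default
    match tokio::time::timeout(scram_protocol_timeout, authenticate()).await {
        Ok(res) => res, // the exchange finished in time; use its result
        Err(_elapsed) => Err(Error::new(
            ErrorKind::TimedOut,
            "authentication timed out",
        )),
    }
}
```
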
Conrad Ludgate
7d17f1719f reduce cancel map contention (#5555)
## Problem

Every database request locks this cancel map rwlock. At high requests
per second, this lock would see high contention.

## Summary of changes

Switch to `dashmap`, which uses a sharded rwlock internally, to reduce contention.
2023-10-23 14:12:41 +01:00
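
A sketch of the switch, assuming the `dashmap` crate; key and value types are simplified stand-ins for the real session-cancellation entries:

```
use dashmap::DashMap;

fn main() {
    // Previously: a single std::sync::RwLock<HashMap<...>> that every
    // request had to lock. DashMap shards the map across many rwlocks.
    let cancel_map: DashMap<u64, String> = DashMap::new();

    cancel_map.insert(1, "cancel-closure-for-session-1".into());

    // Readers only lock the shard that holds the key.
    if let Some(entry) = cancel_map.get(&1) {
        println!("session 1 -> {}", entry.value());
    }

    cancel_map.remove(&1);
}
```
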
John Spray
41ee75bc71 pageserver: do config writes in a spawn_blocking (#5603)
## Problem

We now persist tenant configuration every time we spawn a tenant. The
persist_tenant_config function performs a series of non-async filesystem
I/O calls, because `crashsafe::` isn't async yet. This isn't a demonstrated
problem, but it is a source of uncertainty when reasoning about what's
happening with our startup times.

## Summary of changes

- Wrap `crashsafe_overwrite` in `spawn_blocking`.
- Although I think this change makes sense, it does not have a
measurable impact on load time when testing with 10k tenants.
- This can be reverted when we have full async I/O
2023-10-23 09:19:01 +01:00
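
A sketch of the pattern, assuming tokio; `crashsafe_overwrite` below is a simplified synchronous stand-in (temp file, fsync, rename) for the real async VirtualFile helper, which is why the actual change additionally uses `Handle::current().block_on` inside the blocking task:

```
use std::io::Write;
use std::path::{Path, PathBuf};

fn crashsafe_overwrite(path: &Path, content: &[u8]) -> std::io::Result<()> {
    let tmp = path.with_extension("___temp");
    let mut f = std::fs::File::create(&tmp)?;
    f.write_all(content)?;
    f.sync_all()?; // fsync the data before it becomes visible under `path`
    std::fs::rename(&tmp, path)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let config_path = PathBuf::from("/tmp/tenant-config.toml");
    let conf_content = b"[tenant]\n".to_vec();
    // The filesystem I/O runs on tokio's blocking pool rather than
    // stalling an executor thread.
    tokio::task::spawn_blocking(move || crashsafe_overwrite(&config_path, &conf_content))
        .await??;
    Ok(())
}
```
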
Christian Schwarz
11e523f503 walredo: fix EAGAIN/"os error 11" false page reconstruction failures (#5560)
Stacked atop https://github.com/neondatabase/neon/pull/5559

Before this PR, there was the following race condition:

```
T1: polls for writeable stdin
T1: writes to stdin
T1: enters poll for stdout/stderr
T2: enters poll for stdin write
WALREDO: writes to stderr
KERNEL: wakes up T1 and T2
Tx: reads stderr and prints it
Ty: reads stderr and gets EAGAIN
(valid values for (x, y) are (1, 2) or (2, 1))
```

The concrete symptom we observed repeatedly was with PG16, which now
always logs `registered custom resource manager` to stderr during startup,
giving us repeated opportunities to hit the race condition above. PG14 and
PG15 didn't log anything to stderr, so we could only have hit this race
condition if an actual error occurred.

This PR fixes the race by moving the reading of stderr into a tokio
task. It exits when the stderr is closed by the child process, which
in turn happens when the child exits, either by itself or because
we killed it.

The downside is that the async scheduling can reorder the log messages,
as can be seen in the new `test_stderr`, which runs in a single-threaded
runtime. I included the output below.

Overall I think we should move the entire walredo to async, as Joonas
proposed many months ago. This PR's asyncification is just the first step
to resolve these false page reconstruction errors.

After this is fixed, we should stop printing that annoying stderr message
on walredo startup; it causes noise in the pageserver logs. That work is
tracked in #5399.

```
2023-10-13T19:05:21.878858Z ERROR apply_wal_records{tenant_id=d546fb76ba529195392fb4d19e243991 pid=753986}: failed to write out the walredo errored input: No such file or directory (os error 2) target=walredo-1697223921878-1132-0.walredo length=1132
2023-10-13T19:05:21.878932Z DEBUG postgres applied 2 WAL records (1062 bytes) in 114666 us to reconstruct page image at LSN 0/0
2023-10-13T19:05:21.878942Z ERROR error applying 2 WAL records 0/16A9388..0/16D4080 (1062 bytes) to base image with LSN 0/0 to reconstruct page image at LSN 0/0 n_attempts=0: apply_wal_records

Caused by:
    WAL redo process closed its stdout unexpectedly
2023-10-13T19:05:21.879027Z  INFO kill_and_wait_impl{pid=753986}: wait successful exit_status=signal: 11 (SIGSEGV) (core dumped)
2023-10-13T19:05:21.879079Z DEBUG wal-redo-postgres-stderr{pid=753986 tenant_id=d546fb76ba529195392fb4d19e243991 pg_version=16}: wal-redo-postgres stderr_logger_task started
2023-10-13T19:05:21.879104Z ERROR wal-redo-postgres-stderr{pid=753986 tenant_id=d546fb76ba529195392fb4d19e243991 pg_version=16}: received output output="2023-10-13 19:05:21.769 GMT [753986] LOG:  registered custom resource manager \"neon\" with ID 134\n"
2023-10-13T19:05:21.879116Z DEBUG wal-redo-postgres-stderr{pid=753986 tenant_id=d546fb76ba529195392fb4d19e243991 pg_version=16}: wal-redo-postgres stderr_logger_task finished
2023-10-13T19:05:22.004439Z ERROR apply_wal_records{tenant_id=d546fb76ba529195392fb4d19e243991 pid=754000}: failed to write out the walredo errored input: No such file or directory (os error 2) target=walredo-1697223922004-1132-0.walredo length=1132
2023-10-13T19:05:22.004493Z DEBUG postgres applied 2 WAL records (1062 bytes) in 125344 us to reconstruct page image at LSN 0/0
2023-10-13T19:05:22.004501Z ERROR error applying 2 WAL records 0/16A9388..0/16D4080 (1062 bytes) to base image with LSN 0/0 to reconstruct page image at LSN 0/0 n_attempts=1: apply_wal_records

Caused by:
    WAL redo process closed its stdout unexpectedly
2023-10-13T19:05:22.004588Z  INFO kill_and_wait_impl{pid=754000}: wait successful exit_status=signal: 11 (SIGSEGV) (core dumped)
2023-10-13T19:05:22.004624Z DEBUG wal-redo-postgres-stderr{pid=754000 tenant_id=d546fb76ba529195392fb4d19e243991 pg_version=16}: wal-redo-postgres stderr_logger_task started
2023-10-13T19:05:22.004653Z ERROR wal-redo-postgres-stderr{pid=754000 tenant_id=d546fb76ba529195392fb4d19e243991 pg_version=16}: received output output="2023-10-13 19:05:21.884 GMT [754000] LOG:  registered custom resource manager \"neon\" with ID 134\n"
2023-10-13T19:05:22.004666Z DEBUG wal-redo-postgres-stderr{pid=754000 tenant_id=d546fb76ba529195392fb4d19e243991 pg_version=16}: wal-redo-postgres stderr_logger_task finished
```
2023-10-23 09:00:13 +01:00
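
A sketch of the fix's shape using tokio's process API (the real code wraps a `std::process` child's stderr with `AsyncFd` instead): a dedicated task owns stderr and forwards it to the log, so the stdin/stdout polling loops never touch it.

```
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut child = Command::new("sh")
        .args(["-c", "echo 'registered custom resource manager' >&2"])
        .stderr(std::process::Stdio::piped())
        .spawn()?;

    let stderr = child.stderr.take().expect("stderr was piped");
    let stderr_task = tokio::spawn(async move {
        let mut lines = BufReader::new(stderr).lines();
        // Returns None when the pipe closes, i.e. when the child exits
        // by itself or because we killed it.
        while let Ok(Some(line)) = lines.next_line().await {
            eprintln!("received output: {line}");
        }
    });

    child.wait().await?;
    stderr_task.await.expect("stderr task panicked");
    Ok(())
}
```
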
26 changed files with 683 additions and 374 deletions

View File

@@ -338,6 +338,16 @@ jobs:
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
${cov_prefix} cargo test $CARGO_FLAGS --package remote_storage --test test_real_s3
# Run separate tests for real Azure Blob Storage
# XXX: replace region with `eu-central-1`-like region
export AZURE_STORAGE_ACCOUNT="${{ secrets.AZURE_STORAGE_ACCOUNT_DEV }}"
export AZURE_STORAGE_ACCESS_KEY="${{ secrets.AZURE_STORAGE_ACCESS_KEY_DEV }}"
export ENABLE_REAL_AZURE_REMOTE_STORAGE=y
export REMOTE_STORAGE_AZURE_CONTAINER=neon-github-sandbox
export REMOTE_STORAGE_AZURE_REGION=eastus2
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
${cov_prefix} cargo test $CARGO_FLAGS --package remote_storage --test test_real_azure
- name: Install rust binaries
run: |
# Install target binaries
@@ -837,7 +847,7 @@ jobs:
run:
shell: sh -eu {0}
env:
VM_BUILDER_VERSION: v0.18.1
VM_BUILDER_VERSION: v0.18.2
steps:
- name: Checkout

View File

@@ -13,6 +13,7 @@ use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::{collections::HashMap, sync::Arc};
use utils::logging::{self, LogFormat};
use utils::signals::{ShutdownSignals, Signal};
use utils::{
http::{
@@ -268,7 +269,16 @@ async fn main() -> anyhow::Result<()> {
let server = hyper::Server::from_tcp(http_listener)?.serve(service);
tracing::info!("Serving on {0}", args.listen);
server.await?;
tokio::task::spawn(server);
ShutdownSignals::handle(|signal| match signal {
Signal::Interrupt | Signal::Terminate | Signal::Quit => {
tracing::info!("Got {}. Terminating", signal.name());
// We're just a test helper: no graceful shutdown.
std::process::exit(0);
}
})?;
Ok(())
}

View File

@@ -22,9 +22,9 @@ use postgres_ffi::Oid;
/// [See more related comments here](https://github.com/postgres/postgres/blob/99c5852e20a0987eca1c38ba0c09329d4076b6a0/src/include/storage/relfilenode.h#L57).
///
// FIXME: should move 'forknum' as last field to keep this consistent with Postgres.
// Then we could replace the custo Ord and PartialOrd implementations below with
// deriving them.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
// Then we could replace the custom Ord and PartialOrd implementations below with
// deriving them. This will require changes in walredoproc.c.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize)]
pub struct RelTag {
pub forknum: u8,
pub spcnode: Oid,
@@ -40,21 +40,9 @@ impl PartialOrd for RelTag {
impl Ord for RelTag {
fn cmp(&self, other: &Self) -> Ordering {
let mut cmp = self.spcnode.cmp(&other.spcnode);
if cmp != Ordering::Equal {
return cmp;
}
cmp = self.dbnode.cmp(&other.dbnode);
if cmp != Ordering::Equal {
return cmp;
}
cmp = self.relnode.cmp(&other.relnode);
if cmp != Ordering::Equal {
return cmp;
}
cmp = self.forknum.cmp(&other.forknum);
cmp
// Custom ordering where we put forknum to the end of the list
let other_tup = (other.spcnode, other.dbnode, other.relnode, other.forknum);
(self.spcnode, self.dbnode, self.relnode, self.forknum).cmp(&other_tup)
}
}

View File

@@ -267,6 +267,12 @@ async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Res
let buf = download_and_compare(dl).await?;
assert_eq!(buf, data);
debug!("Cleanup: deleting file at path {path:?}");
ctx.client
.delete(&path)
.await
.with_context(|| format!("{path:?} removal"))?;
Ok(())
}

View File

@@ -1479,6 +1479,8 @@ threshold = "20m"
Some(DiskUsageEvictionTaskConfig {
max_usage_pct: Percent::new(80).unwrap(),
min_avail_bytes: 0,
target_avail_bytes: None,
target_usage_pct: None,
period: Duration::from_secs(10),
#[cfg(feature = "testing")]
mock_statvfs: None,

View File

@@ -11,6 +11,7 @@ use reqwest::Url;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::time::Instant;
use tracing::*;
use utils::id::NodeId;
@@ -88,22 +89,12 @@ pub async fn collect_metrics(
let node_id = node_id.to_string();
// reminder: ticker is ready immediately
let mut ticker = tokio::time::interval(metric_collection_interval);
loop {
let tick_at = tokio::select! {
_ = cancel.cancelled() => return Ok(()),
tick_at = ticker.tick() => tick_at,
};
let started_at = Instant::now();
// these are point in time, with variable "now"
let metrics = metrics::collect_all_metrics(&cached_metrics, &ctx).await;
if metrics.is_empty() {
continue;
}
let metrics = Arc::new(metrics);
// why not race cancellation here? because we are one of the last tasks, and if we are
@@ -142,10 +133,19 @@ pub async fn collect_metrics(
let (_, _) = tokio::join!(flush, upload);
crate::tenant::tasks::warn_when_period_overrun(
tick_at.elapsed(),
started_at.elapsed(),
metric_collection_interval,
BackgroundLoopKind::ConsumptionMetricsCollectMetrics,
);
let res = tokio::time::timeout_at(
started_at + metric_collection_interval,
task_mgr::shutdown_token().cancelled(),
)
.await;
if res.is_ok() {
return Ok(());
}
}
}
@@ -244,16 +244,14 @@ async fn calculate_synthetic_size_worker(
ctx: &RequestContext,
) -> anyhow::Result<()> {
info!("starting calculate_synthetic_size_worker");
scopeguard::defer! {
info!("calculate_synthetic_size_worker stopped");
};
// reminder: ticker is ready immediately
let mut ticker = tokio::time::interval(synthetic_size_calculation_interval);
let cause = LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;
loop {
let tick_at = tokio::select! {
_ = task_mgr::shutdown_watcher() => return Ok(()),
tick_at = ticker.tick() => tick_at,
};
let started_at = Instant::now();
let tenants = match mgr::list_tenants().await {
Ok(tenants) => tenants,
@@ -281,9 +279,18 @@ async fn calculate_synthetic_size_worker(
}
crate::tenant::tasks::warn_when_period_overrun(
tick_at.elapsed(),
started_at.elapsed(),
synthetic_size_calculation_interval,
BackgroundLoopKind::ConsumptionMetricsSyntheticSizeWorker,
);
let res = tokio::time::timeout_at(
started_at + synthetic_size_calculation_interval,
task_mgr::shutdown_token().cancelled(),
)
.await;
if res.is_ok() {
return Ok(());
}
}
}

View File

@@ -67,16 +67,40 @@ use crate::{
pub struct DiskUsageEvictionTaskConfig {
pub max_usage_pct: Percent,
pub min_avail_bytes: u64,
// Control how far we will go when evicting: when usage exceeds max_usage_pct or min_avail_bytes,
// we will keep evicting layers until we reach the target. The resulting disk usage should look
// like a sawtooth bouncing between the upper max/min line and the lower target line.
#[serde(default)]
pub target_usage_pct: Option<Percent>,
#[serde(default)]
pub target_avail_bytes: Option<u64>,
#[serde(with = "humantime_serde")]
pub period: Duration,
#[cfg(feature = "testing")]
pub mock_statvfs: Option<crate::statvfs::mock::Behavior>,
}
#[derive(Default)]
enum Status {
/// We are within disk limits, and not currently doing any eviction
#[default]
Idle,
/// Disk limits have been exceeded: we will evict soon
UnderPressure,
/// We are currently doing an eviction pass.
Evicting,
}
#[derive(Default)]
pub struct State {
/// Exclude http requests and background task from running at the same time.
mutex: tokio::sync::Mutex<()>,
/// Publish the current status of eviction work, for visibility to other subsystems
/// that modify their behavior if disk pressure is high or if eviction is going on.
status: std::sync::RwLock<Status>,
}
pub fn launch_disk_usage_global_eviction_task(
@@ -176,7 +200,9 @@ async fn disk_usage_eviction_task(
}
pub trait Usage: Clone + Copy + std::fmt::Debug {
fn has_pressure(&self) -> bool;
fn pressure(&self) -> f64;
fn over_pressure(&self) -> bool;
fn no_pressure(&self) -> bool;
fn add_available_bytes(&mut self, bytes: u64);
}
@@ -189,13 +215,19 @@ async fn disk_usage_eviction_task_iteration(
) -> anyhow::Result<()> {
let usage_pre = filesystem_level_usage::get(tenants_dir, task_config)
.context("get filesystem-level disk usage before evictions")?;
if usage_pre.over_pressure() {
*state.status.write().unwrap() = Status::Evicting;
}
let res = disk_usage_eviction_task_iteration_impl(state, storage, usage_pre, cancel).await;
match res {
Ok(outcome) => {
debug!(?outcome, "disk_usage_eviction_iteration finished");
match outcome {
let new_status = match outcome {
IterationOutcome::NoPressure | IterationOutcome::Cancelled => {
// nothing to do, select statement below will handle things
Status::Idle
}
IterationOutcome::Finished(outcome) => {
// Verify with statvfs whether we made any real progress
@@ -205,21 +237,30 @@ async fn disk_usage_eviction_task_iteration(
debug!(?after, "disk usage");
if after.has_pressure() {
if after.over_pressure() {
// Don't bother doing an out-of-order iteration here now.
// In practice, the task period is set to a value in the tens-of-seconds range,
// which will cause another iteration to happen soon enough.
// TODO: deltas between the three different usages would be helpful,
// consider MiB, GiB, TiB
warn!(?outcome, ?after, "disk usage still high");
Status::UnderPressure
} else {
info!(?outcome, ?after, "disk usage pressure relieved");
Status::Idle
}
}
}
};
*state.status.write().unwrap() = new_status;
}
Err(e) => {
error!("disk_usage_eviction_iteration failed: {:#}", e);
*state.status.write().unwrap() = if usage_pre.over_pressure() {
Status::UnderPressure
} else {
Status::Idle
};
}
}
@@ -285,8 +326,10 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
debug!(?usage_pre, "disk usage");
if !usage_pre.has_pressure() {
if !usage_pre.over_pressure() {
return Ok(IterationOutcome::NoPressure);
} else {
*state.status.write().unwrap() = Status::Evicting;
}
warn!(
@@ -334,7 +377,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
let mut warned = None;
let mut usage_planned = usage_pre;
for (i, (partition, candidate)) in candidates.into_iter().enumerate() {
if !usage_planned.has_pressure() {
if usage_planned.no_pressure() {
debug!(
no_candidates_evicted = i,
"took enough candidates for pressure to be relieved"
@@ -644,22 +687,57 @@ mod filesystem_level_usage {
}
impl super::Usage for Usage<'_> {
fn has_pressure(&self) -> bool {
let usage_pct =
(100.0 * (1.0 - ((self.avail_bytes as f64) / (self.total_bytes as f64)))) as u64;
/// Does the pressure exceed 1.0, i.e. has the disk usage exceeded upper bounds?
///
/// This is the condition for starting eviction.
fn over_pressure(&self) -> bool {
self.pressure() >= 1.0
}
let pressures = [
(
"min_avail_bytes",
self.avail_bytes < self.config.min_avail_bytes,
),
(
"max_usage_pct",
usage_pct >= self.config.max_usage_pct.get() as u64,
),
];
/// Is the pressure <= 0, i.e. has disk usage gone below the target bound?
///
/// This is the condition for dropping out of eviction.
fn no_pressure(&self) -> bool {
self.pressure() <= 0.0
}
pressures.into_iter().any(|(_, has_pressure)| has_pressure)
fn pressure(&self) -> f64 {
let max_usage = std::cmp::min(
self.total_bytes - self.config.min_avail_bytes,
(self.total_bytes as f64 * (self.config.max_usage_pct.get() as f64 / 100.0)) as u64,
);
let mut target_usage = max_usage;
if let Some(target_avail_bytes) = self.config.target_avail_bytes {
target_usage = std::cmp::min(target_usage, self.total_bytes - target_avail_bytes);
}
if let Some(target_usage_pct) = self.config.target_usage_pct {
target_usage = std::cmp::min(
target_usage,
(self.total_bytes as f64 * (target_usage_pct.get() as f64 / 100.0)) as u64,
);
};
let usage = self.total_bytes - self.avail_bytes;
eprintln!(
"pressure: {} {}, current {}",
target_usage, max_usage, usage
);
if target_usage == max_usage {
// We are configured with a zero sized range: treat anything at+beyond limit as pressure 1.0, else 0.0
if usage >= max_usage {
1.0
} else {
0.0
}
} else if usage <= target_usage {
// No pressure.
0.0
} else {
// We are above target: pressure is the ratio of how much we exceed target to the size of the gap
let range_size = (max_usage - target_usage) as f64;
(usage - target_usage) as f64 / range_size
}
}
fn add_available_bytes(&mut self, bytes: u64) {
@@ -713,6 +791,8 @@ mod filesystem_level_usage {
config: &DiskUsageEvictionTaskConfig {
max_usage_pct: Percent::new(85).unwrap(),
min_avail_bytes: 0,
target_avail_bytes: None,
target_usage_pct: None,
period: Duration::MAX,
#[cfg(feature = "testing")]
mock_statvfs: None,
@@ -721,24 +801,24 @@ mod filesystem_level_usage {
avail_bytes: 0,
};
assert!(usage.has_pressure(), "expected pressure at 100%");
assert!(usage.over_pressure(), "expected pressure at 100%");
usage.add_available_bytes(14_000);
assert!(usage.has_pressure(), "expected pressure at 86%");
assert!(usage.over_pressure(), "expected pressure at 86%");
usage.add_available_bytes(999);
assert!(usage.has_pressure(), "expected pressure at 85.001%");
assert!(usage.over_pressure(), "expected pressure at 85.001%");
usage.add_available_bytes(1);
assert!(usage.has_pressure(), "expected pressure at precisely 85%");
assert!(usage.over_pressure(), "expected pressure at precisely 85%");
usage.add_available_bytes(1);
assert!(!usage.has_pressure(), "no pressure at 84.999%");
assert!(!usage.over_pressure(), "no pressure at 84.999%");
usage.add_available_bytes(999);
assert!(!usage.has_pressure(), "no pressure at 84%");
assert!(!usage.over_pressure(), "no pressure at 84%");
usage.add_available_bytes(16_000);
assert!(!usage.has_pressure());
assert!(!usage.over_pressure());
}
}

View File

@@ -1452,10 +1452,22 @@ async fn disk_usage_eviction_run(
}
impl crate::disk_usage_eviction_task::Usage for Usage {
fn has_pressure(&self) -> bool {
fn over_pressure(&self) -> bool {
self.config.evict_bytes > self.freed_bytes
}
fn no_pressure(&self) -> bool {
!self.over_pressure()
}
fn pressure(&self) -> f64 {
if self.over_pressure() {
1.0
} else {
0.0
}
}
fn add_available_bytes(&mut self, bytes: u64) {
self.freed_bytes += bytes;
}

View File

@@ -18,6 +18,7 @@ use pageserver_api::models::TimelineState;
use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use storage_broker::BrokerClientChannel;
use tokio::runtime::Handle;
use tokio::sync::watch;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
@@ -2614,6 +2615,7 @@ impl Tenant {
) -> anyhow::Result<()> {
let legacy_config_path = conf.tenant_config_path(tenant_id);
let config_path = conf.tenant_location_config_path(tenant_id);
Self::persist_tenant_config_at(tenant_id, &config_path, &legacy_config_path, location_conf)
.await
}
@@ -2652,12 +2654,20 @@ impl Tenant {
// Convert the config to a toml file.
conf_content += &toml_edit::ser::to_string_pretty(&location_conf)?;
let conf_content = conf_content.as_bytes();
let temp_path = path_with_suffix_extension(config_path, TEMP_FILE_SUFFIX);
VirtualFile::crashsafe_overwrite(config_path, &temp_path, conf_content)
.await
.with_context(|| format!("write tenant {tenant_id} config to {config_path}"))?;
let tenant_id = *tenant_id;
let config_path = config_path.to_owned();
tokio::task::spawn_blocking(move || {
Handle::current().block_on(async move {
let conf_content = conf_content.as_bytes();
VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content)
.await
.with_context(|| format!("write tenant {tenant_id} config to {config_path}"))
})
})
.await??;
Ok(())
}
@@ -2679,12 +2689,21 @@ impl Tenant {
// Convert the config to a toml file.
conf_content += &toml_edit::ser::to_string(&tenant_conf)?;
let conf_content = conf_content.as_bytes();
let temp_path = path_with_suffix_extension(target_config_path, TEMP_FILE_SUFFIX);
VirtualFile::crashsafe_overwrite(target_config_path, &temp_path, conf_content)
.await
.with_context(|| format!("write tenant {tenant_id} config to {target_config_path}"))?;
let tenant_id = *tenant_id;
let target_config_path = target_config_path.to_owned();
tokio::task::spawn_blocking(move || {
Handle::current().block_on(async move {
let conf_content = conf_content.as_bytes();
VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content)
.await
.with_context(|| {
format!("write tenant {tenant_id} config to {target_config_path}")
})
})
})
.await??;
Ok(())
}
@@ -3668,17 +3687,21 @@ pub(crate) mod harness {
static LOG_HANDLE: OnceCell<()> = OnceCell::new();
pub(crate) fn setup_logging() {
LOG_HANDLE.get_or_init(|| {
logging::init(
logging::LogFormat::Test,
// enable it in case the tests exercise code paths that use
// debug_assert_current_span_has_tenant_and_timeline_id
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
)
.expect("Failed to init test logging")
});
}
impl TenantHarness {
pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
LOG_HANDLE.get_or_init(|| {
logging::init(
logging::LogFormat::Test,
// enable it in case in case the tests exercise code paths that use
// debug_assert_current_span_has_tenant_and_timeline_id
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
)
.expect("Failed to init test logging")
});
setup_logging();
let repo_dir = PageServerConf::test_repo_dir(test_name);
let _ = fs::remove_dir_all(&repo_dir);

View File

@@ -1,7 +1,7 @@
//! This module acts as a switchboard to access different repositories managed by this
//! page server.
use camino::{Utf8Path, Utf8PathBuf};
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
use rand::{distributions::Alphanumeric, Rng};
use std::collections::{hash_map, HashMap};
use std::sync::Arc;
@@ -256,83 +256,99 @@ async fn init_load_generations(
Ok(Some(generations))
}
/// Given a directory discovered in the pageserver's tenants/ directory, attempt
/// to load a tenant config from it.
///
/// If file is missing, return Ok(None)
fn load_tenant_config(
conf: &'static PageServerConf,
dentry: Utf8DirEntry,
) -> anyhow::Result<Option<(TenantId, anyhow::Result<LocationConf>)>> {
let tenant_dir_path = dentry.path().to_path_buf();
if crate::is_temporary(&tenant_dir_path) {
info!("Found temporary tenant directory, removing: {tenant_dir_path}");
// No need to use safe_remove_tenant_dir_all because this is already
// a temporary path
if let Err(e) = std::fs::remove_dir_all(&tenant_dir_path) {
error!(
"Failed to remove temporary directory '{}': {:?}",
tenant_dir_path, e
);
}
return Ok(None);
}
// This case happens if we crash during attachment before writing a config into the dir
let is_empty = tenant_dir_path
.is_empty_dir()
.with_context(|| format!("Failed to check whether {tenant_dir_path:?} is an empty dir"))?;
if is_empty {
info!("removing empty tenant directory {tenant_dir_path:?}");
if let Err(e) = std::fs::remove_dir(&tenant_dir_path) {
error!(
"Failed to remove empty tenant directory '{}': {e:#}",
tenant_dir_path
)
}
return Ok(None);
}
let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
if tenant_ignore_mark_file.exists() {
info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
return Ok(None);
}
let tenant_id = match tenant_dir_path
.file_name()
.unwrap_or_default()
.parse::<TenantId>()
{
Ok(id) => id,
Err(_) => {
warn!("Invalid tenant path (garbage in our repo directory?): {tenant_dir_path}",);
return Ok(None);
}
};
Ok(Some((
tenant_id,
Tenant::load_tenant_config(conf, &tenant_id),
)))
}
/// Initial stage of load: walk the local tenants directory, clean up any temp files,
/// and load configurations for the tenants we found.
///
/// Do this in parallel, because we expect 10k+ tenants, so serial execution can take
/// seconds even on reasonably fast drives.
async fn init_load_tenant_configs(
conf: &'static PageServerConf,
) -> anyhow::Result<HashMap<TenantId, anyhow::Result<LocationConf>>> {
let tenants_dir = conf.tenants_path();
let mut dir_entries = tenants_dir
.read_dir_utf8()
.with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
let dentries = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<Utf8DirEntry>> {
let dir_entries = tenants_dir
.read_dir_utf8()
.with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
Ok(dir_entries.collect::<Result<Vec<_>, std::io::Error>>()?)
})
.await??;
let mut configs = HashMap::new();
loop {
match dir_entries.next() {
None => break,
Some(Ok(dentry)) => {
let tenant_dir_path = dentry.path().to_path_buf();
if crate::is_temporary(&tenant_dir_path) {
info!("Found temporary tenant directory, removing: {tenant_dir_path}");
// No need to use safe_remove_tenant_dir_all because this is already
// a temporary path
if let Err(e) = fs::remove_dir_all(&tenant_dir_path).await {
error!(
"Failed to remove temporary directory '{}': {:?}",
tenant_dir_path, e
);
}
continue;
}
let mut join_set = JoinSet::new();
for dentry in dentries {
join_set.spawn_blocking(move || load_tenant_config(conf, dentry));
}
// This case happens if we:
// * crash during attach before creating the attach marker file
// * crash during tenant delete before removing tenant directory
let is_empty = tenant_dir_path.is_empty_dir().with_context(|| {
format!("Failed to check whether {tenant_dir_path:?} is an empty dir")
})?;
if is_empty {
info!("removing empty tenant directory {tenant_dir_path:?}");
if let Err(e) = fs::remove_dir(&tenant_dir_path).await {
error!(
"Failed to remove empty tenant directory '{}': {e:#}",
tenant_dir_path
)
}
continue;
}
let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
if tenant_ignore_mark_file.exists() {
info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
continue;
}
let tenant_id = match tenant_dir_path
.file_name()
.unwrap_or_default()
.parse::<TenantId>()
{
Ok(id) => id,
Err(_) => {
warn!(
"Invalid tenant path (garbage in our repo directory?): {tenant_dir_path}",
);
continue;
}
};
configs.insert(tenant_id, Tenant::load_tenant_config(conf, &tenant_id));
}
Some(Err(e)) => {
// An error listing the top level directory indicates serious problem
// with local filesystem: we will fail to load, and fail to start.
anyhow::bail!(e);
}
while let Some(r) = join_set.join_next().await {
if let Some((tenant_id, tenant_config)) = r?? {
configs.insert(tenant_id, tenant_config);
}
}
Ok(configs)
}

View File

@@ -18,7 +18,7 @@ use crate::config::PageServerConf;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::Generation;
use crate::tenant::{Generation, TENANT_DELETED_MARKER_FILE_NAME};
use remote_storage::{DownloadError, GenericRemoteStorage};
use utils::crashsafe::path_with_suffix_extension;
use utils::id::{TenantId, TimelineId};
@@ -190,6 +190,12 @@ pub async fn list_remote_timelines(
let mut timeline_ids = HashSet::new();
for timeline_remote_storage_key in timelines {
if timeline_remote_storage_key.object_name() == Some(TENANT_DELETED_MARKER_FILE_NAME) {
// A `deleted` key within `timelines/` is a marker file, not a timeline. Ignore it.
// This code will be removed in https://github.com/neondatabase/neon/pull/5580
continue;
}
let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_id}")
})?;

View File

@@ -60,6 +60,8 @@ pub(super) async fn upload_timeline_layer<'a>(
bail!("failpoint before-upload-layer")
});
pausable_failpoint!("before-upload-layer-pausable");
let storage_path = remote_path(conf, source_path, generation)?;
let source_file_res = fs::File::open(&source_path).await;
let source_file = match source_file_res {

View File

@@ -2793,10 +2793,13 @@ impl Timeline {
)
};
let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
// The new on-disk layers are now in the layer map. We can remove the
// in-memory layer from the map now. The flushed layer is stored in
// the mapping in `create_delta_layer`.
{
let metadata = {
let mut guard = self.layers.write().await;
if let Some(ref l) = delta_layer_to_add {
@@ -2812,8 +2815,17 @@ impl Timeline {
}
guard.finish_flush_l0_layer(delta_layer_to_add, &frozen_layer);
if disk_consistent_lsn != old_disk_consistent_lsn {
assert!(disk_consistent_lsn > old_disk_consistent_lsn);
self.disk_consistent_lsn.store(disk_consistent_lsn);
// Schedule remote uploads that will reflect our new disk_consistent_lsn
Some(self.schedule_uploads(disk_consistent_lsn, layer_paths_to_upload)?)
} else {
None
}
// release lock on 'layers'
}
};
// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
// a compaction can delete the file and then it won't be available for uploads any more.
@@ -2829,28 +2841,22 @@ impl Timeline {
//
// TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
// *all* the layers, to avoid fsyncing the file multiple times.
let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
// If we were able to advance 'disk_consistent_lsn', save it the metadata file.
// After crash, we will restart WAL streaming and processing from that point.
if disk_consistent_lsn != old_disk_consistent_lsn {
assert!(disk_consistent_lsn > old_disk_consistent_lsn);
self.update_metadata_file(disk_consistent_lsn, layer_paths_to_upload)
// If we updated our disk_consistent_lsn, persist the updated metadata to local disk.
if let Some(metadata) = metadata {
save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
.await
.context("update_metadata_file")?;
// Also update the in-memory copy
self.disk_consistent_lsn.store(disk_consistent_lsn);
.context("save_metadata")?;
}
Ok(())
}
/// Update metadata file
async fn update_metadata_file(
fn schedule_uploads(
&self,
disk_consistent_lsn: Lsn,
layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,
) -> anyhow::Result<()> {
) -> anyhow::Result<TimelineMetadata> {
// We can only save a valid 'prev_record_lsn' value on disk if we
// flushed *all* in-memory changes to disk. We only track
// 'prev_record_lsn' in memory for the latest processed record, so we
@@ -2887,10 +2893,6 @@ impl Timeline {
x.unwrap()
));
save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
.await
.context("save_metadata")?;
if let Some(remote_client) = &self.remote_client {
for (path, layer_metadata) in layer_paths_to_upload {
remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
@@ -2898,6 +2900,20 @@ impl Timeline {
remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
}
Ok(metadata)
}
async fn update_metadata_file(
&self,
disk_consistent_lsn: Lsn,
layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,
) -> anyhow::Result<()> {
let metadata = self.schedule_uploads(disk_consistent_lsn, layer_paths_to_upload)?;
save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
.await
.context("save_metadata")?;
Ok(())
}

View File

@@ -27,13 +27,14 @@ use std::collections::VecDeque;
use std::io;
use std::io::prelude::*;
use std::ops::{Deref, DerefMut};
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::CommandExt;
use std::process::Stdio;
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use std::process::{Child, ChildStdin, ChildStdout, Command};
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::time::Duration;
use std::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock};
@@ -47,7 +48,6 @@ use crate::metrics::{
};
use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block};
use crate::repository::Key;
use crate::task_mgr::BACKGROUND_RUNTIME;
use crate::walrecord::NeonWalRecord;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants;
@@ -72,8 +72,6 @@ pub(crate) struct BufferTag {
struct ProcessInput {
stdin: ChildStdin,
stderr_fd: RawFd,
stdout_fd: RawFd,
n_requests: usize,
}
@@ -121,6 +119,7 @@ impl PostgresRedoManager {
/// The WAL redo is handled by a separate thread, so this just sends a request
/// to the thread and waits for response.
///
/// CANCEL SAFETY: NOT CANCEL SAFE.
pub async fn request_redo(
&self,
key: Key,
@@ -153,6 +152,7 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
.await
};
img = Some(result?);
@@ -173,6 +173,7 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
.await
}
}
}
@@ -194,7 +195,7 @@ impl PostgresRedoManager {
/// Process one request for WAL redo using wal-redo postgres
///
#[allow(clippy::too_many_arguments)]
fn apply_batch_postgres(
async fn apply_batch_postgres(
&self,
key: Key,
lsn: Lsn,
@@ -283,19 +284,20 @@ impl PostgresRedoManager {
);
// Avoid concurrent callers hitting the same issue.
// We can't prevent it from happening because we want to enable parallelism.
let mut guard = self.redo_process.write().unwrap();
match &*guard {
Some(current_field_value) => {
if Arc::ptr_eq(current_field_value, &proc) {
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.
*guard = None;
{
let mut guard = self.redo_process.write().unwrap();
match &*guard {
Some(current_field_value) => {
if Arc::ptr_eq(current_field_value, &proc) {
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.
*guard = None;
}
}
None => {
// Another thread was faster to observe the error, and already took the process out of rotation.
}
}
None => {
// Another thread was faster to observe the error, and already took the process out of rotation.
}
}
drop(guard);
// NB: there may still be other concurrent threads using `proc`.
// The last one will send SIGKILL when the underlying Arc reaches refcount 0.
// NB: it's important to drop(proc) after drop(guard). Otherwise we'd keep
@@ -308,7 +310,12 @@ impl PostgresRedoManager {
// than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
// we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
// This probably needs revisiting at some later point.
let mut wait_done = proc.stderr_logger_task_done.clone();
drop(proc);
wait_done
.wait_for(|v| *v)
.await
.expect("we use scopeguard to ensure we always send `true` to the channel before dropping the sender");
} else if n_attempts != 0 {
info!(n_attempts, "retried walredo succeeded");
}
@@ -619,7 +626,8 @@ struct WalRedoProcess {
child: Option<NoLeakChild>,
stdout: Mutex<ProcessOutput>,
stdin: Mutex<ProcessInput>,
stderr: Mutex<ChildStderr>,
stderr_logger_cancel: CancellationToken,
stderr_logger_task_done: tokio::sync::watch::Receiver<bool>,
/// Counter to separate same sized walredo inputs failing at the same millisecond.
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize,
@@ -668,7 +676,6 @@ impl WalRedoProcess {
let stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
macro_rules! set_nonblock_or_log_err {
($file:ident) => {{
let res = set_nonblock($file.as_raw_fd());
@@ -682,16 +689,73 @@ impl WalRedoProcess {
set_nonblock_or_log_err!(stdout)?;
set_nonblock_or_log_err!(stderr)?;
let mut stderr = tokio::io::unix::AsyncFd::new(stderr).context("AsyncFd::with_interest")?;
// all fallible operations post-spawn are complete, so get rid of the guard
let child = scopeguard::ScopeGuard::into_inner(child);
let stderr_logger_cancel = CancellationToken::new();
let (stderr_logger_task_done_tx, stderr_logger_task_done_rx) =
tokio::sync::watch::channel(false);
tokio::spawn({
let stderr_logger_cancel = stderr_logger_cancel.clone();
async move {
scopeguard::defer! {
debug!("wal-redo-postgres stderr_logger_task finished");
let _ = stderr_logger_task_done_tx.send(true);
}
debug!("wal-redo-postgres stderr_logger_task started");
loop {
// NB: we purposefully don't do a select! for the cancellation here.
// The cancellation would likely cause us to miss stderr messages.
// We can rely on this to return from .await because when we SIGKILL
// the child, the writing end of the stderr pipe gets closed.
match stderr.readable_mut().await {
Ok(mut guard) => {
let mut errbuf = [0; 16384];
let res = guard.try_io(|fd| {
use std::io::Read;
fd.get_mut().read(&mut errbuf)
});
match res {
Ok(Ok(0)) => {
// it closed the stderr pipe
break;
}
Ok(Ok(n)) => {
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
let output = String::from_utf8_lossy(&errbuf[0..n]).to_string();
error!(output, "received output");
},
Ok(Err(e)) => {
error!(error = ?e, "read() error, waiting for cancellation");
stderr_logger_cancel.cancelled().await;
error!(error = ?e, "read() error, cancellation complete");
break;
}
Err(e) => {
let _e: tokio::io::unix::TryIoError = e;
// the read() returned WouldBlock, that's expected
}
}
}
Err(e) => {
error!(error = ?e, "read() error, waiting for cancellation");
stderr_logger_cancel.cancelled().await;
error!(error = ?e, "read() error, cancellation complete");
break;
}
}
}
}.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_id, %pg_version))
});
Ok(Self {
conf,
tenant_id,
child: Some(child),
stdin: Mutex::new(ProcessInput {
stdout_fd: stdout.as_raw_fd(),
stderr_fd: stderr.as_raw_fd(),
stdin,
n_requests: 0,
}),
@@ -700,7 +764,8 @@ impl WalRedoProcess {
pending_responses: VecDeque::new(),
n_processed_responses: 0,
}),
stderr: Mutex::new(stderr),
stderr_logger_cancel,
stderr_logger_task_done: stderr_logger_task_done_rx,
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize::default(),
})
@@ -774,19 +839,11 @@ impl WalRedoProcess {
let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small.
let mut nwrite = 0usize;
// Prepare for calling poll()
let mut pollfds = [
PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT),
PollFd::new(proc.stderr_fd, PollFlags::POLLIN),
PollFd::new(proc.stdout_fd, PollFlags::POLLIN),
];
let mut stdin_pollfds = [PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT)];
// We do two things simultaneously: send the old base image and WAL records to
// the child process's stdin and forward any logging
// information that the child writes to its stderr to the page server's log.
while nwrite < writebuf.len() {
let n = loop {
match nix::poll::poll(&mut pollfds[0..2], wal_redo_timeout.as_millis() as i32) {
match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
@@ -796,31 +853,8 @@ impl WalRedoProcess {
anyhow::bail!("WAL redo timed out");
}
// If we have some messages in stderr, forward them to the log.
let err_revents = pollfds[1].revents().unwrap();
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
let mut errbuf: [u8; 16384] = [0; 16384];
let mut stderr = self.stderr.lock().unwrap();
let len = stderr.read(&mut errbuf)?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
if len > 0 {
error!(
"wal-redo-postgres: {}",
String::from_utf8_lossy(&errbuf[0..len])
);
// To make sure we capture all log from the process if it fails, keep
// reading from the stderr, before checking the stdout.
continue;
}
} else if err_revents.contains(PollFlags::POLLHUP) {
anyhow::bail!("WAL redo process closed its stderr unexpectedly");
}
// If 'stdin' is writeable, do write.
let in_revents = pollfds[0].revents().unwrap();
let in_revents = stdin_pollfds[0].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
} else if in_revents.contains(PollFlags::POLLHUP) {
@@ -845,6 +879,7 @@ impl WalRedoProcess {
// advancing processed responses number.
let mut output = self.stdout.lock().unwrap();
let mut stdout_pollfds = [PollFd::new(output.stdout.as_raw_fd(), PollFlags::POLLIN)];
let n_processed_responses = output.n_processed_responses;
while n_processed_responses + output.pending_responses.len() <= request_no {
// We expect the WAL redo process to respond with an 8k page image. We read it
@@ -855,7 +890,10 @@ impl WalRedoProcess {
// We do two things simultaneously: reading response from stdout
// and forward any logging information that the child writes to its stderr to the page server's log.
let n = loop {
match nix::poll::poll(&mut pollfds[1..3], wal_redo_timeout.as_millis() as i32) {
match nix::poll::poll(
&mut stdout_pollfds[..],
wal_redo_timeout.as_millis() as i32,
) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
@@ -865,31 +903,8 @@ impl WalRedoProcess {
anyhow::bail!("WAL redo timed out");
}
// If we have some messages in stderr, forward them to the log.
let err_revents = pollfds[1].revents().unwrap();
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
let mut errbuf: [u8; 16384] = [0; 16384];
let mut stderr = self.stderr.lock().unwrap();
let len = stderr.read(&mut errbuf)?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
if len > 0 {
error!(
"wal-redo-postgres: {}",
String::from_utf8_lossy(&errbuf[0..len])
);
// To make sure we capture all log from the process if it fails, keep
// reading from the stderr, before checking the stdout.
continue;
}
} else if err_revents.contains(PollFlags::POLLHUP) {
anyhow::bail!("WAL redo process closed its stderr unexpectedly");
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = pollfds[2].revents().unwrap();
let out_revents = stdout_pollfds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
} else if out_revents.contains(PollFlags::POLLHUP) {
@@ -985,6 +1000,8 @@ impl Drop for WalRedoProcess {
.take()
.expect("we only do this once")
.kill_and_wait();
self.stderr_logger_cancel.cancel();
// no way to wait for stderr_logger_task from Drop because that is async only
}
}
@@ -1066,7 +1083,7 @@ impl Drop for NoLeakChild {
// Offload the kill+wait of the child process into the background.
// If someone stops the runtime, we'll leak the child process.
// We can ignore that case because we only stop the runtime on pageserver exit.
BACKGROUND_RUNTIME.spawn(async move {
tokio::runtime::Handle::current().spawn(async move {
tokio::task::spawn_blocking(move || {
// Intentionally don't inherit the tracing context from whoever is dropping us.
// This thread here is going to outlive of our dropper.
@@ -1199,6 +1216,22 @@ mod tests {
assert_eq!(page, crate::ZERO_PAGE);
}
#[tokio::test]
async fn test_stderr() {
let h = RedoHarness::new().unwrap();
h
.manager
.request_redo(
Key::from_i128(0),
Lsn::INVALID,
None,
short_records(),
16, /* 16 currently produces stderr output on startup, which adds a nice extra edge */
)
.await
.unwrap_err();
}
#[allow(clippy::octal_escapes)]
fn short_records() -> Vec<(Lsn, NeonWalRecord)> {
vec![
@@ -1227,6 +1260,8 @@ mod tests {
impl RedoHarness {
fn new() -> anyhow::Result<Self> {
crate::tenant::harness::setup_logging();
let repo_dir = camino_tempfile::tempdir()?;
let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
let conf = Box::leak(Box::new(conf));

View File

@@ -3,9 +3,12 @@ mod hacks;
mod link;
pub use link::LinkAuthError;
use tokio_postgres::config::AuthKeys;
use crate::proxy::{handle_try_wake, retry_after};
use crate::{
auth::{self, ClientCredentials},
config::AuthenticationConfig,
console::{
self,
provider::{CachedNodeInfo, ConsoleReqExtra},
@@ -15,8 +18,9 @@ use crate::{
};
use futures::TryFutureExt;
use std::borrow::Cow;
use std::ops::ControlFlow;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::info;
use tracing::{error, info, warn};
/// A product of successful authentication.
pub struct AuthSuccess<T> {
@@ -116,21 +120,27 @@ impl<'a, T, E> BackendType<'a, Result<T, E>> {
}
}
pub enum ComputeCredentials {
Password(Vec<u8>),
AuthKeys(AuthKeys),
}
/// True to its name, this function encapsulates our current auth trade-offs.
/// Here, we choose the appropriate auth flow based on circumstances.
async fn auth_quirks(
async fn auth_quirks_creds(
api: &impl console::Api,
extra: &ConsoleReqExtra<'_>,
creds: &mut ClientCredentials<'_>,
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin>,
allow_cleartext: bool,
) -> auth::Result<AuthSuccess<CachedNodeInfo>> {
config: &'static AuthenticationConfig,
) -> auth::Result<AuthSuccess<ComputeCredentials>> {
// If there's no project so far, that entails that client doesn't
// support SNI or other means of passing the endpoint (project) name.
// We now expect to see a very specific payload in the place of password.
if creds.project.is_none() {
// Password will be checked by the compute node later.
return hacks::password_hack(api, extra, creds, client).await;
return hacks::password_hack(creds, client).await;
}
// Password hack should set the project name.
@@ -141,11 +151,53 @@ async fn auth_quirks(
// Currently, we use it for websocket connections (latency).
if allow_cleartext {
// Password will be checked by the compute node later.
return hacks::cleartext_hack(api, extra, creds, client).await;
return hacks::cleartext_hack(client).await;
}
// Finally, proceed with the main auth flow (SCRAM-based).
classic::authenticate(api, extra, creds, client).await
classic::authenticate(api, extra, creds, client, config).await
}
/// True to its name, this function encapsulates our current auth trade-offs.
/// Here, we choose the appropriate auth flow based on circumstances.
async fn auth_quirks(
api: &impl console::Api,
extra: &ConsoleReqExtra<'_>,
creds: &mut ClientCredentials<'_>,
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin>,
allow_cleartext: bool,
config: &'static AuthenticationConfig,
) -> auth::Result<AuthSuccess<CachedNodeInfo>> {
let auth_stuff = auth_quirks_creds(api, extra, creds, client, allow_cleartext, config).await?;
let mut num_retries = 0;
let mut node = loop {
let wake_res = api.wake_compute(extra, creds).await;
match handle_try_wake(wake_res, num_retries) {
Err(e) => {
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
return Err(e.into());
}
Ok(ControlFlow::Continue(e)) => {
warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
}
Ok(ControlFlow::Break(n)) => break n,
}
let wait_duration = retry_after(num_retries);
num_retries += 1;
tokio::time::sleep(wait_duration).await;
};
match auth_stuff.value {
ComputeCredentials::Password(password) => node.config.password(password),
ComputeCredentials::AuthKeys(auth_keys) => node.config.auth_keys(auth_keys),
};
Ok(AuthSuccess {
reported_auth_ok: auth_stuff.reported_auth_ok,
value: node,
})
}
impl BackendType<'_, ClientCredentials<'_>> {
@@ -180,6 +232,7 @@ impl BackendType<'_, ClientCredentials<'_>> {
extra: &ConsoleReqExtra<'_>,
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin>,
allow_cleartext: bool,
config: &'static AuthenticationConfig,
) -> auth::Result<AuthSuccess<CachedNodeInfo>> {
use BackendType::*;
@@ -192,7 +245,7 @@ impl BackendType<'_, ClientCredentials<'_>> {
);
let api = api.as_ref();
auth_quirks(api, extra, creds, client, allow_cleartext).await?
auth_quirks(api, extra, creds, client, allow_cleartext, config).await?
}
Postgres(api, creds) => {
info!(
@@ -202,7 +255,7 @@ impl BackendType<'_, ClientCredentials<'_>> {
);
let api = api.as_ref();
auth_quirks(api, extra, creds, client, allow_cleartext).await?
auth_quirks(api, extra, creds, client, allow_cleartext, config).await?
}
// NOTE: this auth backend doesn't use client credentials.
Link(url) => {

View File

@@ -1,23 +1,22 @@
use std::ops::ControlFlow;
use super::AuthSuccess;
use super::{AuthSuccess, ComputeCredentials};
use crate::{
auth::{self, AuthFlow, ClientCredentials},
compute,
console::{self, AuthInfo, CachedNodeInfo, ConsoleReqExtra},
proxy::{handle_try_wake, retry_after},
config::AuthenticationConfig,
console::{self, AuthInfo, ConsoleReqExtra},
sasl, scram,
stream::PqStream,
};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{error, info, warn};
use tracing::{info, warn};
pub(super) async fn authenticate(
api: &impl console::Api,
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials<'_>,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<AuthSuccess<CachedNodeInfo>> {
config: &'static AuthenticationConfig,
) -> auth::Result<AuthSuccess<ComputeCredentials>> {
info!("fetching user's authentication info");
let info = api.get_auth_info(extra, creds).await?.unwrap_or_else(|| {
// If we don't have an authentication secret, we mock one to
@@ -42,7 +41,16 @@ pub(super) async fn authenticate(
error
})?;
let auth_outcome = auth_flow.authenticate().await.map_err(|error| {
let auth_outcome = tokio::time::timeout(
config.scram_protocol_timeout,
auth_flow.authenticate(),
)
.await
.map_err(|error| {
warn!("error processing scram messages error = authentication timed out, execution time exeeded {} seconds", config.scram_protocol_timeout.as_secs());
auth::io::Error::new(auth::io::ErrorKind::TimedOut, error)
})?
.map_err(|error| {
warn!(?error, "error processing scram messages");
error
})?;
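
The nested `map_err` above is the usual two-layer result from `tokio::time::timeout`: the outer error is the elapsed deadline, the inner one is the operation's own failure. A minimal sketch of the same shape, with an illustrative `slow_op` and the flag's 15s default:

use std::io;
use std::time::Duration;

async fn slow_op() -> io::Result<u32> {
    tokio::time::sleep(Duration::from_secs(60)).await;
    Ok(42)
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let value = tokio::time::timeout(Duration::from_secs(15), slow_op())
        .await
        // Outer Err: the deadline fired first. Inner Err: the operation failed.
        .map_err(|elapsed| io::Error::new(io::ErrorKind::TimedOut, elapsed))??;
    println!("{value}");
    Ok(())
}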
@@ -55,38 +63,17 @@ pub(super) async fn authenticate(
}
};
Some(compute::ScramKeys {
compute::ScramKeys {
client_key: client_key.as_bytes(),
server_key: secret.server_key.as_bytes(),
})
}
}
};
let mut num_retries = 0;
let mut node = loop {
let wake_res = api.wake_compute(extra, creds).await;
match handle_try_wake(wake_res, num_retries) {
Err(e) => {
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
return Err(e.into());
}
Ok(ControlFlow::Continue(e)) => {
warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
}
Ok(ControlFlow::Break(n)) => break n,
}
let wait_duration = retry_after(num_retries);
num_retries += 1;
tokio::time::sleep(wait_duration).await;
};
if let Some(keys) = scram_keys {
use tokio_postgres::config::AuthKeys;
node.config.auth_keys(AuthKeys::ScramSha256(keys));
}
Ok(AuthSuccess {
reported_auth_ok: false,
value: node,
value: ComputeCredentials::AuthKeys(tokio_postgres::config::AuthKeys::ScramSha256(
scram_keys,
)),
})
}
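
`ComputeCredentials` itself is defined outside the hunks shown here; from its uses in this diff (constructed with a password or with SCRAM keys, then matched in `auth_quirks` to configure the compute connection), its shape is presumably something like the sketch below. The `Vec<u8>` password payload is an assumption:

use tokio_postgres::config::AuthKeys;

// Assumed shape, reconstructed from the call sites in this diff.
pub enum ComputeCredentials {
    // Cleartext and password-hack flows hand back the raw password.
    Password(Vec<u8>),
    // SCRAM flows hand back derived keys instead of a password.
    AuthKeys(AuthKeys),
}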

View File

@@ -1,10 +1,6 @@
use super::AuthSuccess;
use super::{AuthSuccess, ComputeCredentials};
use crate::{
auth::{self, AuthFlow, ClientCredentials},
console::{
self,
provider::{CachedNodeInfo, ConsoleReqExtra},
},
stream,
};
use tokio::io::{AsyncRead, AsyncWrite};
@@ -15,11 +11,8 @@ use tracing::{info, warn};
/// These properties are beneficial for serverless JS workers, so we
/// use this mechanism for websocket connections.
pub async fn cleartext_hack(
api: &impl console::Api,
extra: &ConsoleReqExtra<'_>,
creds: &mut ClientCredentials<'_>,
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<AuthSuccess<CachedNodeInfo>> {
) -> auth::Result<AuthSuccess<ComputeCredentials>> {
warn!("cleartext auth flow override is enabled, proceeding");
let password = AuthFlow::new(client)
.begin(auth::CleartextPassword)
@@ -27,24 +20,19 @@ pub async fn cleartext_hack(
.authenticate()
.await?;
let mut node = api.wake_compute(extra, creds).await?;
node.config.password(password);
// Report tentative success; compute node will check the password anyway.
Ok(AuthSuccess {
reported_auth_ok: false,
value: node,
value: ComputeCredentials::Password(password),
})
}
/// Workaround for clients which don't provide an endpoint (project) name.
/// Very similar to [`cleartext_hack`], but there's a specific password format.
pub async fn password_hack(
api: &impl console::Api,
extra: &ConsoleReqExtra<'_>,
creds: &mut ClientCredentials<'_>,
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<AuthSuccess<CachedNodeInfo>> {
) -> auth::Result<AuthSuccess<ComputeCredentials>> {
warn!("project not specified, resorting to the password hack auth flow");
let payload = AuthFlow::new(client)
.begin(auth::PasswordHack)
@@ -55,12 +43,9 @@ pub async fn password_hack(
info!(project = &payload.endpoint, "received missing parameter");
creds.project = Some(payload.endpoint);
let mut node = api.wake_compute(extra, creds).await?;
node.config.password(payload.password);
// Report tentative success; compute node will check the password anyway.
Ok(AuthSuccess {
reported_auth_ok: false,
value: node,
value: ComputeCredentials::Password(payload.password),
})
}
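
On the "specific password format" the doc comment mentions: the payload is parsed elsewhere in the proxy, so the exact wire format is not visible in this diff. A hedged sketch, assuming the endpoint is smuggled in as a `project=<endpoint>;<password>` prefix on the password field:

struct PasswordHackPayload {
    endpoint: String,
    password: Vec<u8>,
}

// Illustrative parser for the assumed `project=<endpoint>;<password>` encoding.
fn parse_password_hack(bytes: &[u8]) -> Option<PasswordHackPayload> {
    let s = std::str::from_utf8(bytes).ok()?;
    let rest = s.strip_prefix("project=")?;
    let (endpoint, password) = rest.split_once(';')?;
    Some(PasswordHackPayload {
        endpoint: endpoint.to_owned(),
        password: password.as_bytes().to_vec(),
    })
}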

View File

@@ -1,5 +1,6 @@
use futures::future::Either;
use proxy::auth;
use proxy::config::AuthenticationConfig;
use proxy::config::HttpConfig;
use proxy::console;
use proxy::http;
@@ -83,7 +84,9 @@ struct ProxyCliArgs {
/// timeout for http connections
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
sql_over_http_timeout: tokio::time::Duration,
/// timeout for scram authentication protocol
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
scram_protocol_timeout: tokio::time::Duration,
/// Require that all incoming requests have a Proxy Protocol V2 packet **and** have an IP address associated.
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
require_client_ip: bool,
@@ -231,12 +234,16 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
let http_config = HttpConfig {
sql_over_http_timeout: args.sql_over_http_timeout,
};
let authentication_config = AuthenticationConfig {
scram_protocol_timeout: args.scram_protocol_timeout,
};
let config = Box::leak(Box::new(ProxyConfig {
tls_config,
auth_backend,
metric_collection,
allow_self_signed_compute: args.allow_self_signed_compute,
http_config,
authentication_config,
require_client_ip: args.require_client_ip,
}));
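
`build_config` hands out `&'static ProxyConfig` by leaking a single allocation at startup, which is why `AuthenticationConfig` can be threaded through as `&'static` everywhere below. A minimal sketch of the pattern with a hypothetical `Config` type:

struct Config {
    scram_protocol_timeout: std::time::Duration,
}

fn build_config() -> &'static Config {
    // One deliberate, never-freed allocation buys a 'static reference
    // that can be shared across every task for the life of the process.
    Box::leak(Box::new(Config {
        scram_protocol_timeout: std::time::Duration::from_secs(15),
    }))
}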

View File

@@ -1,5 +1,5 @@
use anyhow::{anyhow, Context};
use hashbrown::HashMap;
use anyhow::{bail, Context};
use dashmap::DashMap;
use pq_proto::CancelKeyData;
use std::net::SocketAddr;
use tokio::net::TcpStream;
@@ -8,7 +8,7 @@ use tracing::info;
/// Enables serving `CancelRequest`s.
#[derive(Default)]
pub struct CancelMap(parking_lot::RwLock<HashMap<CancelKeyData, Option<CancelClosure>>>);
pub struct CancelMap(DashMap<CancelKeyData, Option<CancelClosure>>);
impl CancelMap {
/// Cancel a running query for the corresponding connection.
@@ -16,7 +16,6 @@ impl CancelMap {
// NB: we should immediately release the lock after cloning the token.
let cancel_closure = self
.0
.read()
.get(&key)
.and_then(|x| x.clone())
.with_context(|| format!("query cancellation key not found: {key}"))?;
@@ -40,15 +39,19 @@ impl CancelMap {
// Random key collisions are unlikely to happen here, but they're still possible,
// which is why we have to take care not to rewrite an existing key.
self.0
.write()
.try_insert(key, None)
.map_err(|_| anyhow!("query cancellation key already exists: {key}"))?;
match self.0.entry(key) {
dashmap::mapref::entry::Entry::Occupied(_) => {
bail!("query cancellation key already exists: {key}")
}
dashmap::mapref::entry::Entry::Vacant(e) => {
e.insert(None);
}
}
// This will guarantee that the session gets dropped
// as soon as the future is finished.
scopeguard::defer! {
self.0.write().remove(&key);
self.0.remove(&key);
info!("dropped query cancellation key {key}");
}
@@ -59,12 +62,12 @@ impl CancelMap {
#[cfg(test)]
fn contains(&self, session: &Session) -> bool {
self.0.read().contains_key(&session.key)
self.0.contains_key(&session.key)
}
#[cfg(test)]
fn is_empty(&self) -> bool {
self.0.read().is_empty()
self.0.is_empty()
}
}
@@ -113,10 +116,7 @@ impl Session<'_> {
/// This enables query cancellation in `crate::proxy::prepare_client_connection`.
pub fn enable_query_cancellation(self, cancel_closure: CancelClosure) -> CancelKeyData {
info!("enabling query cancellation for this session");
self.cancel_map
.0
.write()
.insert(self.key, Some(cancel_closure));
self.cancel_map.0.insert(self.key, Some(cancel_closure));
self.key
}
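
The `DashMap` migration above replaces locking the whole map with per-shard locking; the entry API stands in for the unstable `HashMap::try_insert`, and `scopeguard::defer!` still guarantees cleanup. A condensed sketch of the same register/auto-remove shape, with hypothetical key and value types:

use dashmap::DashMap;

fn with_registered_key(map: &DashMap<u64, Option<String>>, key: u64) -> anyhow::Result<()> {
    // Check-and-insert is atomic per shard; no outer RwLock needed.
    match map.entry(key) {
        dashmap::mapref::entry::Entry::Occupied(_) => anyhow::bail!("key already exists: {key}"),
        dashmap::mapref::entry::Entry::Vacant(e) => {
            e.insert(None);
        }
    }
    // Remove the key however this scope exits, including on panic.
    scopeguard::defer! {
        map.remove(&key);
    }
    // ... session runs here while the key is registered ...
    Ok(())
}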

View File

@@ -14,6 +14,7 @@ pub struct ProxyConfig {
pub metric_collection: Option<MetricCollectionConfig>,
pub allow_self_signed_compute: bool,
pub http_config: HttpConfig,
pub authentication_config: AuthenticationConfig,
pub require_client_ip: bool,
}
@@ -32,6 +33,10 @@ pub struct HttpConfig {
pub sql_over_http_timeout: tokio::time::Duration,
}
pub struct AuthenticationConfig {
pub scram_protocol_timeout: tokio::time::Duration,
}
impl TlsConfig {
pub fn to_server_config(&self) -> Arc<rustls::ServerConfig> {
self.config.clone()

View File

@@ -90,7 +90,11 @@ pub mod errors {
status: http::StatusCode::LOCKED,
ref text,
} => {
!text.contains("written data quota exceeded")
// written data quota exceeded
// data transfer quota exceeded
// compute time quota exceeded
// logical size quota exceeded
!text.contains("quota exceeded")
&& !text.contains("the limit for current plan reached")
}
// retry server errors
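
The widened predicate above collapses the per-message checks into the shared "quota exceeded" substring. A standalone rendering of the same logic, with the message strings it is intended to match (the final, retriable example is made up for illustration):

fn should_retry_locked_text(text: &str) -> bool {
    // Every "<resource> quota exceeded" variant and plan limits are terminal;
    // retrying cannot help until the user's quota or plan changes.
    !text.contains("quota exceeded") && !text.contains("the limit for current plan reached")
}

fn main() {
    for msg in [
        "written data quota exceeded",
        "data transfer quota exceeded",
        "compute time quota exceeded",
        "logical size quota exceeded",
        "the limit for current plan reached",
    ] {
        assert!(!should_retry_locked_text(msg));
    }
    assert!(should_retry_locked_text("endpoint is temporarily locked")); // hypothetical retriable text
}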

View File

@@ -194,9 +194,10 @@ impl GlobalConnPool {
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
connect_to_compute(self.proxy_config, conn_info, session_id, latency_timer).await
} else {
latency_timer.pool_hit();
info!("pool: reusing connection '{conn_info}'");
client.session.send(session_id)?;
latency_timer.pool_hit();
latency_timer.success();
return Ok(Client {
inner: Some(client),
span: Span::current(),

View File

@@ -5,7 +5,7 @@ use crate::{
auth::{self, backend::AuthSuccess},
cancellation::{self, CancelMap},
compute::{self, PostgresConnection},
config::{ProxyConfig, TlsConfig},
config::{AuthenticationConfig, ProxyConfig, TlsConfig},
console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api},
http::StatusCode,
metrics::{Ids, USAGE_METRICS},
@@ -96,7 +96,9 @@ static COMPUTE_CONNECTION_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"proxy_compute_connection_latency_seconds",
"Time it took for proxy to establish a connection to the compute endpoint",
&["protocol", "cache_miss", "pool_miss"],
// http/ws/tcp, true/false, true/false, success/failure
// 3 * 2 * 2 * 2 = 24 counters
&["protocol", "cache_miss", "pool_miss", "outcome"],
// largest finite bucket = 0.5ms * 2^15 = ~16s (plus the implicit +Inf bucket)
exponential_buckets(0.0005, 2.0, 16).unwrap(),
)
@@ -105,19 +107,22 @@ static COMPUTE_CONNECTION_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
pub struct LatencyTimer {
start: Instant,
pool_miss: bool,
cache_miss: bool,
protocol: &'static str,
cache_miss: bool,
pool_miss: bool,
outcome: &'static str,
}
impl LatencyTimer {
pub fn new(protocol: &'static str) -> Self {
Self {
start: Instant::now(),
protocol,
cache_miss: false,
// by default we don't do pooling
pool_miss: true,
protocol,
// assume failed unless otherwise specified
outcome: "failed",
}
}
@@ -128,6 +133,10 @@ impl LatencyTimer {
pub fn pool_hit(&mut self) {
self.pool_miss = false;
}
pub fn success(mut self) {
self.outcome = "success";
}
}
impl Drop for LatencyTimer {
@@ -138,6 +147,7 @@ impl Drop for LatencyTimer {
self.protocol,
bool_to_str(self.cache_miss),
bool_to_str(self.pool_miss),
self.outcome,
])
.observe(duration)
}
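
`LatencyTimer` leans on `Drop` so that every exit path, including early returns, records an observation; `success(mut self)` consumes the timer, so flipping the label and recording happen in one move. A stripped-down sketch of the pattern without the Prometheus plumbing:

use std::time::Instant;

struct LatencyTimer {
    start: Instant,
    outcome: &'static str,
}

impl LatencyTimer {
    fn new() -> Self {
        // Assume failure until told otherwise; dropping unmarked records "failed".
        Self { start: Instant::now(), outcome: "failed" }
    }

    // Takes self by value: marking success also drops (and thus records) the timer.
    fn success(mut self) {
        self.outcome = "success";
    }
}

impl Drop for LatencyTimer {
    fn drop(&mut self) {
        // The proxy observes a labelled HistogramVec here; printing stands in.
        println!("latency={:?} outcome={}", self.start.elapsed(), self.outcome);
    }
}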
@@ -340,7 +350,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
mode.allow_self_signed_compute(config),
);
cancel_map
.with_session(|session| client.connect_to_db(session, mode))
.with_session(|session| client.connect_to_db(session, mode, &config.authentication_config))
.await
}
@@ -547,7 +557,10 @@ where
// try once
let (config, err) = match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await {
Ok(res) => return Ok(res),
Ok(res) => {
latency_timer.success();
return Ok(res);
}
Err(e) => {
error!(error = ?e, "could not connect to compute node");
(invalidate_cache(node_info), e)
@@ -601,7 +614,10 @@ where
info!("wake_compute success. attempting to connect");
loop {
match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await {
Ok(res) => return Ok(res),
Ok(res) => {
latency_timer.success();
return Ok(res);
}
Err(e) => {
let retriable = e.should_retry(num_retries);
if !retriable {
@@ -818,6 +834,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
self,
session: cancellation::Session<'_>,
mode: ClientMode,
config: &'static AuthenticationConfig,
) -> anyhow::Result<()> {
let Self {
mut stream,
@@ -835,7 +852,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
let latency_timer = LatencyTimer::new(mode.protocol_label());
let auth_result = match creds
.authenticate(&extra, &mut stream, mode.allow_cleartext())
.authenticate(&extra, &mut stream, mode.allow_cleartext(), config)
.await
{
Ok(auth_result) => auth_result,

View File

@@ -1631,7 +1631,7 @@ class NeonPageserver(PgProtocol):
".*took more than expected to complete.*",
# these can happen during shutdown, but it should not be a reason to fail a test
".*completed, took longer than expected.*",
'.*registered custom resource manager "neon".*',
'.*registered custom resource manager \\\\"neon\\\\".*',
# AWS S3 may emit 500 errors for keys in a DeleteObjects response: we retry these
# and it is not a failure of our code when it happens.
".*DeleteObjects.*We encountered an internal error. Please try again.*",

View File

@@ -486,16 +486,20 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
def evict_all_layers(env: NeonEnv, tenant_id: TenantId, timeline_id: TimelineId):
timeline_path = env.pageserver.timeline_dir(tenant_id, timeline_id)
initial_local_layers = sorted(
list(filter(lambda path: path.name != "metadata", timeline_path.glob("*")))
)
client = env.pageserver.http_client()
for layer in initial_local_layers:
if "ephemeral" in layer.name or "temp" in layer.name:
layer_map = client.layer_map_info(tenant_id, timeline_id)
for layer in layer_map.historic_layers:
if layer.remote:
log.info(
f"Skipping trying to evict remote layer {tenant_id}/{timeline_id} {layer.layer_file_name}"
)
continue
log.info(f"Evicting layer {tenant_id}/{timeline_id} {layer.name}")
client.evict_layer(tenant_id=tenant_id, timeline_id=timeline_id, layer_name=layer.name)
log.info(f"Evicting layer {tenant_id}/{timeline_id} {layer.layer_file_name}")
client.evict_layer(
tenant_id=tenant_id, timeline_id=timeline_id, layer_name=layer.layer_file_name
)
def test_eviction_across_generations(neon_env_builder: NeonEnvBuilder):

View File

@@ -757,12 +757,14 @@ def test_empty_branch_remote_storage_upload_on_restart(neon_env_builder: NeonEnv
create_thread.join()
# Regression test for a race condition where L0 layers are compacted before the upload,
# resulting in the upload task complaining about the file not being found
# https://github.com/neondatabase/neon/issues/4526
def test_compaction_delete_before_upload(
def test_compaction_waits_for_upload(
neon_env_builder: NeonEnvBuilder,
):
"""
Compaction waits for outstanding uploads to complete, so that it avoids deleting layer
files that have not yet been uploaded. This test forces a race between upload and
compaction.
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start(
@@ -792,50 +794,81 @@ def test_compaction_delete_before_upload(
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
# Now make the flushing hang and update one small piece of data
client.configure_failpoints(("flush-frozen-pausable", "pause"))
client.configure_failpoints(("before-upload-layer-pausable", "pause"))
endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
q: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
barrier = threading.Barrier(2)
checkpoint_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
compact_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
compact_barrier = threading.Barrier(2)
def checkpoint_in_background():
barrier.wait()
try:
log.info("Checkpoint starting")
client.timeline_checkpoint(tenant_id, timeline_id)
q.put(None)
log.info("Checkpoint complete")
checkpoint_result.put(None)
except PageserverApiException as e:
q.put(e)
log.info("Checkpoint errored: {e}")
checkpoint_result.put(e)
create_thread = threading.Thread(target=checkpoint_in_background)
create_thread.start()
def compact_in_background():
compact_barrier.wait()
try:
log.info("Compaction starting")
client.timeline_compact(tenant_id, timeline_id)
log.info("Compaction complete")
compact_result.put(None)
except PageserverApiException as e:
log.info("Compaction errored: {e}")
compact_result.put(e)
checkpoint_thread = threading.Thread(target=checkpoint_in_background)
checkpoint_thread.start()
compact_thread = threading.Thread(target=compact_in_background)
compact_thread.start()
try:
barrier.wait()
# Start the checkpoint, see that it blocks
log.info("Waiting to see checkpoint hang...")
time.sleep(5)
assert checkpoint_result.empty()
time.sleep(4)
client.timeline_compact(tenant_id, timeline_id)
# Start the compaction, see that it finds work to do but blocks
compact_barrier.wait()
log.info("Waiting to see compaction hang...")
time.sleep(5)
assert compact_result.empty()
client.configure_failpoints(("flush-frozen-pausable", "off"))
# This is logged once compaction is started, but before we wait for operations to complete
assert env.pageserver.log_contains("compact_level0_phase1 stats available.")
conflict = q.get()
# Once we unblock uploads the compaction should complete successfully
log.info("Disabling failpoint")
client.configure_failpoints(("before-upload-layer-pausable", "off"))
log.info("Awaiting compaction result")
assert compact_result.get(timeout=10) is None
log.info("Awaiting checkpoint result")
assert checkpoint_result.get(timeout=10) is None
assert conflict is None
except Exception:
# Log the actual failure's backtrace here, before we proceed to join threads
log.exception("Failure, cleaning up...")
raise
finally:
create_thread.join()
compact_barrier.abort()
# Add a delay so that any in-flight uploads have a chance to hit the error handling path, if they are going to
time.sleep(4)
checkpoint_thread.join()
compact_thread.join()
# Ensure that this actually terminates
wait_upload_queue_empty(client, tenant_id, timeline_id)
# For now we are hitting this message.
# Maybe in the future the underlying race condition will be fixed,
# but until then, ensure that this message is hit instead.
assert env.pageserver.log_contains(
# We should not have hit the error handling path in uploads where the remote file is gone
assert not env.pageserver.log_contains(
"File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more."
)