Compare commits

..

25 Commits

Author SHA1 Message Date
Konstantin Knizhnik
e21b2eb7dc Fix prefetch_set_unused 2024-03-29 14:58:46 +02:00
Konstantin Knizhnik
1cd71d29ac Fix bug in prefetch cleanup 2024-03-29 14:10:25 +02:00
Konstantin Knizhnik
86762ba313 Fix test 2024-03-28 23:09:33 +02:00
Konstantin Knizhnik
6a42b6b87f Add select.sql 2024-03-28 21:54:34 +02:00
Konstantin Knizhnik
48013845f0 Fix test 2024-03-28 20:58:52 +02:00
Konstantin Knizhnik
40226ce7da Check prefetch termination on PS disconnect 2024-03-28 18:26:00 +02:00
Konstantin Knizhnik
ea2083653c Drop connections with all shards invoplved in prefetch in case of error 2024-03-26 16:40:25 +02:00
Christian Schwarz
ad072de420 Revert "pageserver: use a single tokio runtime (#6555)" (#7246) 2024-03-26 15:24:18 +01:00
Anna Khanova
6c18109734 proxy: reuse sess_id as request_id for the cplane requests (#7245)
## Problem

https://github.com/neondatabase/cloud/issues/11599

## Summary of changes

Reuse the same sess_id for requests within the one session.

TODO: get rid of `session_id` in query params.
2024-03-26 11:27:48 +00:00
John Spray
5dee58f492 tests: wait for uploads in test_secondary_downloads (#7220)
## Problem

- https://github.com/neondatabase/neon/issues/6966

This test occasionally failed with some layers unexpectedly not present
on the secondary pageserver. The issue in that failure is the attached
pageserver uploading heatmaps that refer to not-yet-uploaded layers.

## Summary of changes

After uploading heatmap, drain upload queue on attached pageserver, to
guarantee that all the layers referenced in the haetmap are uploaded.
2024-03-26 10:59:16 +00:00
John Spray
6313f1fa7a tests: tolerate transient unavailability in test_sharding_split_failures (#7223)
## Problem

While most forms of split rollback don't interrupt clients, there are a
couple of cases that do -- this interruption is brief, driven by the
time it takes the controller to kick off Reconcilers during the async
abort of the split, so it's operationally fine, but can trip up a test.

- #7148 

## Summary of changes

- Relax test check to require that the tenant is eventually available
after split failure, rather than immediately. In the vast majority of
cases this will pass on the first iteration.
2024-03-26 09:56:47 +00:00
Christian Schwarz
f72415e1fd refactor(remote_timeline_client): infallible stop() and shutdown() (#7234)
preliminary refactoring for
https://github.com/neondatabase/neon/pull/7233

part of #7062
2024-03-25 18:42:18 +01:00
George Ma
d837ce0686 chore: remove repetitive words (#7206)
Signed-off-by: availhang <mayangang@outlook.com>
2024-03-25 11:43:02 -04:00
John Spray
2713142308 tests: stabilize compat tests (#7227)
This test had two flaky failure modes:
- pageserver log error for timeline not found: this resulted from
changes for DR when timeline destroy/create was added, but endpoint was
left running during that operation.
- storage controller log error because the test was running for long
enough that a background reconcile happened at almost the exact moment
of test teardown, and our test fixtures tear down the pageservers before
the controller.

Closes: #7224
2024-03-25 14:35:24 +00:00
Arseny Sher
a6c1fdcaf6 Try to fix test_crafted_wal_end flakiness.
Postgres can always write some more WAL, so previous checks that WAL doesn't
change after something had been crafted were wrong; remove them. Add comments
here and there.

should fix https://github.com/neondatabase/neon/issues/4691
2024-03-25 14:53:06 +03:00
John Spray
adb0526262 pageserver: track total ephemeral layer bytes (#7182)
## Problem

Large quantities of ephemeral layer data can lead to excessive memory
consumption (https://github.com/neondatabase/neon/issues/6939). We
currently don't have a way to know how much ephemeral layer data is
present on a pageserver.

Before we can add new behaviors to proactively roll layers in response
to too much ephemeral data, we must calculate that total.

Related: https://github.com/neondatabase/neon/issues/6916

## Summary of changes

- Create GlobalResources and GlobalResourceUnits types, where timelines
carry a GlobalResourceUnits in their TimelineWriterState.
- Periodically update the size in GlobalResourceUnits:
  - During tick()
  - During layer roll
- During put() if the latest value has drifted more than 10MB since our
last update
- Expose the value of the global ephemeral layer bytes counter as a
prometheus metric.
- Extend the lifetime of TimelineWriterState:
  - Instead of dropping it in TimelineWriter::drop, let it remain.
- Drop TimelineWriterState in roll_layer: this drops our guard on the
global byte count to reflect the fact that we're freezing the layer.
- Ensure the validity of the later in the writer state by clearing the
state in the same place we freeze layers, and asserting on the
write-ability of the layer in `writer()`
- Add a 'context' parameter to `get_open_layer_action` so that it can
skip the prev_lsn==lsn check when called in tick() -- this is needed
because now tick is called with a populated state, where
prev_lsn==Some(lsn) is true for an idle timeline.
- Extend layer rolling test to use this metric
2024-03-25 11:52:50 +00:00
John Spray
0099dfa56b storage controller: tighten up secrets handling (#7105)
- Remove code for using AWS secrets manager, as we're deploying with
k8s->env vars instead
- Load each secret independently, so that one can mix CLI args with
environment variables, rather than requiring that all secrets are loaded
with the same mechanism.
- Add a 'strict mode', enabled by default, which will refuse to start if
secrets are not loaded. This avoids the risk of accidentially disabling
auth by omitting the public key, for example
2024-03-25 11:52:33 +00:00
Vlad Lazar
3a4ebfb95d test: fix test_pageserver_recovery flakyness (#7207)
## Problem
We recently introduced log file validation for the storage controller.
The heartbeater will WARN when it fails
for a node, hence the test fails.

Closes https://github.com/neondatabase/neon/issues/7159

## Summary of changes
* Warn only once for each set of heartbeat retries
* Allow list heartbeat warns
2024-03-25 09:38:12 +00:00
Christian Schwarz
3220f830b7 pageserver: use a single tokio runtime (#6555)
Before this PR, each core had 3 executor threads from 3 different
runtimes. With this PR, we just have one runtime, with one thread per
core. Switching to a single tokio runtime should reduce that effective
over-commit of CPU and in theory help with tail latencies -- iff all
tokio tasks are well-behaved and yield to the runtime regularly.

Are All Tasks Well-Behaved? Are We Ready?
-----------------------------------------

Sadly there doesn't seem to be good out-of-the box tokio tooling to
answer this question.

We *believe* all tasks are well behaved in today's code base, as of the
switch to `virtual_file_io_engine = "tokio-epoll-uring"` in production
(https://github.com/neondatabase/aws/pull/1121).

The only remaining executor-thread-blocking code is walredo and some
filesystem namespace operations.

Filesystem namespace operations work is being tracked in #6663 and not
considered likely to actually block at this time.

Regarding walredo, it currently does a blocking `poll` for read/write to
the pipe file descriptors we use for IPC with the walredo process.
There is an ongoing experiment to make walredo async (#6628), but it
needs more time because there are surprisingly tricky trade-offs that
are articulated in that PR's description (which itself is still WIP).
What's relevant for *this* PR is that
1. walredo is always CPU-bound
2. production tail latencies for walredo request-response
(`pageserver_wal_redo_seconds_bucket`) are
  - p90: with few exceptions, low hundreds of micro-seconds
  - p95: except on very packed pageservers, below 1ms
  - p99: all below 50ms, vast majority below 1ms
  - p99.9: almost all around 50ms, rarely at >= 70ms
- [Dashboard
Link](https://neonprod.grafana.net/d/edgggcrmki3uof/2024-03-walredo-latency?orgId=1&var-ds=ZNX49CDVz&var-pXX_by_instance=0.9&var-pXX_by_instance=0.99&var-pXX_by_instance=0.95&var-adhoc=instance%7C%21%3D%7Cpageserver-30.us-west-2.aws.neon.tech&var-per_instance_pXX_max_seconds=0.0005&from=1711049688777&to=1711136088777)

The ones below 1ms are below our current threshold for when we start
thinking about yielding to the executor.
The tens of milliseconds stalls aren't great, but, not least because of
the implicit overcommit of CPU by the three runtimes, we can't be sure
whether these tens of milliseconds are inherently necessary to do the
walredo work or whether we could be faster if there was less contention
for CPU.

On the first item (walredo being always CPU-bound work): it means that
walredo processes will always compete with the executor threads.
We could yield, using async walredo, but then we hit the trade-offs
explained in that PR.

tl;dr: the risk of stalling executor threads through blocking walredo
seems low, and switching to one runtime cleans up one potential source
for higher-than-necessary stall times (explained in the previous
paragraphs).


Code Changes
------------

- Remove the 3 different runtime definitions.
- Add a new definition called `THE_RUNTIME`.
- Use it in all places that previously used one of the 3 removed
runtimes.
- Remove the argument from `task_mgr`.
- Fix failpoint usage where `pausable_failpoint!` should have been used.
We encountered some actual failures because of this, e.g., hung
`get_metric()` calls during test teardown that would client-timeout
after 300s.

As indicated by the comment above `THE_RUNTIME`, we could take this
clean-up further.
But before we create so much churn, let's first validate that there's no
perf regression.


Performance
-----------

We will test this in staging using the various nightly benchmark runs.

However, the worst-case impact of this change is likely compaction
(=>image layer creation) competing with compute requests.
Image layer creation work can't be easily generated & repeated quickly
by pagebench.
So, we'll simply watch getpage & basebackup tail latencies in staging.

Additionally, I have done manual benchmarking using pagebench.
Report:
https://neondatabase.notion.site/2024-03-23-oneruntime-change-benchmarking-22a399c411e24399a73311115fb703ec?pvs=4
Tail latencies and throughput are marginally better (no regression =
good).
Except in a workload with 128 clients against one tenant.
There, the p99.9 and p99.99 getpage latency is about 2x worse (at
slightly lower throughput).
A dip in throughput every 20s (compaction_period_ is clearly visible,
and probably responsible for that worse tail latency.
This has potential to improve with async walredo, and is an edge case
workload anyway.


Future Work
-----------

1. Once this change has shown satisfying results in production, change
the codebase to use the ambient runtime instead of explicitly
referencing `THE_RUNTIME`.
2. Have a mode where we run with a single-threaded runtime, so we
uncover executor stalls more quickly.
3. Switch or write our own failpoints library that is async-native:
https://github.com/neondatabase/neon/issues/7216
2024-03-23 19:25:11 +01:00
Conrad Ludgate
72103d481d proxy: fix stack overflow in cancel publisher (#7212)
## Problem

stack overflow in blanket impl for `CancellationPublisher`

## Summary of changes

Removes `async_trait` and fixes the impl order to make it non-recursive.
2024-03-23 06:36:58 +00:00
Alex Chi Z
643683f41a fixup(#7204 / postgres): revert IsPrimaryAlive checks (#7209)
Fix #7204.

https://github.com/neondatabase/postgres/pull/400
https://github.com/neondatabase/postgres/pull/401
https://github.com/neondatabase/postgres/pull/402

These commits never go into prod. Detailed investigation will be posted
in another issue. Reverting the commits so that things can keep running
in prod. This pull request adds the test to start two replicas. It fails
on the current main https://github.com/neondatabase/neon/pull/7210 but
passes in this pull request.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-03-23 01:01:51 +00:00
Konstantin Knizhnik
35f4c04c9b Remove Get/SetZenithCurrentClusterSize from Postgres core (#7196)
## Problem

See https://neondb.slack.com/archives/C04DGM6SMTM/p1711003752072899

## Summary of changes

Move keeping of cluster size to neon extension

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-03-22 13:14:31 -04:00
John Spray
1787cf19e3 pageserver: write consumption metrics to S3 (#7200)
## Problem

The service that receives consumption metrics has lower availability
than S3. Writing metrics to S3 improves their availability.

Closes: https://github.com/neondatabase/cloud/issues/9824

## Summary of changes

- The same data as consumption metrics POST bodies is also compressed
and written to an S3 object with a timestamp-formatted path.
- Set `metric_collection_bucket` (same format as `remote_storage`
config) to configure the location to write to
2024-03-22 14:52:14 +00:00
Alexander Bayandin
2668a1dfab CI: deploy release version to a preprod region (#6811)
## Problem

We want to deploy releases to a preprod region first to perform required
checks

## Summary of changes
- Deploy `release-XXX` / `release-proxy-YYY` docker tags to a preprod region
2024-03-22 14:42:10 +00:00
Conrad Ludgate
77f3a30440 proxy: unit tests for auth_quirks (#7199)
## Problem

I noticed code coverage for auth_quirks was pretty bare

## Summary of changes

Adds 3 happy path unit tests for auth_quirks
* scram
* cleartext (websockets)
* cleartext (password hack)
2024-03-22 13:31:10 +00:00
68 changed files with 1032 additions and 416 deletions

View File

@@ -1121,10 +1121,16 @@ jobs:
run: |
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=false
# TODO: move deployPreprodRegion to release (`"$GITHUB_REF_NAME" == "release"` block), once Staging support different compute tag prefixes for different regions
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=true
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main \
-f deployPgSniRouter=false \
-f deployProxy=false \
-f deployStorage=true \
-f deployStorageBroker=true \
-f branch=main \
-f dockerTag=${{needs.tag.outputs.build-tag}} \
-f deployPreprodRegion=true
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main \
-f deployPgSniRouter=false \
-f deployProxy=false \
@@ -1133,6 +1139,15 @@ jobs:
-f branch=main \
-f dockerTag=${{needs.tag.outputs.build-tag}}
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main \
-f deployPgSniRouter=true \
-f deployProxy=true \
-f deployStorage=false \
-f deployStorageBroker=false \
-f branch=main \
-f dockerTag=${{needs.tag.outputs.build-tag}} \
-f deployPreprodRegion=true
gh workflow --repo neondatabase/aws run deploy-proxy-prod.yml --ref main \
-f deployPgSniRouter=true \
-f deployProxy=true \

25
Cargo.lock generated
View File

@@ -276,7 +276,6 @@ version = "0.1.0"
dependencies = [
"anyhow",
"aws-config",
"aws-sdk-secretsmanager",
"bytes",
"camino",
"clap",
@@ -433,29 +432,6 @@ dependencies = [
"url",
]
[[package]]
name = "aws-sdk-secretsmanager"
version = "1.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a0b64e61e7d632d9df90a2e0f32630c68c24960cab1d27d848718180af883d3"
dependencies = [
"aws-credential-types",
"aws-runtime",
"aws-smithy-async",
"aws-smithy-http",
"aws-smithy-json",
"aws-smithy-runtime",
"aws-smithy-runtime-api",
"aws-smithy-types",
"aws-types",
"bytes",
"fastrand 2.0.0",
"http 0.2.9",
"once_cell",
"regex-lite",
"tracing",
]
[[package]]
name = "aws-sdk-sso"
version = "1.12.0"
@@ -4237,6 +4213,7 @@ dependencies = [
"consumption_metrics",
"dashmap",
"env_logger",
"fallible-iterator",
"futures",
"git-version",
"hashbrown 0.13.2",

View File

@@ -52,7 +52,6 @@ async-stream = "0.3"
async-trait = "0.1"
aws-config = { version = "1.1.4", default-features = false, features=["rustls"] }
aws-sdk-s3 = "1.14"
aws-sdk-secretsmanager = { version = "1.14.0" }
aws-sdk-iam = "1.15.0"
aws-smithy-async = { version = "1.1.4", default-features = false, features=["rt-tokio"] }
aws-smithy-types = "1.1.4"
@@ -79,6 +78,7 @@ either = "1.8"
enum-map = "2.4.2"
enumset = "1.0.12"
fail = "0.5.0"
fallible-iterator = "0.2"
fs2 = "0.4.3"
futures = "0.3"
futures-core = "0.3"

View File

@@ -16,7 +16,6 @@ testing = []
[dependencies]
anyhow.workspace = true
aws-config.workspace = true
aws-sdk-secretsmanager.workspace = true
bytes.workspace = true
camino.workspace = true
clap.workspace = true

View File

@@ -139,7 +139,7 @@ impl HeartbeaterTask {
.with_client_retries(
|client| async move { client.get_utilization().await },
&jwt_token,
2,
3,
3,
Duration::from_secs(1),
&cancel,

View File

@@ -3,7 +3,6 @@ use attachment_service::http::make_router;
use attachment_service::metrics::preinitialize_metrics;
use attachment_service::persistence::Persistence;
use attachment_service::service::{Config, Service, MAX_UNAVAILABLE_INTERVAL_DEFAULT};
use aws_config::{BehaviorVersion, Region};
use camino::Utf8PathBuf;
use clap::Parser;
use diesel::Connection;
@@ -55,11 +54,31 @@ struct Cli {
#[arg(long)]
database_url: Option<String>,
/// Flag to enable dev mode, which permits running without auth
#[arg(long, default_value = "false")]
dev: bool,
/// Grace period before marking unresponsive pageserver offline
#[arg(long)]
max_unavailable_interval: Option<humantime::Duration>,
}
enum StrictMode {
/// In strict mode, we will require that all secrets are loaded, i.e. security features
/// may not be implicitly turned off by omitting secrets in the environment.
Strict,
/// In dev mode, secrets are optional, and omitting a particular secret will implicitly
/// disable the auth related to it (e.g. no pageserver jwt key -> send unauthenticated
/// requests, no public key -> don't authenticate incoming requests).
Dev,
}
impl Default for StrictMode {
fn default() -> Self {
Self::Strict
}
}
/// Secrets may either be provided on the command line (for testing), or loaded from AWS SecretManager: this
/// type encapsulates the logic to decide which and do the loading.
struct Secrets {
@@ -70,13 +89,6 @@ struct Secrets {
}
impl Secrets {
const DATABASE_URL_SECRET: &'static str = "rds-neon-storage-controller-url";
const PAGESERVER_JWT_TOKEN_SECRET: &'static str =
"neon-storage-controller-pageserver-jwt-token";
const CONTROL_PLANE_JWT_TOKEN_SECRET: &'static str =
"neon-storage-controller-control-plane-jwt-token";
const PUBLIC_KEY_SECRET: &'static str = "neon-storage-controller-public-key";
const DATABASE_URL_ENV: &'static str = "DATABASE_URL";
const PAGESERVER_JWT_TOKEN_ENV: &'static str = "PAGESERVER_JWT_TOKEN";
const CONTROL_PLANE_JWT_TOKEN_ENV: &'static str = "CONTROL_PLANE_JWT_TOKEN";
@@ -87,111 +99,41 @@ impl Secrets {
/// - Environment variables if DATABASE_URL is set.
/// - AWS Secrets Manager secrets
async fn load(args: &Cli) -> anyhow::Result<Self> {
match &args.database_url {
Some(url) => Self::load_cli(url, args),
None => match std::env::var(Self::DATABASE_URL_ENV) {
Ok(database_url) => Self::load_env(database_url),
Err(_) => Self::load_aws_sm().await,
},
}
}
fn load_env(database_url: String) -> anyhow::Result<Self> {
let public_key = match std::env::var(Self::PUBLIC_KEY_ENV) {
Ok(public_key) => Some(JwtAuth::from_key(public_key).context("Loading public key")?),
Err(_) => None,
};
Ok(Self {
database_url,
public_key,
jwt_token: std::env::var(Self::PAGESERVER_JWT_TOKEN_ENV).ok(),
control_plane_jwt_token: std::env::var(Self::CONTROL_PLANE_JWT_TOKEN_ENV).ok(),
})
}
async fn load_aws_sm() -> anyhow::Result<Self> {
let Ok(region) = std::env::var("AWS_REGION") else {
anyhow::bail!("AWS_REGION is not set, cannot load secrets automatically: either set this, or use CLI args to supply secrets");
};
let config = aws_config::defaults(BehaviorVersion::v2023_11_09())
.region(Region::new(region.clone()))
.load()
.await;
let asm = aws_sdk_secretsmanager::Client::new(&config);
let Some(database_url) = asm
.get_secret_value()
.secret_id(Self::DATABASE_URL_SECRET)
.send()
.await?
.secret_string()
.map(str::to_string)
let Some(database_url) =
Self::load_secret(&args.database_url, Self::DATABASE_URL_ENV).await
else {
anyhow::bail!(
"Database URL secret not found at {region}/{}",
Self::DATABASE_URL_SECRET
"Database URL is not set (set `--database-url`, or `DATABASE_URL` environment)"
)
};
let jwt_token = asm
.get_secret_value()
.secret_id(Self::PAGESERVER_JWT_TOKEN_SECRET)
.send()
.await?
.secret_string()
.map(str::to_string);
if jwt_token.is_none() {
tracing::warn!("No pageserver JWT token set: this will only work if authentication is disabled on the pageserver");
}
let control_plane_jwt_token = asm
.get_secret_value()
.secret_id(Self::CONTROL_PLANE_JWT_TOKEN_SECRET)
.send()
.await?
.secret_string()
.map(str::to_string);
if jwt_token.is_none() {
tracing::warn!("No control plane JWT token set: this will only work if authentication is disabled on the pageserver");
}
let public_key = asm
.get_secret_value()
.secret_id(Self::PUBLIC_KEY_SECRET)
.send()
.await?
.secret_string()
.map(str::to_string);
let public_key = match public_key {
Some(key) => Some(JwtAuth::from_key(key)?),
None => {
tracing::warn!(
"No public key set: inccoming HTTP requests will not be authenticated"
);
None
}
let public_key = match Self::load_secret(&args.public_key, Self::PUBLIC_KEY_ENV).await {
Some(v) => Some(JwtAuth::from_key(v).context("Loading public key")?),
None => None,
};
Ok(Self {
let this = Self {
database_url,
public_key,
jwt_token,
control_plane_jwt_token,
})
jwt_token: Self::load_secret(&args.jwt_token, Self::PAGESERVER_JWT_TOKEN_ENV).await,
control_plane_jwt_token: Self::load_secret(
&args.control_plane_jwt_token,
Self::CONTROL_PLANE_JWT_TOKEN_ENV,
)
.await,
};
Ok(this)
}
fn load_cli(database_url: &str, args: &Cli) -> anyhow::Result<Self> {
let public_key = match &args.public_key {
None => None,
Some(key) => Some(JwtAuth::from_key(key.clone()).context("Loading public key")?),
};
Ok(Self {
database_url: database_url.to_owned(),
public_key,
jwt_token: args.jwt_token.clone(),
control_plane_jwt_token: args.control_plane_jwt_token.clone(),
})
async fn load_secret(cli: &Option<String>, env_name: &str) -> Option<String> {
if let Some(v) = cli {
Some(v.clone())
} else if let Ok(v) = std::env::var(env_name) {
Some(v)
} else {
None
}
}
}
@@ -247,8 +189,42 @@ async fn async_main() -> anyhow::Result<()> {
args.listen
);
let strict_mode = if args.dev {
StrictMode::Dev
} else {
StrictMode::Strict
};
let secrets = Secrets::load(&args).await?;
// Validate required secrets and arguments are provided in strict mode
match strict_mode {
StrictMode::Strict
if (secrets.public_key.is_none()
|| secrets.jwt_token.is_none()
|| secrets.control_plane_jwt_token.is_none()) =>
{
// Production systems should always have secrets configured: if public_key was not set
// then we would implicitly disable auth.
anyhow::bail!(
"Insecure config! One or more secrets is not set. This is only permitted in `--dev` mode"
);
}
StrictMode::Strict if args.compute_hook_url.is_none() => {
// Production systems should always have a compute hook set, to prevent falling
// back to trying to use neon_local.
anyhow::bail!(
"`--compute-hook-url` is not set: this is only permitted in `--dev` mode"
);
}
StrictMode::Strict => {
tracing::info!("Starting in strict mode: configuration is OK.")
}
StrictMode::Dev => {
tracing::warn!("Starting in dev mode: this may be an insecure configuration.")
}
}
let config = Config {
jwt_token: secrets.jwt_token,
control_plane_jwt_token: secrets.control_plane_jwt_token,

View File

@@ -294,7 +294,7 @@ where
// is in state 'taken' but the thread that would unlock it is
// not there.
// 2. A rust object that represented some external resource in the
// parent now got implicitly copied by the the fork, even though
// parent now got implicitly copied by the fork, even though
// the object's type is not `Copy`. The parent program may use
// non-copyability as way to enforce unique ownership of an
// external resource in the typesystem. The fork breaks that

View File

@@ -12,7 +12,7 @@
//!
//! The endpoint is managed by the `compute_ctl` binary. When an endpoint is
//! started, we launch `compute_ctl` It synchronizes the safekeepers, downloads
//! the basebackup from the pageserver to initialize the the data directory, and
//! the basebackup from the pageserver to initialize the data directory, and
//! finally launches the PostgreSQL process. It watches the PostgreSQL process
//! until it exits.
//!

View File

@@ -279,6 +279,7 @@ impl StorageController {
&self.listen,
"-p",
self.path.as_ref(),
"--dev",
"--database-url",
&database_url,
"--max-unavailable-interval",

View File

@@ -913,6 +913,8 @@ pub struct PagestreamNblocksResponse {
#[derive(Debug)]
pub struct PagestreamGetPageResponse {
pub rel: RelTag,
pub blkno: u32,
pub page: Bytes,
}
@@ -1073,6 +1075,11 @@ impl PagestreamBeMessage {
Self::GetPage(resp) => {
bytes.put_u8(Tag::GetPage as u8);
bytes.put_u32(resp.rel.spcnode);
bytes.put_u32(resp.rel.dbnode);
bytes.put_u32(resp.rel.relnode);
bytes.put_u8(resp.rel.forknum);
bytes.put_u32(resp.blkno);
bytes.put(&resp.page[..]);
}
@@ -1114,9 +1121,20 @@ impl PagestreamBeMessage {
Self::Nblocks(PagestreamNblocksResponse { n_blocks })
}
Tag::GetPage => {
let rel = RelTag {
spcnode: buf.read_u32::<BigEndian>()?,
dbnode: buf.read_u32::<BigEndian>()?,
relnode: buf.read_u32::<BigEndian>()?,
forknum: buf.read_u8()?,
};
let blkno = buf.read_u32::<BigEndian>()?;
let mut page = vec![0; 8192]; // TODO: use MaybeUninit
buf.read_exact(&mut page)?;
PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page: page.into() })
PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
rel,
blkno,
page: page.into(),
})
}
Tag::Error => {
let mut msg = Vec::new();

View File

@@ -1,5 +1,6 @@
use anyhow::*;
use clap::{value_parser, Arg, ArgMatches, Command};
use postgres::Client;
use std::{path::PathBuf, str::FromStr};
use wal_craft::*;
@@ -8,8 +9,8 @@ fn main() -> Result<()> {
.init();
let arg_matches = cli().get_matches();
let wal_craft = |arg_matches: &ArgMatches, client| {
let (intermediate_lsns, end_of_wal_lsn) = match arg_matches
let wal_craft = |arg_matches: &ArgMatches, client: &mut Client| {
let intermediate_lsns = match arg_matches
.get_one::<String>("type")
.map(|s| s.as_str())
.context("'type' is required")?
@@ -25,6 +26,7 @@ fn main() -> Result<()> {
LastWalRecordCrossingSegment::NAME => LastWalRecordCrossingSegment::craft(client)?,
a => panic!("Unknown --type argument: {a}"),
};
let end_of_wal_lsn = client.pg_current_wal_insert_lsn()?;
for lsn in intermediate_lsns {
println!("intermediate_lsn = {lsn}");
}

View File

@@ -5,7 +5,6 @@ use postgres::types::PgLsn;
use postgres::Client;
use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
use postgres_ffi::{XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD};
use std::cmp::Ordering;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::{Duration, Instant};
@@ -232,59 +231,52 @@ pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> anyhow
pub trait Crafter {
const NAME: &'static str;
/// Generates WAL using the client `client`. Returns a pair of:
/// * A vector of some valid "interesting" intermediate LSNs which one may start reading from.
/// May include or exclude Lsn(0) and the end-of-wal.
/// * The expected end-of-wal LSN.
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)>;
/// Generates WAL using the client `client`. Returns a vector of some valid
/// "interesting" intermediate LSNs which one may start reading from.
/// test_end_of_wal uses this to check various starting points.
///
/// Note that postgres is generally keen about writing some WAL. While we
/// try to disable it (autovacuum, big wal_writer_delay, etc) it is always
/// possible, e.g. xl_running_xacts are dumped each 15s. So checks about
/// stable WAL end would be flaky unless postgres is shut down. For this
/// reason returning potential end of WAL here is pointless. Most of the
/// time this doesn't happen though, so it is reasonable to create needed
/// WAL structure and immediately kill postgres like test_end_of_wal does.
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>>;
}
/// Wraps some WAL craft function, providing current LSN to it before the
/// insertion and flushing WAL afterwards. Also pushes initial LSN to the
/// result.
fn craft_internal<C: postgres::GenericClient>(
client: &mut C,
f: impl Fn(&mut C, PgLsn) -> anyhow::Result<(Vec<PgLsn>, Option<PgLsn>)>,
) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
f: impl Fn(&mut C, PgLsn) -> anyhow::Result<Vec<PgLsn>>,
) -> anyhow::Result<Vec<PgLsn>> {
ensure_server_config(client)?;
let initial_lsn = client.pg_current_wal_insert_lsn()?;
info!("LSN initial = {}", initial_lsn);
let (mut intermediate_lsns, last_lsn) = f(client, initial_lsn)?;
let last_lsn = match last_lsn {
None => client.pg_current_wal_insert_lsn()?,
Some(last_lsn) => {
let insert_lsn = client.pg_current_wal_insert_lsn()?;
match last_lsn.cmp(&insert_lsn) {
Ordering::Less => bail!(
"Some records were inserted after the crafted WAL: {} vs {}",
last_lsn,
insert_lsn
),
Ordering::Equal => last_lsn,
Ordering::Greater => bail!("Reported LSN is greater than insert_lsn"),
}
}
};
let mut intermediate_lsns = f(client, initial_lsn)?;
if !intermediate_lsns.starts_with(&[initial_lsn]) {
intermediate_lsns.insert(0, initial_lsn);
}
// Some records may be not flushed, e.g. non-transactional logical messages.
//
// Note: this is broken if pg_current_wal_insert_lsn is at page boundary
// because pg_current_wal_insert_lsn skips page headers.
client.execute("select neon_xlogflush(pg_current_wal_insert_lsn())", &[])?;
match last_lsn.cmp(&client.pg_current_wal_flush_lsn()?) {
Ordering::Less => bail!("Some records were flushed after the crafted WAL"),
Ordering::Equal => {}
Ordering::Greater => bail!("Reported LSN is greater than flush_lsn"),
}
Ok((intermediate_lsns, last_lsn))
Ok(intermediate_lsns)
}
pub struct Simple;
impl Crafter for Simple {
const NAME: &'static str = "simple";
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
craft_internal(client, |client, _| {
client.execute("CREATE table t(x int)", &[])?;
Ok((Vec::new(), None))
Ok(Vec::new())
})
}
}
@@ -292,29 +284,36 @@ impl Crafter for Simple {
pub struct LastWalRecordXlogSwitch;
impl Crafter for LastWalRecordXlogSwitch {
const NAME: &'static str = "last_wal_record_xlog_switch";
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
// Do not use generate_internal because here we end up with flush_lsn exactly on
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
// Do not use craft_internal because here we end up with flush_lsn exactly on
// the segment boundary and insert_lsn after the initial page header, which is unusual.
ensure_server_config(client)?;
client.execute("CREATE table t(x int)", &[])?;
let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
let next_segment = PgLsn::from(0x0200_0000);
// pg_switch_wal returns end of last record of the switched segment,
// i.e. end of SWITCH itself.
let xlog_switch_record_end: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
let before_xlog_switch_u64 = u64::from(before_xlog_switch);
let next_segment = PgLsn::from(
before_xlog_switch_u64 - (before_xlog_switch_u64 % WAL_SEGMENT_SIZE as u64)
+ WAL_SEGMENT_SIZE as u64,
);
ensure!(
after_xlog_switch <= next_segment,
"XLOG_SWITCH message ended after the expected segment boundary: {} > {}",
after_xlog_switch,
xlog_switch_record_end <= next_segment,
"XLOG_SWITCH record ended after the expected segment boundary: {} > {}",
xlog_switch_record_end,
next_segment
);
Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
Ok(vec![before_xlog_switch, xlog_switch_record_end])
}
}
pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
/// Craft xlog SWITCH record ending at page boundary.
impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
// Do not use generate_internal because here we end up with flush_lsn exactly on
// the segment boundary and insert_lsn after the initial page header, which is unusual.
ensure_server_config(client)?;
@@ -361,28 +360,29 @@ impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
// Emit the XLOG_SWITCH
let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
let xlog_switch_record_end: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
let next_segment = PgLsn::from(0x0200_0000);
ensure!(
after_xlog_switch < next_segment,
"XLOG_SWITCH message ended on or after the expected segment boundary: {} > {}",
after_xlog_switch,
xlog_switch_record_end < next_segment,
"XLOG_SWITCH record ended on or after the expected segment boundary: {} > {}",
xlog_switch_record_end,
next_segment
);
ensure!(
u64::from(after_xlog_switch) as usize % XLOG_BLCKSZ == XLOG_SIZE_OF_XLOG_SHORT_PHD,
u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ == XLOG_SIZE_OF_XLOG_SHORT_PHD,
"XLOG_SWITCH message ended not on page boundary: {}, offset = {}",
after_xlog_switch,
u64::from(after_xlog_switch) as usize % XLOG_BLCKSZ
xlog_switch_record_end,
u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ
);
Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
Ok(vec![before_xlog_switch, xlog_switch_record_end])
}
}
fn craft_single_logical_message(
/// Write ~16MB logical message; it should cross WAL segment.
fn craft_seg_size_logical_message(
client: &mut impl postgres::GenericClient,
transactional: bool,
) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
) -> anyhow::Result<Vec<PgLsn>> {
craft_internal(client, |client, initial_lsn| {
ensure!(
initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
@@ -405,34 +405,24 @@ fn craft_single_logical_message(
"Logical message crossed two segments"
);
if transactional {
// Transactional logical messages are part of a transaction, so the one above is
// followed by a small COMMIT record.
let after_message_lsn = client.pg_current_wal_insert_lsn()?;
ensure!(
message_lsn < after_message_lsn,
"No record found after the emitted message"
);
Ok((vec![message_lsn], Some(after_message_lsn)))
} else {
Ok((Vec::new(), Some(message_lsn)))
}
Ok(vec![message_lsn])
})
}
pub struct WalRecordCrossingSegmentFollowedBySmallOne;
impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
craft_single_logical_message(client, true)
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
// Transactional message crossing WAL segment will be followed by small
// commit record.
craft_seg_size_logical_message(client, true)
}
}
pub struct LastWalRecordCrossingSegment;
impl Crafter for LastWalRecordCrossingSegment {
const NAME: &'static str = "last_wal_record_crossing_segment";
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
craft_single_logical_message(client, false)
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
craft_seg_size_logical_message(client, false)
}
}

View File

@@ -11,13 +11,15 @@ use utils::const_assert;
use utils::lsn::Lsn;
fn init_logging() {
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(
format!("crate=info,postgres_ffi::{PG_MAJORVERSION}::xlog_utils=trace"),
))
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(format!(
"crate=info,postgres_ffi::{PG_MAJORVERSION}::xlog_utils=trace"
)))
.is_test(true)
.try_init();
}
/// Test that find_end_of_wal returns the same results as pg_dump on various
/// WALs created by Crafter.
fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
use crate::*;
@@ -38,13 +40,13 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
}
cfg.initdb().unwrap();
let srv = cfg.start_server().unwrap();
let (intermediate_lsns, expected_end_of_wal_partial) =
C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap();
let intermediate_lsns = C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap();
let intermediate_lsns: Vec<Lsn> = intermediate_lsns
.iter()
.map(|&lsn| u64::from(lsn).into())
.collect();
let expected_end_of_wal: Lsn = u64::from(expected_end_of_wal_partial).into();
// Kill postgres. Note that it might have inserted to WAL something after
// 'craft' did its job.
srv.kill();
// Check find_end_of_wal on the initial WAL
@@ -56,7 +58,7 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
.filter(|fname| IsXLogFileName(fname))
.max()
.unwrap();
check_pg_waldump_end_of_wal(&cfg, &last_segment, expected_end_of_wal);
let expected_end_of_wal = find_pg_waldump_end_of_wal(&cfg, &last_segment);
for start_lsn in intermediate_lsns
.iter()
.chain(std::iter::once(&expected_end_of_wal))
@@ -91,11 +93,7 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
}
}
fn check_pg_waldump_end_of_wal(
cfg: &crate::Conf,
last_segment: &str,
expected_end_of_wal: Lsn,
) {
fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &str) -> Lsn {
// Get the actual end of WAL by pg_waldump
let waldump_output = cfg
.pg_waldump("000000010000000000000001", last_segment)
@@ -113,11 +111,8 @@ fn check_pg_waldump_end_of_wal(
}
};
let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap();
info!(
"waldump erred on {}, expected wal end at {}",
waldump_wal_end, expected_end_of_wal
);
assert_eq!(waldump_wal_end, expected_end_of_wal);
info!("waldump erred on {}", waldump_wal_end);
waldump_wal_end
}
fn check_end_of_wal(
@@ -210,9 +205,9 @@ pub fn test_update_next_xid() {
#[test]
pub fn test_encode_logical_message() {
let expected = [
64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255,
38, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114,
101, 102, 105, 120, 0, 109, 101, 115, 115, 97, 103, 101,
64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255, 38,
0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114, 101, 102,
105, 120, 0, 109, 101, 115, 115, 97, 103, 101,
];
let actual = encode_logical_message("prefix", "message");
assert_eq!(expected, actual[..]);

View File

@@ -247,7 +247,7 @@ fn scenario_4() {
//
// This is in total 5000 + 1000 + 5000 + 1000 = 12000
//
// (If we used the the method from the previous scenario, and
// (If we used the method from the previous scenario, and
// kept only snapshot at the branch point, we'd need to keep
// all the WAL between 10000-18000 on the main branch, so
// the total size would be 5000 + 1000 + 8000 = 14000. The

View File

@@ -69,7 +69,7 @@ pub struct Config {
/// should be removed once we have a better solution there.
sys_buffer_bytes: u64,
/// Minimum fraction of total system memory reserved *before* the the cgroup threshold; in
/// Minimum fraction of total system memory reserved *before* the cgroup threshold; in
/// other words, providing a ceiling for the highest value of the threshold by enforcing that
/// there's at least `cgroup_min_overhead_fraction` of the total memory remaining beyond the
/// threshold.

View File

@@ -615,6 +615,7 @@ fn start_pageserver(
pageserver::consumption_metrics::collect_metrics(
metric_collection_endpoint,
&conf.metric_collection_bucket,
conf.metric_collection_interval,
conf.cached_metric_collection_interval,
conf.synthetic_size_calculation_interval,

View File

@@ -234,6 +234,7 @@ pub struct PageServerConf {
// How often to send unchanged cached metrics to the metrics endpoint.
pub cached_metric_collection_interval: Duration,
pub metric_collection_endpoint: Option<Url>,
pub metric_collection_bucket: Option<RemoteStorageConfig>,
pub synthetic_size_calculation_interval: Duration,
pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,
@@ -373,6 +374,7 @@ struct PageServerConfigBuilder {
cached_metric_collection_interval: BuilderValue<Duration>,
metric_collection_endpoint: BuilderValue<Option<Url>>,
synthetic_size_calculation_interval: BuilderValue<Duration>,
metric_collection_bucket: BuilderValue<Option<RemoteStorageConfig>>,
disk_usage_based_eviction: BuilderValue<Option<DiskUsageEvictionTaskConfig>>,
@@ -455,6 +457,8 @@ impl PageServerConfigBuilder {
.expect("cannot parse default synthetic size calculation interval")),
metric_collection_endpoint: Set(DEFAULT_METRIC_COLLECTION_ENDPOINT),
metric_collection_bucket: Set(None),
disk_usage_based_eviction: Set(None),
test_remote_failures: Set(0),
@@ -586,6 +590,13 @@ impl PageServerConfigBuilder {
self.metric_collection_endpoint = BuilderValue::Set(metric_collection_endpoint)
}
pub fn metric_collection_bucket(
&mut self,
metric_collection_bucket: Option<RemoteStorageConfig>,
) {
self.metric_collection_bucket = BuilderValue::Set(metric_collection_bucket)
}
pub fn synthetic_size_calculation_interval(
&mut self,
synthetic_size_calculation_interval: Duration,
@@ -694,6 +705,7 @@ impl PageServerConfigBuilder {
metric_collection_interval,
cached_metric_collection_interval,
metric_collection_endpoint,
metric_collection_bucket,
synthetic_size_calculation_interval,
disk_usage_based_eviction,
test_remote_failures,
@@ -942,6 +954,9 @@ impl PageServerConf {
let endpoint = parse_toml_string(key, item)?.parse().context("failed to parse metric_collection_endpoint")?;
builder.metric_collection_endpoint(Some(endpoint));
},
"metric_collection_bucket" => {
builder.metric_collection_bucket(RemoteStorageConfig::from_toml(item)?)
}
"synthetic_size_calculation_interval" =>
builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?),
"test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?),
@@ -1057,6 +1072,7 @@ impl PageServerConf {
metric_collection_interval: Duration::from_secs(60),
cached_metric_collection_interval: Duration::from_secs(60 * 60),
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
metric_collection_bucket: None,
synthetic_size_calculation_interval: Duration::from_secs(60),
disk_usage_based_eviction: None,
test_remote_failures: 0,
@@ -1289,6 +1305,7 @@ background_task_maximum_delay = '334 s'
defaults::DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL
)?,
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
metric_collection_bucket: None,
synthetic_size_calculation_interval: humantime::parse_duration(
defaults::DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL
)?,
@@ -1363,6 +1380,7 @@ background_task_maximum_delay = '334 s'
metric_collection_interval: Duration::from_secs(222),
cached_metric_collection_interval: Duration::from_secs(22200),
metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
metric_collection_bucket: None,
synthetic_size_calculation_interval: Duration::from_secs(333),
disk_usage_based_eviction: None,
test_remote_failures: 0,

View File

@@ -7,6 +7,7 @@ use crate::tenant::{mgr, LogicalSizeCalculationCause, PageReconstructError, Tena
use camino::Utf8PathBuf;
use consumption_metrics::EventType;
use pageserver_api::models::TenantState;
use remote_storage::{GenericRemoteStorage, RemoteStorageConfig};
use reqwest::Url;
use std::collections::HashMap;
use std::sync::Arc;
@@ -41,6 +42,7 @@ type Cache = HashMap<MetricsKey, (EventType, u64)>;
#[allow(clippy::too_many_arguments)]
pub async fn collect_metrics(
metric_collection_endpoint: &Url,
metric_collection_bucket: &Option<RemoteStorageConfig>,
metric_collection_interval: Duration,
_cached_metric_collection_interval: Duration,
synthetic_size_calculation_interval: Duration,
@@ -94,6 +96,20 @@ pub async fn collect_metrics(
.build()
.expect("Failed to create http client with timeout");
let bucket_client = if let Some(bucket_config) = metric_collection_bucket {
match GenericRemoteStorage::from_config(bucket_config) {
Ok(client) => Some(client),
Err(e) => {
// Non-fatal error: if we were given an invalid config, we will proceed
// with sending metrics over the network, but not to S3.
tracing::warn!("Invalid configuration for metric_collection_bucket: {e}");
None
}
}
} else {
None
};
let node_id = node_id.to_string();
loop {
@@ -118,10 +134,18 @@ pub async fn collect_metrics(
tracing::error!("failed to persist metrics to {path:?}: {e:#}");
}
}
if let Some(bucket_client) = &bucket_client {
let res =
upload::upload_metrics_bucket(bucket_client, &cancel, &node_id, &metrics).await;
if let Err(e) = res {
tracing::error!("failed to upload to S3: {e:#}");
}
}
};
let upload = async {
let res = upload::upload_metrics(
let res = upload::upload_metrics_http(
&client,
metric_collection_endpoint,
&cancel,
@@ -132,7 +156,7 @@ pub async fn collect_metrics(
.await;
if let Err(e) = res {
// serialization error which should never happen
tracing::error!("failed to upload due to {e:#}");
tracing::error!("failed to upload via HTTP due to {e:#}");
}
};

View File

@@ -1,4 +1,9 @@
use std::time::SystemTime;
use chrono::{DateTime, Utc};
use consumption_metrics::{Event, EventChunk, IdempotencyKey, CHUNK_SIZE};
use remote_storage::{GenericRemoteStorage, RemotePath};
use tokio::io::AsyncWriteExt;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
@@ -13,8 +18,9 @@ struct Ids {
pub(super) timeline_id: Option<TimelineId>,
}
/// Serialize and write metrics to an HTTP endpoint
#[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
pub(super) async fn upload_metrics(
pub(super) async fn upload_metrics_http(
client: &reqwest::Client,
metric_collection_endpoint: &reqwest::Url,
cancel: &CancellationToken,
@@ -74,6 +80,60 @@ pub(super) async fn upload_metrics(
Ok(())
}
/// Serialize and write metrics to a remote storage object
#[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
pub(super) async fn upload_metrics_bucket(
client: &GenericRemoteStorage,
cancel: &CancellationToken,
node_id: &str,
metrics: &[RawMetric],
) -> anyhow::Result<()> {
if metrics.is_empty() {
// Skip uploads if we have no metrics, so that readers don't have to handle the edge case
// of an empty object.
return Ok(());
}
// Compose object path
let datetime: DateTime<Utc> = SystemTime::now().into();
let ts_prefix = datetime.format("year=%Y/month=%m/day=%d/%H:%M:%SZ");
let path = RemotePath::from_string(&format!("{ts_prefix}_{node_id}.ndjson.gz"))?;
// Set up a gzip writer into a buffer
let mut compressed_bytes: Vec<u8> = Vec::new();
let compressed_writer = std::io::Cursor::new(&mut compressed_bytes);
let mut gzip_writer = async_compression::tokio::write::GzipEncoder::new(compressed_writer);
// Serialize and write into compressed buffer
let started_at = std::time::Instant::now();
for res in serialize_in_chunks(CHUNK_SIZE, metrics, node_id) {
let (_chunk, body) = res?;
gzip_writer.write_all(&body).await?;
}
gzip_writer.flush().await?;
gzip_writer.shutdown().await?;
let compressed_length = compressed_bytes.len();
// Write to remote storage
client
.upload_storage_object(
futures::stream::once(futures::future::ready(Ok(compressed_bytes.into()))),
compressed_length,
&path,
cancel,
)
.await?;
let elapsed = started_at.elapsed();
tracing::info!(
compressed_length,
elapsed_ms = elapsed.as_millis(),
"write metrics bucket at {path}",
);
Ok(())
}
// The return type is quite ugly, but we gain testability in isolation
fn serialize_in_chunks<'a, F>(
chunk_size: usize,

View File

@@ -435,7 +435,7 @@ pub(crate) static RESIDENT_PHYSICAL_SIZE_GLOBAL: Lazy<UIntGauge> = Lazy::new(||
static REMOTE_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_remote_physical_size",
"The size of the layer files present in the remote storage that are listed in the the remote index_part.json.",
"The size of the layer files present in the remote storage that are listed in the remote index_part.json.",
// Corollary: If any files are missing from the index part, they won't be included here.
&["tenant_id", "shard_id", "timeline_id"]
)
@@ -699,6 +699,14 @@ pub static STARTUP_IS_LOADING: Lazy<UIntGauge> = Lazy::new(|| {
.expect("Failed to register pageserver_startup_is_loading")
});
pub(crate) static TIMELINE_EPHEMERAL_BYTES: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_timeline_ephemeral_bytes",
"Total number of bytes in ephemeral layers, summed for all timelines. Approximate, lazily updated."
)
.expect("Failed to register metric")
});
/// Metrics related to the lifecycle of a [`crate::tenant::Tenant`] object: things
/// like how long it took to load.
///

View File

@@ -1156,6 +1156,8 @@ impl PageServerHandler {
.await?;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
rel: req.rel,
blkno: req.blkno,
page,
}))
}

View File

@@ -2141,7 +2141,7 @@ impl Tenant {
// Shut down the timeline's remote client: this means that the indices we write
// for child shards will not be invalidated by the parent shard deleting layers.
tl_client.shutdown().await?;
tl_client.shutdown().await;
// Download methods can still be used after shutdown, as they don't flow through the remote client's
// queue. In principal the RemoteTimelineClient could provide this without downloading it, but this

View File

@@ -217,7 +217,7 @@ use crate::task_mgr::shutdown_token;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::download::download_retry;
use crate::tenant::storage_layer::AsLayerDesc;
use crate::tenant::upload_queue::Delete;
use crate::tenant::upload_queue::{Delete, UploadQueueStoppedDeletable};
use crate::tenant::TIMELINES_SEGMENT_NAME;
use crate::{
config::PageServerConf,
@@ -266,15 +266,6 @@ pub enum MaybeDeletedIndexPart {
Deleted(IndexPart),
}
/// Errors that can arise when calling [`RemoteTimelineClient::stop`].
#[derive(Debug, thiserror::Error)]
pub enum StopError {
/// Returned if the upload queue was never initialized.
/// See [`RemoteTimelineClient::init_upload_queue`] and [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
#[error("queue is not initialized")]
QueueUninitialized,
}
#[derive(Debug, thiserror::Error)]
pub enum PersistIndexPartWithDeletedFlagError {
#[error("another task is already setting the deleted_flag, started at {0:?}")]
@@ -399,15 +390,10 @@ impl RemoteTimelineClient {
"bug: it is responsibility of the caller to provide index part from MaybeDeletedIndexPart::Deleted"
))?;
{
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part)?;
self.update_remote_physical_size_gauge(Some(index_part));
}
// also locks upload queue, without dropping the guard above it will be a deadlock
self.stop().expect("initialized line above");
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part)?;
self.update_remote_physical_size_gauge(Some(index_part));
self.stop_impl(&mut upload_queue);
upload_queue
.stopped_mut()
@@ -421,7 +407,8 @@ impl RemoteTimelineClient {
match &mut *self.upload_queue.lock().unwrap() {
UploadQueue::Uninitialized => None,
UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_projected(),
UploadQueue::Stopped(q) => q
UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => q
.upload_queue_for_deletion
.get_last_remote_consistent_lsn_projected(),
}
@@ -431,7 +418,8 @@ impl RemoteTimelineClient {
match &mut *self.upload_queue.lock().unwrap() {
UploadQueue::Uninitialized => None,
UploadQueue::Initialized(q) => Some(q.get_last_remote_consistent_lsn_visible()),
UploadQueue::Stopped(q) => Some(
UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => Some(
q.upload_queue_for_deletion
.get_last_remote_consistent_lsn_visible(),
),
@@ -898,7 +886,7 @@ impl RemoteTimelineClient {
/// Wait for all previously scheduled operations to complete, and then stop.
///
/// Not cancellation safe
pub(crate) async fn shutdown(self: &Arc<Self>) -> Result<(), StopError> {
pub(crate) async fn shutdown(self: &Arc<Self>) {
// On cancellation the queue is left in ackward state of refusing new operations but
// proper stop is yet to be called. On cancel the original or some later task must call
// `stop` or `shutdown`.
@@ -909,8 +897,12 @@ impl RemoteTimelineClient {
let fut = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = match &mut *guard {
UploadQueue::Stopped(_) => return Ok(()),
UploadQueue::Uninitialized => return Err(StopError::QueueUninitialized),
UploadQueue::Stopped(_) => return,
UploadQueue::Uninitialized => {
// transition into Stopped state
self.stop_impl(&mut guard);
return;
}
UploadQueue::Initialized(ref mut init) => init,
};
@@ -942,7 +934,7 @@ impl RemoteTimelineClient {
}
}
self.stop()
self.stop();
}
/// Set the deleted_at field in the remote index file.
@@ -1324,12 +1316,7 @@ impl RemoteTimelineClient {
// upload finishes or times out soon enough.
if cancel.is_cancelled() {
info!("upload task cancelled by shutdown request");
match self.stop() {
Ok(()) => {}
Err(StopError::QueueUninitialized) => {
unreachable!("we never launch an upload task if the queue is uninitialized, and once it is initialized, we never go back")
}
}
self.stop();
return;
}
@@ -1584,17 +1571,23 @@ impl RemoteTimelineClient {
/// In-progress operations will still be running after this function returns.
/// Use `task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id))`
/// to wait for them to complete, after calling this function.
pub(crate) fn stop(&self) -> Result<(), StopError> {
pub(crate) fn stop(&self) {
// Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
// into stopped state, thereby dropping all off the queued *ops* which haven't become *tasks* yet.
// The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
let mut guard = self.upload_queue.lock().unwrap();
match &mut *guard {
UploadQueue::Uninitialized => Err(StopError::QueueUninitialized),
self.stop_impl(&mut guard);
}
fn stop_impl(&self, guard: &mut std::sync::MutexGuard<UploadQueue>) {
match &mut **guard {
UploadQueue::Uninitialized => {
info!("UploadQueue is in state Uninitialized, nothing to do");
**guard = UploadQueue::Stopped(UploadQueueStopped::Uninitialized);
}
UploadQueue::Stopped(_) => {
// nothing to do
info!("another concurrent task already shut down the queue");
Ok(())
}
UploadQueue::Initialized(initialized) => {
info!("shutting down upload queue");
@@ -1627,11 +1620,13 @@ impl RemoteTimelineClient {
};
let upload_queue = std::mem::replace(
&mut *guard,
UploadQueue::Stopped(UploadQueueStopped {
upload_queue_for_deletion,
deleted_at: SetDeletedFlagProgress::NotRunning,
}),
&mut **guard,
UploadQueue::Stopped(UploadQueueStopped::Deletable(
UploadQueueStoppedDeletable {
upload_queue_for_deletion,
deleted_at: SetDeletedFlagProgress::NotRunning,
},
)),
);
if let UploadQueue::Initialized(qi) = upload_queue {
qi
@@ -1660,10 +1655,6 @@ impl RemoteTimelineClient {
// which is exactly what we want to happen.
drop(op);
}
// We're done.
drop(guard);
Ok(())
}
}
}

View File

@@ -23,8 +23,12 @@ use tracing::*;
use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
// avoid binding to Write (conflicts with std::io::Write)
// while being able to use std::fmt::Write's methods
use crate::metrics::TIMELINE_EPHEMERAL_BYTES;
use std::cmp::Ordering;
use std::fmt::Write as _;
use std::ops::Range;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::atomic::{AtomicU64, AtomicUsize};
use tokio::sync::{RwLock, RwLockWriteGuard};
use super::{
@@ -70,6 +74,8 @@ pub struct InMemoryLayerInner {
/// Each serialized Value is preceded by a 'u32' length field.
/// PerSeg::page_versions map stores offsets into this file.
file: EphemeralFile,
resource_units: GlobalResourceUnits,
}
impl std::fmt::Debug for InMemoryLayerInner {
@@ -78,6 +84,101 @@ impl std::fmt::Debug for InMemoryLayerInner {
}
}
/// State shared by all in-memory (ephemeral) layers. Updated infrequently during background ticks in Timeline,
/// to minimize contention.
///
/// This global state is used to implement behaviors that require a global view of the system, e.g.
/// rolling layers proactively to limit the total amount of dirty data.
struct GlobalResources {
// How many bytes are in all EphemeralFile objects
dirty_bytes: AtomicU64,
// How many layers are contributing to dirty_bytes
dirty_layers: AtomicUsize,
}
// Per-timeline RAII struct for its contribution to [`GlobalResources`]
struct GlobalResourceUnits {
// How many dirty bytes have I added to the global dirty_bytes: this guard object is responsible
// for decrementing the global counter by this many bytes when dropped.
dirty_bytes: u64,
}
impl GlobalResourceUnits {
// Hint for the layer append path to update us when the layer size differs from the last
// call to update_size by this much. If we don't reach this threshold, we'll still get
// updated when the Timeline "ticks" in the background.
const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;
fn new() -> Self {
GLOBAL_RESOURCES
.dirty_layers
.fetch_add(1, AtomicOrdering::Relaxed);
Self { dirty_bytes: 0 }
}
/// Do not call this frequently: all timelines will write to these same global atomics,
/// so this is a relatively expensive operation. Wait at least a few seconds between calls.
fn publish_size(&mut self, size: u64) {
let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
Ordering::Equal => {
return;
}
Ordering::Greater => {
let delta = size - self.dirty_bytes;
let old = GLOBAL_RESOURCES
.dirty_bytes
.fetch_add(delta, AtomicOrdering::Relaxed);
old + delta
}
Ordering::Less => {
let delta = self.dirty_bytes - size;
let old = GLOBAL_RESOURCES
.dirty_bytes
.fetch_sub(delta, AtomicOrdering::Relaxed);
old - delta
}
};
// This is a sloppy update: concurrent updates to the counter will race, and the exact
// value of the metric might not be the exact latest value of GLOBAL_RESOURCES::dirty_bytes.
// That's okay: as long as the metric contains some recent value, it doesn't have to always
// be literally the last update.
TIMELINE_EPHEMERAL_BYTES.set(new_global_dirty_bytes);
self.dirty_bytes = size;
}
// Call publish_size if the input size differs from last published size by more than
// the drift limit
fn maybe_publish_size(&mut self, size: u64) {
let publish = match size.cmp(&self.dirty_bytes) {
Ordering::Equal => false,
Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
Ordering::Less => self.dirty_bytes - size > Self::MAX_SIZE_DRIFT,
};
if publish {
self.publish_size(size);
}
}
}
impl Drop for GlobalResourceUnits {
fn drop(&mut self) {
GLOBAL_RESOURCES
.dirty_layers
.fetch_sub(1, AtomicOrdering::Relaxed);
// Subtract our contribution to the global total dirty bytes
self.publish_size(0);
}
}
static GLOBAL_RESOURCES: GlobalResources = GlobalResources {
dirty_bytes: AtomicU64::new(0),
dirty_layers: AtomicUsize::new(0),
};
impl InMemoryLayer {
pub(crate) fn get_timeline_id(&self) -> TimelineId {
self.timeline_id
@@ -328,6 +429,7 @@ impl InMemoryLayer {
inner: RwLock::new(InMemoryLayerInner {
index: HashMap::new(),
file,
resource_units: GlobalResourceUnits::new(),
}),
})
}
@@ -378,9 +480,18 @@ impl InMemoryLayer {
warn!("Key {} at {} already exists", key, lsn);
}
let size = locked_inner.file.len();
locked_inner.resource_units.maybe_publish_size(size);
Ok(())
}
pub(crate) async fn tick(&self) {
let mut inner = self.inner.write().await;
let size = inner.file.len();
inner.resource_units.publish_size(size);
}
pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
// TODO: Currently, we just leak the storage for any deleted keys
Ok(())

View File

@@ -54,6 +54,7 @@ use std::{
ops::ControlFlow,
};
use crate::deletion_queue::DeletionQueueClient;
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
use crate::tenant::{
layer_map::{LayerMap, SearchResult},
@@ -64,7 +65,6 @@ use crate::{
disk_usage_eviction_task::DiskUsageEvictionInfo,
pgdatadir_mapping::CollectKeySpaceError,
};
use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
use crate::{
disk_usage_eviction_task::finite_f32,
tenant::storage_layer::{
@@ -1241,11 +1241,7 @@ impl Timeline {
// what is problematic is the shutting down of RemoteTimelineClient, because
// obviously it does not make sense to stop while we wait for it, but what
// about corner cases like s3 suddenly hanging up?
if let Err(e) = client.shutdown().await {
// Non-fatal. Shutdown is infallible. Failures to flush just mean that
// we have some extra WAL replay to do next time the timeline starts.
warn!("failed to flush to remote storage: {e:#}");
}
client.shutdown().await;
}
}
Err(e) => {
@@ -1282,12 +1278,7 @@ impl Timeline {
// Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
// case our caller wants to use that for a deletion
if let Some(remote_client) = self.remote_client.as_ref() {
match remote_client.stop() {
Ok(()) => {}
Err(StopError::QueueUninitialized) => {
// Shutting down during initialization is legal
}
}
remote_client.stop();
}
tracing::debug!("Waiting for tasks...");
@@ -2855,7 +2846,15 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) -> Option<(Lsn, Bytes)> {
return None;
let cache = page_cache::get();
// FIXME: It's pointless to check the cache for things that are not 8kB pages.
// We should look at the key to determine if it's a cacheable object
let (lsn, read_guard) = cache
.lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
.await?;
let img = Bytes::from(read_guard.to_vec());
Some((lsn, img))
}
async fn get_ready_ancestor_timeline(
@@ -4461,6 +4460,9 @@ impl<'a> TimelineWriter<'a> {
let action = self.get_open_layer_action(last_record_lsn, 0);
if action == OpenLayerAction::Roll {
self.roll_layer(last_record_lsn).await?;
} else if let Some(writer_state) = &mut *self.write_guard {
// Periodic update of statistics
writer_state.open_layer.tick().await;
}
Ok(())

View File

@@ -16,9 +16,7 @@ use crate::{
tenant::{
debug_assert_current_span_has_tenant_and_timeline_id,
metadata::TimelineMetadata,
remote_timeline_client::{
self, PersistIndexPartWithDeletedFlagError, RemoteTimelineClient,
},
remote_timeline_client::{PersistIndexPartWithDeletedFlagError, RemoteTimelineClient},
CreateTimelineCause, DeleteTimelineError, Tenant,
},
};
@@ -50,19 +48,7 @@ async fn stop_tasks(timeline: &Timeline) -> Result<(), DeleteTimelineError> {
// Prevent new uploads from starting.
if let Some(remote_client) = timeline.remote_client.as_ref() {
let res = remote_client.stop();
match res {
Ok(()) => {}
Err(e) => match e {
remote_timeline_client::StopError::QueueUninitialized => {
// This case shouldn't happen currently because the
// load and attach code bails out if _any_ of the timeline fails to fetch its IndexPart.
// That is, before we declare the Tenant as Active.
// But we only allow calls to delete_timeline on Active tenants.
return Err(DeleteTimelineError::Other(anyhow::anyhow!("upload queue is uninitialized, likely the timeline was in Broken state prior to this call because it failed to fetch IndexPart during load or attach, check the logs")));
}
},
}
remote_client.stop();
}
// Stop & wait for the remaining timeline tasks, including upload tasks.

View File

@@ -121,11 +121,16 @@ pub(super) enum SetDeletedFlagProgress {
Successful(NaiveDateTime),
}
pub(super) struct UploadQueueStopped {
pub(super) struct UploadQueueStoppedDeletable {
pub(super) upload_queue_for_deletion: UploadQueueInitialized,
pub(super) deleted_at: SetDeletedFlagProgress,
}
pub(super) enum UploadQueueStopped {
Deletable(UploadQueueStoppedDeletable),
Uninitialized,
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum NotInitialized {
#[error("queue is in state Uninitialized")]
@@ -249,12 +254,15 @@ impl UploadQueue {
}
}
pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStopped> {
pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStoppedDeletable> {
match self {
UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
anyhow::bail!("queue is in state {}", self.as_str())
}
UploadQueue::Stopped(stopped) => Ok(stopped),
UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => {
anyhow::bail!("queue is in state Stopped(Uninitialized)")
}
UploadQueue::Stopped(UploadQueueStopped::Deletable(deletable)) => Ok(deletable),
}
}
}

View File

@@ -782,7 +782,7 @@ where
}
}
// NB: don't use `buf.is_empty()` here; it is from the
// `impl Deref for Slice { Target = [u8] }`; the the &[u8]
// `impl Deref for Slice { Target = [u8] }`; the &[u8]
// returned by it only covers the initialized portion of `buf`.
// Whereas we're interested in ensuring that we filled the entire
// buffer that the user passed in.

View File

@@ -111,6 +111,7 @@ static PageServer page_servers[MAX_SHARDS];
static bool pageserver_flush(shardno_t shard_no);
static void pageserver_disconnect(shardno_t shard_no);
static void pageserver_disconnect_shard(shardno_t shard_no);
static bool
PagestoreShmemIsValid(void)
@@ -487,9 +488,31 @@ retry:
return ret;
}
/*
* Reset prefetch and drop connection to the shard.
* It also drops connection to all other shards involved in prefetch.
*/
static void
pageserver_disconnect(shardno_t shard_no)
{
if (page_servers[shard_no].conn)
{
/*
* If the connection to any pageserver is lost, we throw away the
* whole prefetch queue, even for other pageservers. It should not
* cause big problems, because connection loss is supposed to be a
* rare event.
*/
prefetch_on_ps_disconnect();
}
pageserver_disconnect_shard(shard_no);
}
/*
* Disconnect from specified shard
*/
static void
pageserver_disconnect_shard(shardno_t shard_no)
{
/*
* If anything goes wrong while we were sending a request, it's not clear
@@ -503,14 +526,6 @@ pageserver_disconnect(shardno_t shard_no)
neon_shard_log(shard_no, LOG, "dropping connection to page server due to error");
PQfinish(page_servers[shard_no].conn);
page_servers[shard_no].conn = NULL;
/*
* If the connection to any pageserver is lost, we throw away the
* whole prefetch queue, even for other pageservers. It should not
* cause big problems, because connection loss is supposed to be a
* rare event.
*/
prefetch_on_ps_disconnect();
}
if (page_servers[shard_no].wes != NULL)
{
@@ -676,7 +691,8 @@ page_server_api api =
{
.send = pageserver_send,
.flush = pageserver_flush,
.receive = pageserver_receive
.receive = pageserver_receive,
.disconnect = pageserver_disconnect_shard
};
static bool

View File

@@ -312,7 +312,7 @@ pg_cluster_size(PG_FUNCTION_ARGS)
{
int64 size;
size = GetZenithCurrentClusterSize();
size = GetNeonCurrentClusterSize();
if (size == 0)
PG_RETURN_NULL();

View File

@@ -26,6 +26,8 @@ extern void pg_init_libpagestore(void);
extern void pg_init_walproposer(void);
extern uint64 BackpressureThrottlingTime(void);
extern void SetNeonCurrentClusterSize(uint64 size);
extern uint64 GetNeonCurrentClusterSize(void);
extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
extern void PGDLLEXPORT WalProposerSync(int argc, char *argv[]);

View File

@@ -139,6 +139,9 @@ typedef struct
typedef struct
{
NeonMessageTag tag;
NRelFileInfo rinfo;
ForkNumber forknum;
BlockNumber blkno;
char page[FLEXIBLE_ARRAY_MEMBER];
} NeonGetPageResponse;
@@ -180,6 +183,7 @@ typedef struct
bool (*send) (shardno_t shard_no, NeonRequest * request);
NeonResponse *(*receive) (shardno_t shard_no);
bool (*flush) (shardno_t shard_no);
void (*disconnect) (shardno_t shard_no);
} page_server_api;
extern void prefetch_on_ps_disconnect(void);

View File

@@ -613,6 +613,14 @@ prefetch_on_ps_disconnect(void)
Assert(slot->status == PRFS_REQUESTED);
Assert(slot->my_ring_index == ring_index);
/*
* Drop connection to all shards which have prefetch requests.
* It is not a problem to call disconnect multiple times on the same connection
* because disconnect implementation in libpagestore.c will check if connection
* is alive and do nothing of connection was already dropped.
*/
page_server->disconnect(slot->shard_no);
/* clean up the request */
slot->status = PRFS_TAG_REMAINS;
MyPState->n_requests_inflight -= 1;
@@ -633,13 +641,12 @@ prefetch_on_ps_disconnect(void)
static inline void
prefetch_set_unused(uint64 ring_index)
{
PrefetchRequest *slot = GetPrfSlot(ring_index);
PrefetchRequest *slot;
if (ring_index < MyPState->ring_last)
return; /* Should already be unused */
Assert(MyPState->ring_unused > ring_index);
slot = GetPrfSlot(ring_index);
if (slot->status == PRFS_UNUSED)
return;
@@ -798,7 +805,8 @@ Retry:
{
if (*force_lsn > slot->effective_request_lsn)
{
prefetch_wait_for(ring_index);
if (!prefetch_wait_for(ring_index))
goto Retry;
prefetch_set_unused(ring_index);
entry = NULL;
}
@@ -813,7 +821,8 @@ Retry:
{
if (*force_lsn != slot->effective_request_lsn)
{
prefetch_wait_for(ring_index);
if (!prefetch_wait_for(ring_index))
goto Retry;
prefetch_set_unused(ring_index);
entry = NULL;
}
@@ -879,7 +888,8 @@ Retry:
{
case PRFS_REQUESTED:
Assert(MyPState->ring_receive == cleanup_index);
prefetch_wait_for(cleanup_index);
if (!prefetch_wait_for(cleanup_index))
goto Retry;
prefetch_set_unused(cleanup_index);
break;
case PRFS_RECEIVED:
@@ -1108,6 +1118,11 @@ nm_unpack_response(StringInfo s)
msg_resp = MemoryContextAllocZero(MyPState->bufctx, PS_GETPAGERESPONSE_SIZE);
msg_resp->tag = tag;
NInfoGetSpcOid(msg_resp->rinfo) = pq_getmsgint(s, 4);
NInfoGetDbOid(msg_resp->rinfo) = pq_getmsgint(s, 4);
NInfoGetRelNumber(msg_resp->rinfo) = pq_getmsgint(s, 4);
msg_resp->forknum = pq_getmsgbyte(s);
msg_resp->blkno = pq_getmsgint(s, 4);
/* XXX: should be varlena */
memcpy(msg_resp->page, pq_getmsgbytes(s, BLCKSZ), BLCKSZ);
pq_getmsgend(s);
@@ -1831,7 +1846,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
reln->smgr_relpersistence == RELPERSISTENCE_PERMANENT &&
!IsAutoVacuumWorkerProcess())
{
uint64 current_size = GetZenithCurrentClusterSize();
uint64 current_size = GetNeonCurrentClusterSize();
if (current_size >= ((uint64) max_cluster_size) * 1024 * 1024)
ereport(ERROR,
@@ -1912,7 +1927,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
reln->smgr_relpersistence == RELPERSISTENCE_PERMANENT &&
!IsAutoVacuumWorkerProcess())
{
uint64 current_size = GetZenithCurrentClusterSize();
uint64 current_size = GetNeonCurrentClusterSize();
if (current_size >= ((uint64) max_cluster_size) * 1024 * 1024)
ereport(ERROR,
@@ -2132,6 +2147,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
/*
* Try to find prefetched page in the list of received pages.
*/
Retry:
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &buftag);
if (entry != NULL)
@@ -2153,7 +2169,8 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
*/
if (slot->status == PRFS_REQUESTED)
{
prefetch_wait_for(slot->my_ring_index);
if (!prefetch_wait_for(slot->my_ring_index))
goto Retry;
}
/* drop caches */
prefetch_set_unused(slot->my_ring_index);
@@ -2200,10 +2217,20 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
switch (resp->tag)
{
case T_NeonGetPageResponse:
memcpy(buffer, ((NeonGetPageResponse *) resp)->page, BLCKSZ);
{
NeonGetPageResponse* r = (NeonGetPageResponse *) resp;
memcpy(buffer, r->page, BLCKSZ);
if (memcmp(&r->rinfo, &rinfo, sizeof rinfo) != 0 && forkNum != r->forknum || blkno != r->blkno)
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "[shard %d] get unexpected get page resonse for block %u in rel %u/%u/%u.%u instead of block block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
slot->shard_no,
r->blkno, RelFileInfoFmt(r->rinfo), r->forknum,
blkno, RelFileInfoFmt(rinfo), forkNum,
(uint32) (request_lsn >> 32), (uint32) request_lsn)));
lfc_write(rinfo, forkNum, blkno, buffer);
break;
}
case T_NeonErrorResponse:
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),

View File

@@ -287,6 +287,7 @@ typedef struct WalproposerShmemState
slock_t mutex;
term_t mineLastElectedTerm;
pg_atomic_uint64 backpressureThrottlingTime;
pg_atomic_uint64 currentClusterSize;
/* last feedback from each shard */
PageserverFeedback shard_ps_feedback[MAX_SHARDS];

View File

@@ -282,6 +282,7 @@ WalproposerShmemInit(void)
memset(walprop_shared, 0, WalproposerShmemSize());
SpinLockInit(&walprop_shared->mutex);
pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0);
pg_atomic_init_u64(&walprop_shared->currentClusterSize, 0);
}
LWLockRelease(AddinShmemInitLock);
@@ -1972,7 +1973,7 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
/* Only one main shard sends non-zero currentClusterSize */
if (sk->appendResponse.ps_feedback.currentClusterSize > 0)
SetZenithCurrentClusterSize(sk->appendResponse.ps_feedback.currentClusterSize);
SetNeonCurrentClusterSize(sk->appendResponse.ps_feedback.currentClusterSize);
if (min_feedback.disk_consistent_lsn != standby_apply_lsn)
{
@@ -2094,6 +2095,18 @@ GetLogRepRestartLSN(WalProposer *wp)
return lrRestartLsn;
}
void SetNeonCurrentClusterSize(uint64 size)
{
pg_atomic_write_u64(&walprop_shared->currentClusterSize, size);
}
uint64 GetNeonCurrentClusterSize(void)
{
return pg_atomic_read_u64(&walprop_shared->currentClusterSize);
}
uint64 GetNeonCurrentClusterSize(void);
static const walproposer_api walprop_pg = {
.get_shmem_state = walprop_pg_get_shmem_state,
.start_streaming = walprop_pg_start_streaming,

View File

@@ -97,6 +97,7 @@ workspace_hack.workspace = true
[dev-dependencies]
camino-tempfile.workspace = true
fallible-iterator.workspace = true
rcgen.workspace = true
rstest.workspace = true
tokio-postgres-rustls.workspace = true

View File

@@ -408,3 +408,228 @@ impl ComputeConnectBackend for BackendType<'_, ComputeCredentials, &()> {
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use postgres_protocol::{
authentication::sasl::{ChannelBinding, ScramSha256},
message::{backend::Message as PgMessage, frontend},
};
use provider::AuthSecret;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use crate::{
auth::{ComputeUserInfoMaybeEndpoint, IpPattern},
config::AuthenticationConfig,
console::{
self,
provider::{self, CachedAllowedIps, CachedRoleSecret},
CachedNodeInfo,
},
context::RequestMonitoring,
proxy::NeonOptions,
scram::ServerSecret,
stream::{PqStream, Stream},
};
use super::auth_quirks;
struct Auth {
ips: Vec<IpPattern>,
secret: AuthSecret,
}
impl console::Api for Auth {
async fn get_role_secret(
&self,
_ctx: &mut RequestMonitoring,
_user_info: &super::ComputeUserInfo,
) -> Result<CachedRoleSecret, console::errors::GetAuthInfoError> {
Ok(CachedRoleSecret::new_uncached(Some(self.secret.clone())))
}
async fn get_allowed_ips_and_secret(
&self,
_ctx: &mut RequestMonitoring,
_user_info: &super::ComputeUserInfo,
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), console::errors::GetAuthInfoError>
{
Ok((
CachedAllowedIps::new_uncached(Arc::new(self.ips.clone())),
Some(CachedRoleSecret::new_uncached(Some(self.secret.clone()))),
))
}
async fn wake_compute(
&self,
_ctx: &mut RequestMonitoring,
_user_info: &super::ComputeUserInfo,
) -> Result<CachedNodeInfo, console::errors::WakeComputeError> {
unimplemented!()
}
}
static CONFIG: &AuthenticationConfig = &AuthenticationConfig {
scram_protocol_timeout: std::time::Duration::from_secs(5),
};
async fn read_message(r: &mut (impl AsyncRead + Unpin), b: &mut BytesMut) -> PgMessage {
loop {
r.read_buf(&mut *b).await.unwrap();
if let Some(m) = PgMessage::parse(&mut *b).unwrap() {
break m;
}
}
}
#[tokio::test]
async fn auth_quirks_scram() {
let (mut client, server) = tokio::io::duplex(1024);
let mut stream = PqStream::new(Stream::from_raw(server));
let mut ctx = RequestMonitoring::test();
let api = Auth {
ips: vec![],
secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()),
};
let user_info = ComputeUserInfoMaybeEndpoint {
user: "conrad".into(),
endpoint_id: Some("endpoint".into()),
options: NeonOptions::default(),
};
let handle = tokio::spawn(async move {
let mut scram = ScramSha256::new(b"my-secret-password", ChannelBinding::unsupported());
let mut read = BytesMut::new();
// server should offer scram
match read_message(&mut client, &mut read).await {
PgMessage::AuthenticationSasl(a) => {
let options: Vec<&str> = a.mechanisms().collect().unwrap();
assert_eq!(options, ["SCRAM-SHA-256"]);
}
_ => panic!("wrong message"),
}
// client sends client-first-message
let mut write = BytesMut::new();
frontend::sasl_initial_response("SCRAM-SHA-256", scram.message(), &mut write).unwrap();
client.write_all(&write).await.unwrap();
// server response with server-first-message
match read_message(&mut client, &mut read).await {
PgMessage::AuthenticationSaslContinue(a) => {
scram.update(a.data()).await.unwrap();
}
_ => panic!("wrong message"),
}
// client response with client-final-message
write.clear();
frontend::sasl_response(scram.message(), &mut write).unwrap();
client.write_all(&write).await.unwrap();
// server response with server-final-message
match read_message(&mut client, &mut read).await {
PgMessage::AuthenticationSaslFinal(a) => {
scram.finish(a.data()).unwrap();
}
_ => panic!("wrong message"),
}
});
let _creds = auth_quirks(&mut ctx, &api, user_info, &mut stream, false, CONFIG)
.await
.unwrap();
handle.await.unwrap();
}
#[tokio::test]
async fn auth_quirks_cleartext() {
let (mut client, server) = tokio::io::duplex(1024);
let mut stream = PqStream::new(Stream::from_raw(server));
let mut ctx = RequestMonitoring::test();
let api = Auth {
ips: vec![],
secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()),
};
let user_info = ComputeUserInfoMaybeEndpoint {
user: "conrad".into(),
endpoint_id: Some("endpoint".into()),
options: NeonOptions::default(),
};
let handle = tokio::spawn(async move {
let mut read = BytesMut::new();
let mut write = BytesMut::new();
// server should offer cleartext
match read_message(&mut client, &mut read).await {
PgMessage::AuthenticationCleartextPassword => {}
_ => panic!("wrong message"),
}
// client responds with password
write.clear();
frontend::password_message(b"my-secret-password", &mut write).unwrap();
client.write_all(&write).await.unwrap();
});
let _creds = auth_quirks(&mut ctx, &api, user_info, &mut stream, true, CONFIG)
.await
.unwrap();
handle.await.unwrap();
}
#[tokio::test]
async fn auth_quirks_password_hack() {
let (mut client, server) = tokio::io::duplex(1024);
let mut stream = PqStream::new(Stream::from_raw(server));
let mut ctx = RequestMonitoring::test();
let api = Auth {
ips: vec![],
secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()),
};
let user_info = ComputeUserInfoMaybeEndpoint {
user: "conrad".into(),
endpoint_id: None,
options: NeonOptions::default(),
};
let handle = tokio::spawn(async move {
let mut read = BytesMut::new();
// server should offer cleartext
match read_message(&mut client, &mut read).await {
PgMessage::AuthenticationCleartextPassword => {}
_ => panic!("wrong message"),
}
// client responds with password
let mut write = BytesMut::new();
frontend::password_message(b"endpoint=my-endpoint;my-secret-password", &mut write)
.unwrap();
client.write_all(&write).await.unwrap();
});
let creds = auth_quirks(&mut ctx, &api, user_info, &mut stream, true, CONFIG)
.await
.unwrap();
assert_eq!(creds.info.endpoint, "my-endpoint");
handle.await.unwrap();
}
}

View File

@@ -211,4 +211,19 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn cancel_session_noop_regression() {
let handler = CancellationHandler::<()>::new(Default::default(), "local");
handler
.cancel_session(
CancelKeyData {
backend_pid: 0,
cancel_key: 0,
},
Uuid::new_v4(),
)
.await
.unwrap();
}
}

View File

@@ -82,14 +82,13 @@ pub type ScramKeys = tokio_postgres::config::ScramKeys<32>;
/// A config for establishing a connection to compute node.
/// Eventually, `tokio_postgres` will be replaced with something better.
/// Newtype allows us to implement methods on top of it.
#[derive(Clone)]
#[repr(transparent)]
#[derive(Clone, Default)]
pub struct ConnCfg(Box<tokio_postgres::Config>);
/// Creation and initialization routines.
impl ConnCfg {
pub fn new() -> Self {
Self(Default::default())
Self::default()
}
/// Reuse password or auth keys from the other config.
@@ -165,12 +164,6 @@ impl std::ops::DerefMut for ConnCfg {
}
}
impl Default for ConnCfg {
fn default() -> Self {
Self::new()
}
}
impl ConnCfg {
/// Establish a raw TCP connection to the compute node.
async fn connect_raw(&self, timeout: Duration) -> io::Result<(SocketAddr, TcpStream, &str)> {

View File

@@ -6,7 +6,7 @@ pub mod messages;
/// Wrappers for console APIs and their mocks.
pub mod provider;
pub use provider::{errors, Api, AuthSecret, CachedNodeInfo, NodeInfo};
pub(crate) use provider::{errors, Api, AuthSecret, CachedNodeInfo, NodeInfo};
/// Various cache-related types.
pub mod caches {

View File

@@ -14,7 +14,6 @@ use crate::{
context::RequestMonitoring,
scram, EndpointCacheKey, ProjectId,
};
use async_trait::async_trait;
use dashmap::DashMap;
use std::{sync::Arc, time::Duration};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
@@ -326,8 +325,7 @@ pub type CachedAllowedIps = Cached<&'static ProjectInfoCacheImpl, Arc<Vec<IpPatt
/// This will allocate per each call, but the http requests alone
/// already require a few allocations, so it should be fine.
#[async_trait]
pub trait Api {
pub(crate) trait Api {
/// Get the client's auth secret for authentication.
/// Returns option because user not found situation is special.
/// We still have to mock the scram to avoid leaking information that user doesn't exist.
@@ -363,7 +361,6 @@ pub enum ConsoleBackend {
Test(Box<dyn crate::auth::backend::TestBackend>),
}
#[async_trait]
impl Api for ConsoleBackend {
async fn get_role_secret(
&self,

View File

@@ -8,7 +8,6 @@ use crate::console::provider::{CachedAllowedIps, CachedRoleSecret};
use crate::context::RequestMonitoring;
use crate::{auth::backend::ComputeUserInfo, compute, error::io_error, scram, url::ApiUrl};
use crate::{auth::IpPattern, cache::Cached};
use async_trait::async_trait;
use futures::TryFutureExt;
use std::{str::FromStr, sync::Arc};
use thiserror::Error;
@@ -144,7 +143,6 @@ async fn get_execute_postgres_query(
Ok(Some(entry))
}
#[async_trait]
impl super::Api for Api {
#[tracing::instrument(skip_all)]
async fn get_role_secret(

View File

@@ -14,7 +14,6 @@ use crate::{
context::RequestMonitoring,
metrics::{ALLOWED_IPS_BY_CACHE_OUTCOME, ALLOWED_IPS_NUMBER},
};
use async_trait::async_trait;
use futures::TryFutureExt;
use std::sync::Arc;
use tokio::time::Instant;
@@ -56,7 +55,7 @@ impl Api {
ctx: &mut RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<AuthInfo, GetAuthInfoError> {
let request_id = uuid::Uuid::new_v4().to_string();
let request_id = ctx.session_id.to_string();
let application_name = ctx.console_application_name();
async {
let request = self
@@ -113,7 +112,7 @@ impl Api {
ctx: &mut RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<NodeInfo, WakeComputeError> {
let request_id = uuid::Uuid::new_v4().to_string();
let request_id = ctx.session_id.to_string();
let application_name = ctx.console_application_name();
async {
let mut request_builder = self
@@ -168,7 +167,6 @@ impl Api {
}
}
#[async_trait]
impl super::Api for Api {
#[tracing::instrument(skip_all)]
async fn get_role_secret(

View File

@@ -1,6 +1,5 @@
use std::sync::Arc;
use async_trait::async_trait;
use pq_proto::CancelKeyData;
use redis::AsyncCommands;
use tokio::sync::Mutex;
@@ -13,8 +12,8 @@ use super::{
notifications::{CancelSession, Notification, PROXY_CHANNEL_NAME},
};
#[async_trait]
pub trait CancellationPublisherMut: Send + Sync + 'static {
#[allow(async_fn_in_trait)]
async fn try_publish(
&mut self,
cancel_key_data: CancelKeyData,
@@ -22,8 +21,8 @@ pub trait CancellationPublisherMut: Send + Sync + 'static {
) -> anyhow::Result<()>;
}
#[async_trait]
pub trait CancellationPublisher: Send + Sync + 'static {
#[allow(async_fn_in_trait)]
async fn try_publish(
&self,
cancel_key_data: CancelKeyData,
@@ -31,10 +30,9 @@ pub trait CancellationPublisher: Send + Sync + 'static {
) -> anyhow::Result<()>;
}
#[async_trait]
impl CancellationPublisherMut for () {
impl CancellationPublisher for () {
async fn try_publish(
&mut self,
&self,
_cancel_key_data: CancelKeyData,
_session_id: Uuid,
) -> anyhow::Result<()> {
@@ -42,18 +40,16 @@ impl CancellationPublisherMut for () {
}
}
#[async_trait]
impl<P: CancellationPublisherMut> CancellationPublisher for P {
impl<P: CancellationPublisher> CancellationPublisherMut for P {
async fn try_publish(
&self,
_cancel_key_data: CancelKeyData,
_session_id: Uuid,
&mut self,
cancel_key_data: CancelKeyData,
session_id: Uuid,
) -> anyhow::Result<()> {
self.try_publish(_cancel_key_data, _session_id).await
<P as CancellationPublisher>::try_publish(self, cancel_key_data, session_id).await
}
}
#[async_trait]
impl<P: CancellationPublisher> CancellationPublisher for Option<P> {
async fn try_publish(
&self,
@@ -68,7 +64,6 @@ impl<P: CancellationPublisher> CancellationPublisher for Option<P> {
}
}
#[async_trait]
impl<P: CancellationPublisherMut> CancellationPublisher for Arc<Mutex<P>> {
async fn try_publish(
&self,
@@ -145,7 +140,6 @@ impl RedisPublisherClient {
}
}
#[async_trait]
impl CancellationPublisherMut for RedisPublisherClient {
async fn try_publish(
&mut self,

View File

@@ -3,9 +3,7 @@
use std::convert::Infallible;
use hmac::{Hmac, Mac};
use sha2::digest::FixedOutput;
use sha2::{Digest, Sha256};
use subtle::{Choice, ConstantTimeEq};
use sha2::Sha256;
use tokio::task::yield_now;
use super::messages::{
@@ -13,6 +11,7 @@ use super::messages::{
};
use super::secret::ServerSecret;
use super::signature::SignatureBuilder;
use super::ScramKey;
use crate::config;
use crate::sasl::{self, ChannelBinding, Error as SaslError};
@@ -104,7 +103,7 @@ async fn pbkdf2(str: &[u8], salt: &[u8], iterations: u32) -> [u8; 32] {
}
// copied from <https://github.com/neondatabase/rust-postgres/blob/20031d7a9ee1addeae6e0968e3899ae6bf01cee2/postgres-protocol/src/authentication/sasl.rs#L236-L248>
async fn derive_keys(password: &[u8], salt: &[u8], iterations: u32) -> ([u8; 32], [u8; 32]) {
async fn derive_client_key(password: &[u8], salt: &[u8], iterations: u32) -> ScramKey {
let salted_password = pbkdf2(password, salt, iterations).await;
let make_key = |name| {
@@ -116,7 +115,7 @@ async fn derive_keys(password: &[u8], salt: &[u8], iterations: u32) -> ([u8; 32]
<[u8; 32]>::from(key.into_bytes())
};
(make_key(b"Client Key"), make_key(b"Server Key"))
make_key(b"Client Key").into()
}
pub async fn exchange(
@@ -124,21 +123,12 @@ pub async fn exchange(
password: &[u8],
) -> sasl::Result<sasl::Outcome<super::ScramKey>> {
let salt = base64::decode(&secret.salt_base64)?;
let (client_key, server_key) = derive_keys(password, &salt, secret.iterations).await;
let stored_key: [u8; 32] = Sha256::default()
.chain_update(client_key)
.finalize_fixed()
.into();
let client_key = derive_client_key(password, &salt, secret.iterations).await;
// constant time to not leak partial key match
let valid = stored_key.ct_eq(&secret.stored_key.as_bytes())
| server_key.ct_eq(&secret.server_key.as_bytes())
| Choice::from(secret.doomed as u8);
if valid.into() {
Ok(sasl::Outcome::Success(super::ScramKey::from(client_key)))
} else {
if secret.is_password_invalid(&client_key).into() {
Ok(sasl::Outcome::Failure("password doesn't match"))
} else {
Ok(sasl::Outcome::Success(client_key))
}
}
@@ -220,7 +210,7 @@ impl SaslSentInner {
.derive_client_key(&client_final_message.proof);
// Auth fails either if keys don't match or it's pre-determined to fail.
if client_key.sha256() != secret.stored_key || secret.doomed {
if secret.is_password_invalid(&client_key).into() {
return Ok(sasl::Step::Failure("password doesn't match"));
}

View File

@@ -1,17 +1,31 @@
//! Tools for client/server/stored key management.
use subtle::ConstantTimeEq;
/// Faithfully taken from PostgreSQL.
pub const SCRAM_KEY_LEN: usize = 32;
/// One of the keys derived from the user's password.
/// We use the same structure for all keys, i.e.
/// `ClientKey`, `StoredKey`, and `ServerKey`.
#[derive(Clone, Default, PartialEq, Eq, Debug)]
#[derive(Clone, Default, Eq, Debug)]
#[repr(transparent)]
pub struct ScramKey {
bytes: [u8; SCRAM_KEY_LEN],
}
impl PartialEq for ScramKey {
fn eq(&self, other: &Self) -> bool {
self.ct_eq(other).into()
}
}
impl ConstantTimeEq for ScramKey {
fn ct_eq(&self, other: &Self) -> subtle::Choice {
self.bytes.ct_eq(&other.bytes)
}
}
impl ScramKey {
pub fn sha256(&self) -> Self {
super::sha256([self.as_ref()]).into()

View File

@@ -206,6 +206,28 @@ mod tests {
}
}
#[test]
fn parse_client_first_message_with_invalid_gs2_authz() {
assert!(ClientFirstMessage::parse("n,authzid,n=user,r=nonce").is_none())
}
#[test]
fn parse_client_first_message_with_extra_params() {
let msg = ClientFirstMessage::parse("n,,n=user,r=nonce,a=foo,b=bar,c=baz").unwrap();
assert_eq!(msg.bare, "n=user,r=nonce,a=foo,b=bar,c=baz");
assert_eq!(msg.username, "user");
assert_eq!(msg.nonce, "nonce");
assert_eq!(msg.cbind_flag, ChannelBinding::NotSupportedClient);
}
#[test]
fn parse_client_first_message_with_extra_params_invalid() {
// must be of the form `<ascii letter>=<...>`
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,abc=foo").is_none());
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,1=foo").is_none());
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,a").is_none());
}
#[test]
fn parse_client_final_message() {
let input = [

View File

@@ -1,5 +1,7 @@
//! Tools for SCRAM server secret management.
use subtle::{Choice, ConstantTimeEq};
use super::base64_decode_array;
use super::key::ScramKey;
@@ -40,6 +42,11 @@ impl ServerSecret {
Some(secret)
}
pub fn is_password_invalid(&self, client_key: &ScramKey) -> Choice {
// constant time to not leak partial key match
client_key.sha256().ct_ne(&self.stored_key) | Choice::from(self.doomed as u8)
}
/// To avoid revealing information to an attacker, we use a
/// mocked server secret even if the user doesn't exist.
/// See `auth-scram.c : mock_scram_secret` for details.

View File

@@ -244,6 +244,7 @@ impl SimulationApi {
mutex: 0,
mineLastElectedTerm: 0,
backpressureThrottlingTime: pg_atomic_uint64 { value: 0 },
currentClusterSize: pg_atomic_uint64 { value: 0 },
shard_ps_feedback: [empty_feedback; 128],
num_shards: 0,
min_ps_feedback: empty_feedback,

View File

@@ -1155,13 +1155,17 @@ class NeonEnv:
After this method returns, there should be no child processes running.
"""
self.endpoints.stop_all()
# Stop storage controller before pageservers: we don't want it to spuriously
# detect a pageserver "failure" during test teardown
self.storage_controller.stop(immediate=immediate)
for sk in self.safekeepers:
sk.stop(immediate=immediate)
for pageserver in self.pageservers:
if ps_assert_metric_no_errors:
pageserver.assert_no_metric_errors()
pageserver.stop(immediate=immediate)
self.storage_controller.stop(immediate=immediate)
self.broker.stop(immediate=immediate)
@property

View File

@@ -96,6 +96,8 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [
".*Call to node.*management API.*failed.*ReceiveBody.*",
# Many tests will start up with a node offline
".*startup_reconcile: Could not scan node.*",
# Tests run in dev mode
".*Starting in dev mode.*",
]

View File

@@ -62,9 +62,7 @@ def wait_for_upload(
)
time.sleep(1)
raise Exception(
"timed out while waiting for remote_consistent_lsn to reach {}, was {}".format(
lsn, current_lsn
)
f"timed out while waiting for {tenant}/{timeline} remote_consistent_lsn to reach {lsn}, was {current_lsn}"
)

View File

@@ -0,0 +1,2 @@
select sum(abalance) from pgbench_accounts;

View File

@@ -105,7 +105,7 @@ def test_pageserver_multiple_keys(neon_env_builder: NeonEnvBuilder):
# The neon_local tool generates one key pair at a hardcoded path by default.
# As a preparation for our test, move the public key of the key pair into a
# directory at the same location as the hardcoded path by:
# 1. moving the the file at `configured_pub_key_path` to a temporary location
# 1. moving the file at `configured_pub_key_path` to a temporary location
# 2. creating a new directory at `configured_pub_key_path`
# 3. moving the file from the temporary location into the newly created directory
configured_pub_key_path = Path(env.repo_dir) / "auth_public_key.pem"

View File

@@ -267,9 +267,10 @@ def test_forward_compatibility(
def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, repo_dir: Path):
ep = env.endpoints.create_start("main")
connstr = ep.connstr()
pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
connstr = ep.connstr()
pg_bin.run_capture(
["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump.sql'}"]
)
@@ -286,6 +287,9 @@ def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, r
timeline_id = env.initial_timeline
pg_version = env.pg_version
# Stop endpoint while we recreate timeline
ep.stop()
try:
pageserver_http.timeline_preserve_initdb_archive(tenant_id, timeline_id)
except PageserverApiException as e:
@@ -310,6 +314,9 @@ def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, r
existing_initdb_timeline_id=timeline_id,
)
# Timeline exists again: restart the endpoint
ep.start()
pg_bin.run_capture(
["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump-from-wal.sql'}"]
)

View File

@@ -84,3 +84,21 @@ def test_hot_standby(neon_simple_env: NeonEnv):
# clean up
if slow_down_send:
sk_http.configure_failpoints(("sk-send-wal-replica-sleep", "off"))
def test_2_replicas_start(neon_simple_env: NeonEnv):
env = neon_simple_env
with env.endpoints.create_start(
branch_name="main",
endpoint_id="primary",
) as primary:
time.sleep(1)
with env.endpoints.new_replica_start(
origin=primary, endpoint_id="secondary1"
) as secondary1:
with env.endpoints.new_replica_start(
origin=primary, endpoint_id="secondary2"
) as secondary2:
wait_replica_caughtup(primary, secondary1)
wait_replica_caughtup(primary, secondary2)

View File

@@ -1,4 +1,6 @@
import gzip
import json
import os
import time
from dataclasses import dataclass
from pathlib import Path
@@ -10,7 +12,11 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
wait_for_last_flush_lsn,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.remote_storage import (
LocalFsStorage,
RemoteStorageKind,
remote_storage_to_toml_inline_table,
)
from fixtures.types import TenantId, TimelineId
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
@@ -40,6 +46,9 @@ def test_metric_collection(
uploads.put((events, is_last == "true"))
return Response(status=200)
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
assert neon_env_builder.pageserver_remote_storage is not None
# Require collecting metrics frequently, since we change
# the timeline and want something to be logged about it.
#
@@ -48,12 +57,11 @@ def test_metric_collection(
neon_env_builder.pageserver_config_override = f"""
metric_collection_interval="1s"
metric_collection_endpoint="{metric_collection_endpoint}"
metric_collection_bucket={remote_storage_to_toml_inline_table(neon_env_builder.pageserver_remote_storage)}
cached_metric_collection_interval="0s"
synthetic_size_calculation_interval="3s"
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
# mock http server that returns OK for the metrics
@@ -167,6 +175,20 @@ def test_metric_collection(
httpserver.check()
# Check that at least one bucket output object is present, and that all
# can be decompressed and decoded.
bucket_dumps = {}
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
for dirpath, _dirs, files in os.walk(env.pageserver_remote_storage.root):
for file in files:
file_path = os.path.join(dirpath, file)
log.info(file_path)
if file.endswith(".gz"):
bucket_dumps[file_path] = json.load(gzip.open(file_path))
assert len(bucket_dumps) >= 1
assert all("events" in data for data in bucket_dumps.values())
def test_metric_collection_cleans_up_tempfile(
httpserver: HTTPServer,

View File

@@ -13,14 +13,17 @@ from fixtures.neon_fixtures import NeonEnv, PgBin
def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgBin):
env = neon_simple_env
env.neon_cli.create_branch("test_pageserver_restarts")
endpoint = env.endpoints.create_start("test_pageserver_restarts")
n_restarts = 10
endpoint = env.endpoints.create_start(
"test_pageserver_restarts", config_lines=["effective_io_concurrency=100"]
)
n_restarts = 100
scale = 10
def run_pgbench(connstr: str):
log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
pg_bin.run_capture(["pgbench", f"-T{n_restarts}", connstr])
pg_bin.run_capture(["pgbench", "-c", "10", "-f", "test_runner/regress/select.sql", f"-T{n_restarts}", connstr])
thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True)
thread.start()

View File

@@ -11,6 +11,7 @@ from fixtures.pageserver.utils import (
assert_prefix_empty,
poll_for_remote_storage_iterations,
tenant_delete_wait_completed,
wait_for_upload_queue_empty,
)
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, S3Storage
from fixtures.types import TenantId, TimelineId
@@ -472,6 +473,10 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
log.info("Synchronizing after initial write...")
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
# Ensure that everything which appears in the heatmap is also present in S3: heatmap writers
# are allowed to upload heatmaps that reference layers which are only enqueued for upload
wait_for_upload_queue_empty(ps_attached.http_client(), tenant_id, timeline_id)
ps_secondary.http_client().tenant_secondary_download(tenant_id)
assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers(
@@ -484,6 +489,11 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
workload.churn_rows(128, ps_attached.id)
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
# Ensure that everything which appears in the heatmap is also present in S3: heatmap writers
# are allowed to upload heatmaps that reference layers which are only enqueued for upload
wait_for_upload_queue_empty(ps_attached.http_client(), tenant_id, timeline_id)
ps_secondary.http_client().tenant_secondary_download(tenant_id)
assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers(

View File

@@ -1,5 +1,4 @@
import asyncio
import time
from typing import Tuple
import pytest
@@ -10,7 +9,7 @@ from fixtures.neon_fixtures import (
tenant_get_shards,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_last_record_lsn
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
@@ -61,6 +60,15 @@ def wait_until_pageserver_is_caught_up(
assert waited >= last_flush_lsn
def wait_until_pageserver_has_uploaded(
env: NeonEnv, last_flush_lsns: list[Tuple[TenantId, TimelineId, Lsn]]
):
for tenant, timeline, last_flush_lsn in last_flush_lsns:
shards = tenant_get_shards(env, tenant)
for tenant_shard_id, pageserver in shards:
wait_for_upload(pageserver.http_client(), tenant_shard_id, timeline, last_flush_lsn)
def wait_for_wal_ingest_metric(pageserver_http: PageserverHttpClient) -> float:
def query():
value = pageserver_http.get_metric_value("pageserver_wal_ingest_records_received_total")
@@ -86,25 +94,50 @@ def test_pageserver_small_inmemory_layers(
The workload creates a number of timelines and writes some data to each,
but not enough to trigger flushes via the `checkpoint_distance` config.
"""
def get_dirty_bytes():
v = (
env.pageserver.http_client().get_metric_value("pageserver_timeline_ephemeral_bytes")
or 0
)
log.info(f"dirty_bytes: {v}")
return v
def assert_dirty_bytes(v):
assert get_dirty_bytes() == v
env = neon_env_builder.init_configs()
env.start()
last_flush_lsns = asyncio.run(workload(env, TIMELINE_COUNT, ENTRIES_PER_TIMELINE))
wait_until_pageserver_is_caught_up(env, last_flush_lsns)
# We didn't write enough data to trigger a size-based checkpoint
assert get_dirty_bytes() > 0
ps_http_client = env.pageserver.http_client()
total_wal_ingested_before_restart = wait_for_wal_ingest_metric(ps_http_client)
log.info("Sleeping for checkpoint timeout ...")
time.sleep(CHECKPOINT_TIMEOUT_SECONDS + 5)
# Within ~ the checkpoint interval, all the ephemeral layers should be frozen and flushed,
# such that there are zero bytes of ephemeral layer left on the pageserver
log.info("Waiting for background checkpoints...")
wait_until(CHECKPOINT_TIMEOUT_SECONDS * 2, 1, lambda: assert_dirty_bytes(0)) # type: ignore
# Zero ephemeral layer bytes does not imply that all the frozen layers were uploaded: they
# must be uploaded to remain visible to the pageserver after restart.
wait_until_pageserver_has_uploaded(env, last_flush_lsns)
env.pageserver.restart(immediate=immediate_shutdown)
wait_until_pageserver_is_caught_up(env, last_flush_lsns)
# Catching up with WAL ingest should have resulted in zero bytes of ephemeral layers, since
# we froze, flushed and uploaded everything before restarting. There can be no more WAL writes
# because we shut down compute endpoints before flushing.
assert get_dirty_bytes() == 0
total_wal_ingested_after_restart = wait_for_wal_ingest_metric(ps_http_client)
log.info(f"WAL ingested before restart: {total_wal_ingested_before_restart}")
log.info(f"WAL ingested after restart: {total_wal_ingested_after_restart}")
leeway = total_wal_ingested_before_restart * 5 / 100
assert total_wal_ingested_after_restart <= leeway
assert total_wal_ingested_after_restart == 0

View File

@@ -15,6 +15,13 @@ def test_pageserver_recovery(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.pageserver.is_testing_enabled_or_skip()
# We expect the pageserver to exit, which will cause storage storage controller
# requests to fail and warn.
env.storage_controller.allowed_errors.append(".*management API still failed.*")
env.storage_controller.allowed_errors.append(
".*Reconcile error.*error sending request for url.*"
)
# Create a branch for us
env.neon_cli.create_branch("test_pageserver_recovery", "main")

View File

@@ -838,7 +838,7 @@ def test_compaction_waits_for_upload(
# upload_stuck_layers and the original initdb L0
client.timeline_checkpoint(tenant_id, timeline_id)
# as uploads are paused, the the upload_stuck_layers should still be with us
# as uploads are paused, the upload_stuck_layers should still be with us
for name in upload_stuck_layers:
path = env.pageserver.timeline_dir(tenant_id, timeline_id) / name
assert path.exists(), "uploads are stuck still over compaction"

View File

@@ -1,7 +1,9 @@
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
@pytest.mark.xfail
def test_replication_start(neon_simple_env: NeonEnv):
env = neon_simple_env

View File

@@ -874,11 +874,17 @@ def test_sharding_split_failures(
workload.validate()
if failure.expect_available():
# Even though the split failed partway through, this should not have interrupted
# clients. Disable waiting for pageservers in the workload helper, because our
# failpoints may prevent API access.
# This only applies for failure modes that leave pageserver page_service API available.
workload.churn_rows(10, upload=False, ingest=False)
# Even though the split failed partway through, this should not leave the tenant in
# an unavailable state.
# - Disable waiting for pageservers in the workload helper, because our
# failpoints may prevent API access. This only applies for failure modes that
# leave pageserver page_service API available.
# - This is a wait_until because clients may see transient errors in some split error cases,
# e.g. while waiting for a storage controller to re-attach a parent shard if we failed
# inside the pageserver and the storage controller responds by detaching children and attaching
# parents concurrently (https://github.com/neondatabase/neon/issues/7148)
wait_until(10, 1, lambda: workload.churn_rows(10, upload=False, ingest=False)) # type: ignore
workload.validate()
if failure.fails_forward(env):

View File

@@ -1,5 +1,5 @@
{
"postgres-v16": "90078947229aa7f9ac5f7ed4527b2c7386d5332b",
"postgres-v15": "80cef885add1af6741aa31944c7d2c84d8f9098f",
"postgres-v14": "3b09894ddb8825b50c963942059eab1a2a0b0a89"
"postgres-v16": "3946b2e2ea71d07af092099cb5bcae76a69b90d6",
"postgres-v15": "e7651e79c0c27fbddc3c724f5b9553222c28e395",
"postgres-v14": "748643b4683e9fe3b105011a6ba8a687d032cd65"
}