mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-22 15:41:15 +00:00
Compare commits
25 Commits
problame/o
...
test_recon
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e21b2eb7dc | ||
|
|
1cd71d29ac | ||
|
|
86762ba313 | ||
|
|
6a42b6b87f | ||
|
|
48013845f0 | ||
|
|
40226ce7da | ||
|
|
ea2083653c | ||
|
|
ad072de420 | ||
|
|
6c18109734 | ||
|
|
5dee58f492 | ||
|
|
6313f1fa7a | ||
|
|
f72415e1fd | ||
|
|
d837ce0686 | ||
|
|
2713142308 | ||
|
|
a6c1fdcaf6 | ||
|
|
adb0526262 | ||
|
|
0099dfa56b | ||
|
|
3a4ebfb95d | ||
|
|
3220f830b7 | ||
|
|
72103d481d | ||
|
|
643683f41a | ||
|
|
35f4c04c9b | ||
|
|
1787cf19e3 | ||
|
|
2668a1dfab | ||
|
|
77f3a30440 |
21
.github/workflows/build_and_test.yml
vendored
21
.github/workflows/build_and_test.yml
vendored
@@ -1121,10 +1121,16 @@ jobs:
|
||||
run: |
|
||||
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
|
||||
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=false
|
||||
|
||||
# TODO: move deployPreprodRegion to release (`"$GITHUB_REF_NAME" == "release"` block), once Staging support different compute tag prefixes for different regions
|
||||
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=true
|
||||
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
|
||||
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main \
|
||||
-f deployPgSniRouter=false \
|
||||
-f deployProxy=false \
|
||||
-f deployStorage=true \
|
||||
-f deployStorageBroker=true \
|
||||
-f branch=main \
|
||||
-f dockerTag=${{needs.tag.outputs.build-tag}} \
|
||||
-f deployPreprodRegion=true
|
||||
|
||||
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main \
|
||||
-f deployPgSniRouter=false \
|
||||
-f deployProxy=false \
|
||||
@@ -1133,6 +1139,15 @@ jobs:
|
||||
-f branch=main \
|
||||
-f dockerTag=${{needs.tag.outputs.build-tag}}
|
||||
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
|
||||
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main \
|
||||
-f deployPgSniRouter=true \
|
||||
-f deployProxy=true \
|
||||
-f deployStorage=false \
|
||||
-f deployStorageBroker=false \
|
||||
-f branch=main \
|
||||
-f dockerTag=${{needs.tag.outputs.build-tag}} \
|
||||
-f deployPreprodRegion=true
|
||||
|
||||
gh workflow --repo neondatabase/aws run deploy-proxy-prod.yml --ref main \
|
||||
-f deployPgSniRouter=true \
|
||||
-f deployProxy=true \
|
||||
|
||||
25
Cargo.lock
generated
25
Cargo.lock
generated
@@ -276,7 +276,6 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"aws-config",
|
||||
"aws-sdk-secretsmanager",
|
||||
"bytes",
|
||||
"camino",
|
||||
"clap",
|
||||
@@ -433,29 +432,6 @@ dependencies = [
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aws-sdk-secretsmanager"
|
||||
version = "1.14.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0a0b64e61e7d632d9df90a2e0f32630c68c24960cab1d27d848718180af883d3"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
"aws-smithy-async",
|
||||
"aws-smithy-http",
|
||||
"aws-smithy-json",
|
||||
"aws-smithy-runtime",
|
||||
"aws-smithy-runtime-api",
|
||||
"aws-smithy-types",
|
||||
"aws-types",
|
||||
"bytes",
|
||||
"fastrand 2.0.0",
|
||||
"http 0.2.9",
|
||||
"once_cell",
|
||||
"regex-lite",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aws-sdk-sso"
|
||||
version = "1.12.0"
|
||||
@@ -4237,6 +4213,7 @@ dependencies = [
|
||||
"consumption_metrics",
|
||||
"dashmap",
|
||||
"env_logger",
|
||||
"fallible-iterator",
|
||||
"futures",
|
||||
"git-version",
|
||||
"hashbrown 0.13.2",
|
||||
|
||||
@@ -52,7 +52,6 @@ async-stream = "0.3"
|
||||
async-trait = "0.1"
|
||||
aws-config = { version = "1.1.4", default-features = false, features=["rustls"] }
|
||||
aws-sdk-s3 = "1.14"
|
||||
aws-sdk-secretsmanager = { version = "1.14.0" }
|
||||
aws-sdk-iam = "1.15.0"
|
||||
aws-smithy-async = { version = "1.1.4", default-features = false, features=["rt-tokio"] }
|
||||
aws-smithy-types = "1.1.4"
|
||||
@@ -79,6 +78,7 @@ either = "1.8"
|
||||
enum-map = "2.4.2"
|
||||
enumset = "1.0.12"
|
||||
fail = "0.5.0"
|
||||
fallible-iterator = "0.2"
|
||||
fs2 = "0.4.3"
|
||||
futures = "0.3"
|
||||
futures-core = "0.3"
|
||||
|
||||
@@ -16,7 +16,6 @@ testing = []
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
aws-config.workspace = true
|
||||
aws-sdk-secretsmanager.workspace = true
|
||||
bytes.workspace = true
|
||||
camino.workspace = true
|
||||
clap.workspace = true
|
||||
|
||||
@@ -139,7 +139,7 @@ impl HeartbeaterTask {
|
||||
.with_client_retries(
|
||||
|client| async move { client.get_utilization().await },
|
||||
&jwt_token,
|
||||
2,
|
||||
3,
|
||||
3,
|
||||
Duration::from_secs(1),
|
||||
&cancel,
|
||||
|
||||
@@ -3,7 +3,6 @@ use attachment_service::http::make_router;
|
||||
use attachment_service::metrics::preinitialize_metrics;
|
||||
use attachment_service::persistence::Persistence;
|
||||
use attachment_service::service::{Config, Service, MAX_UNAVAILABLE_INTERVAL_DEFAULT};
|
||||
use aws_config::{BehaviorVersion, Region};
|
||||
use camino::Utf8PathBuf;
|
||||
use clap::Parser;
|
||||
use diesel::Connection;
|
||||
@@ -55,11 +54,31 @@ struct Cli {
|
||||
#[arg(long)]
|
||||
database_url: Option<String>,
|
||||
|
||||
/// Flag to enable dev mode, which permits running without auth
|
||||
#[arg(long, default_value = "false")]
|
||||
dev: bool,
|
||||
|
||||
/// Grace period before marking unresponsive pageserver offline
|
||||
#[arg(long)]
|
||||
max_unavailable_interval: Option<humantime::Duration>,
|
||||
}
|
||||
|
||||
enum StrictMode {
|
||||
/// In strict mode, we will require that all secrets are loaded, i.e. security features
|
||||
/// may not be implicitly turned off by omitting secrets in the environment.
|
||||
Strict,
|
||||
/// In dev mode, secrets are optional, and omitting a particular secret will implicitly
|
||||
/// disable the auth related to it (e.g. no pageserver jwt key -> send unauthenticated
|
||||
/// requests, no public key -> don't authenticate incoming requests).
|
||||
Dev,
|
||||
}
|
||||
|
||||
impl Default for StrictMode {
|
||||
fn default() -> Self {
|
||||
Self::Strict
|
||||
}
|
||||
}
|
||||
|
||||
/// Secrets may either be provided on the command line (for testing), or loaded from AWS SecretManager: this
|
||||
/// type encapsulates the logic to decide which and do the loading.
|
||||
struct Secrets {
|
||||
@@ -70,13 +89,6 @@ struct Secrets {
|
||||
}
|
||||
|
||||
impl Secrets {
|
||||
const DATABASE_URL_SECRET: &'static str = "rds-neon-storage-controller-url";
|
||||
const PAGESERVER_JWT_TOKEN_SECRET: &'static str =
|
||||
"neon-storage-controller-pageserver-jwt-token";
|
||||
const CONTROL_PLANE_JWT_TOKEN_SECRET: &'static str =
|
||||
"neon-storage-controller-control-plane-jwt-token";
|
||||
const PUBLIC_KEY_SECRET: &'static str = "neon-storage-controller-public-key";
|
||||
|
||||
const DATABASE_URL_ENV: &'static str = "DATABASE_URL";
|
||||
const PAGESERVER_JWT_TOKEN_ENV: &'static str = "PAGESERVER_JWT_TOKEN";
|
||||
const CONTROL_PLANE_JWT_TOKEN_ENV: &'static str = "CONTROL_PLANE_JWT_TOKEN";
|
||||
@@ -87,111 +99,41 @@ impl Secrets {
|
||||
/// - Environment variables if DATABASE_URL is set.
|
||||
/// - AWS Secrets Manager secrets
|
||||
async fn load(args: &Cli) -> anyhow::Result<Self> {
|
||||
match &args.database_url {
|
||||
Some(url) => Self::load_cli(url, args),
|
||||
None => match std::env::var(Self::DATABASE_URL_ENV) {
|
||||
Ok(database_url) => Self::load_env(database_url),
|
||||
Err(_) => Self::load_aws_sm().await,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn load_env(database_url: String) -> anyhow::Result<Self> {
|
||||
let public_key = match std::env::var(Self::PUBLIC_KEY_ENV) {
|
||||
Ok(public_key) => Some(JwtAuth::from_key(public_key).context("Loading public key")?),
|
||||
Err(_) => None,
|
||||
};
|
||||
Ok(Self {
|
||||
database_url,
|
||||
public_key,
|
||||
jwt_token: std::env::var(Self::PAGESERVER_JWT_TOKEN_ENV).ok(),
|
||||
control_plane_jwt_token: std::env::var(Self::CONTROL_PLANE_JWT_TOKEN_ENV).ok(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn load_aws_sm() -> anyhow::Result<Self> {
|
||||
let Ok(region) = std::env::var("AWS_REGION") else {
|
||||
anyhow::bail!("AWS_REGION is not set, cannot load secrets automatically: either set this, or use CLI args to supply secrets");
|
||||
};
|
||||
let config = aws_config::defaults(BehaviorVersion::v2023_11_09())
|
||||
.region(Region::new(region.clone()))
|
||||
.load()
|
||||
.await;
|
||||
|
||||
let asm = aws_sdk_secretsmanager::Client::new(&config);
|
||||
|
||||
let Some(database_url) = asm
|
||||
.get_secret_value()
|
||||
.secret_id(Self::DATABASE_URL_SECRET)
|
||||
.send()
|
||||
.await?
|
||||
.secret_string()
|
||||
.map(str::to_string)
|
||||
let Some(database_url) =
|
||||
Self::load_secret(&args.database_url, Self::DATABASE_URL_ENV).await
|
||||
else {
|
||||
anyhow::bail!(
|
||||
"Database URL secret not found at {region}/{}",
|
||||
Self::DATABASE_URL_SECRET
|
||||
"Database URL is not set (set `--database-url`, or `DATABASE_URL` environment)"
|
||||
)
|
||||
};
|
||||
|
||||
let jwt_token = asm
|
||||
.get_secret_value()
|
||||
.secret_id(Self::PAGESERVER_JWT_TOKEN_SECRET)
|
||||
.send()
|
||||
.await?
|
||||
.secret_string()
|
||||
.map(str::to_string);
|
||||
if jwt_token.is_none() {
|
||||
tracing::warn!("No pageserver JWT token set: this will only work if authentication is disabled on the pageserver");
|
||||
}
|
||||
|
||||
let control_plane_jwt_token = asm
|
||||
.get_secret_value()
|
||||
.secret_id(Self::CONTROL_PLANE_JWT_TOKEN_SECRET)
|
||||
.send()
|
||||
.await?
|
||||
.secret_string()
|
||||
.map(str::to_string);
|
||||
if jwt_token.is_none() {
|
||||
tracing::warn!("No control plane JWT token set: this will only work if authentication is disabled on the pageserver");
|
||||
}
|
||||
|
||||
let public_key = asm
|
||||
.get_secret_value()
|
||||
.secret_id(Self::PUBLIC_KEY_SECRET)
|
||||
.send()
|
||||
.await?
|
||||
.secret_string()
|
||||
.map(str::to_string);
|
||||
let public_key = match public_key {
|
||||
Some(key) => Some(JwtAuth::from_key(key)?),
|
||||
None => {
|
||||
tracing::warn!(
|
||||
"No public key set: inccoming HTTP requests will not be authenticated"
|
||||
);
|
||||
None
|
||||
}
|
||||
let public_key = match Self::load_secret(&args.public_key, Self::PUBLIC_KEY_ENV).await {
|
||||
Some(v) => Some(JwtAuth::from_key(v).context("Loading public key")?),
|
||||
None => None,
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
let this = Self {
|
||||
database_url,
|
||||
public_key,
|
||||
jwt_token,
|
||||
control_plane_jwt_token,
|
||||
})
|
||||
jwt_token: Self::load_secret(&args.jwt_token, Self::PAGESERVER_JWT_TOKEN_ENV).await,
|
||||
control_plane_jwt_token: Self::load_secret(
|
||||
&args.control_plane_jwt_token,
|
||||
Self::CONTROL_PLANE_JWT_TOKEN_ENV,
|
||||
)
|
||||
.await,
|
||||
};
|
||||
|
||||
Ok(this)
|
||||
}
|
||||
|
||||
fn load_cli(database_url: &str, args: &Cli) -> anyhow::Result<Self> {
|
||||
let public_key = match &args.public_key {
|
||||
None => None,
|
||||
Some(key) => Some(JwtAuth::from_key(key.clone()).context("Loading public key")?),
|
||||
};
|
||||
Ok(Self {
|
||||
database_url: database_url.to_owned(),
|
||||
public_key,
|
||||
jwt_token: args.jwt_token.clone(),
|
||||
control_plane_jwt_token: args.control_plane_jwt_token.clone(),
|
||||
})
|
||||
async fn load_secret(cli: &Option<String>, env_name: &str) -> Option<String> {
|
||||
if let Some(v) = cli {
|
||||
Some(v.clone())
|
||||
} else if let Ok(v) = std::env::var(env_name) {
|
||||
Some(v)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -247,8 +189,42 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
args.listen
|
||||
);
|
||||
|
||||
let strict_mode = if args.dev {
|
||||
StrictMode::Dev
|
||||
} else {
|
||||
StrictMode::Strict
|
||||
};
|
||||
|
||||
let secrets = Secrets::load(&args).await?;
|
||||
|
||||
// Validate required secrets and arguments are provided in strict mode
|
||||
match strict_mode {
|
||||
StrictMode::Strict
|
||||
if (secrets.public_key.is_none()
|
||||
|| secrets.jwt_token.is_none()
|
||||
|| secrets.control_plane_jwt_token.is_none()) =>
|
||||
{
|
||||
// Production systems should always have secrets configured: if public_key was not set
|
||||
// then we would implicitly disable auth.
|
||||
anyhow::bail!(
|
||||
"Insecure config! One or more secrets is not set. This is only permitted in `--dev` mode"
|
||||
);
|
||||
}
|
||||
StrictMode::Strict if args.compute_hook_url.is_none() => {
|
||||
// Production systems should always have a compute hook set, to prevent falling
|
||||
// back to trying to use neon_local.
|
||||
anyhow::bail!(
|
||||
"`--compute-hook-url` is not set: this is only permitted in `--dev` mode"
|
||||
);
|
||||
}
|
||||
StrictMode::Strict => {
|
||||
tracing::info!("Starting in strict mode: configuration is OK.")
|
||||
}
|
||||
StrictMode::Dev => {
|
||||
tracing::warn!("Starting in dev mode: this may be an insecure configuration.")
|
||||
}
|
||||
}
|
||||
|
||||
let config = Config {
|
||||
jwt_token: secrets.jwt_token,
|
||||
control_plane_jwt_token: secrets.control_plane_jwt_token,
|
||||
|
||||
@@ -294,7 +294,7 @@ where
|
||||
// is in state 'taken' but the thread that would unlock it is
|
||||
// not there.
|
||||
// 2. A rust object that represented some external resource in the
|
||||
// parent now got implicitly copied by the the fork, even though
|
||||
// parent now got implicitly copied by the fork, even though
|
||||
// the object's type is not `Copy`. The parent program may use
|
||||
// non-copyability as way to enforce unique ownership of an
|
||||
// external resource in the typesystem. The fork breaks that
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
//!
|
||||
//! The endpoint is managed by the `compute_ctl` binary. When an endpoint is
|
||||
//! started, we launch `compute_ctl` It synchronizes the safekeepers, downloads
|
||||
//! the basebackup from the pageserver to initialize the the data directory, and
|
||||
//! the basebackup from the pageserver to initialize the data directory, and
|
||||
//! finally launches the PostgreSQL process. It watches the PostgreSQL process
|
||||
//! until it exits.
|
||||
//!
|
||||
|
||||
@@ -279,6 +279,7 @@ impl StorageController {
|
||||
&self.listen,
|
||||
"-p",
|
||||
self.path.as_ref(),
|
||||
"--dev",
|
||||
"--database-url",
|
||||
&database_url,
|
||||
"--max-unavailable-interval",
|
||||
|
||||
@@ -913,6 +913,8 @@ pub struct PagestreamNblocksResponse {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamGetPageResponse {
|
||||
pub rel: RelTag,
|
||||
pub blkno: u32,
|
||||
pub page: Bytes,
|
||||
}
|
||||
|
||||
@@ -1073,6 +1075,11 @@ impl PagestreamBeMessage {
|
||||
|
||||
Self::GetPage(resp) => {
|
||||
bytes.put_u8(Tag::GetPage as u8);
|
||||
bytes.put_u32(resp.rel.spcnode);
|
||||
bytes.put_u32(resp.rel.dbnode);
|
||||
bytes.put_u32(resp.rel.relnode);
|
||||
bytes.put_u8(resp.rel.forknum);
|
||||
bytes.put_u32(resp.blkno);
|
||||
bytes.put(&resp.page[..]);
|
||||
}
|
||||
|
||||
@@ -1114,9 +1121,20 @@ impl PagestreamBeMessage {
|
||||
Self::Nblocks(PagestreamNblocksResponse { n_blocks })
|
||||
}
|
||||
Tag::GetPage => {
|
||||
let rel = RelTag {
|
||||
spcnode: buf.read_u32::<BigEndian>()?,
|
||||
dbnode: buf.read_u32::<BigEndian>()?,
|
||||
relnode: buf.read_u32::<BigEndian>()?,
|
||||
forknum: buf.read_u8()?,
|
||||
};
|
||||
let blkno = buf.read_u32::<BigEndian>()?;
|
||||
let mut page = vec![0; 8192]; // TODO: use MaybeUninit
|
||||
buf.read_exact(&mut page)?;
|
||||
PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page: page.into() })
|
||||
PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
rel,
|
||||
blkno,
|
||||
page: page.into(),
|
||||
})
|
||||
}
|
||||
Tag::Error => {
|
||||
let mut msg = Vec::new();
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use anyhow::*;
|
||||
use clap::{value_parser, Arg, ArgMatches, Command};
|
||||
use postgres::Client;
|
||||
use std::{path::PathBuf, str::FromStr};
|
||||
use wal_craft::*;
|
||||
|
||||
@@ -8,8 +9,8 @@ fn main() -> Result<()> {
|
||||
.init();
|
||||
let arg_matches = cli().get_matches();
|
||||
|
||||
let wal_craft = |arg_matches: &ArgMatches, client| {
|
||||
let (intermediate_lsns, end_of_wal_lsn) = match arg_matches
|
||||
let wal_craft = |arg_matches: &ArgMatches, client: &mut Client| {
|
||||
let intermediate_lsns = match arg_matches
|
||||
.get_one::<String>("type")
|
||||
.map(|s| s.as_str())
|
||||
.context("'type' is required")?
|
||||
@@ -25,6 +26,7 @@ fn main() -> Result<()> {
|
||||
LastWalRecordCrossingSegment::NAME => LastWalRecordCrossingSegment::craft(client)?,
|
||||
a => panic!("Unknown --type argument: {a}"),
|
||||
};
|
||||
let end_of_wal_lsn = client.pg_current_wal_insert_lsn()?;
|
||||
for lsn in intermediate_lsns {
|
||||
println!("intermediate_lsn = {lsn}");
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ use postgres::types::PgLsn;
|
||||
use postgres::Client;
|
||||
use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
|
||||
use postgres_ffi::{XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD};
|
||||
use std::cmp::Ordering;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process::Command;
|
||||
use std::time::{Duration, Instant};
|
||||
@@ -232,59 +231,52 @@ pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> anyhow
|
||||
pub trait Crafter {
|
||||
const NAME: &'static str;
|
||||
|
||||
/// Generates WAL using the client `client`. Returns a pair of:
|
||||
/// * A vector of some valid "interesting" intermediate LSNs which one may start reading from.
|
||||
/// May include or exclude Lsn(0) and the end-of-wal.
|
||||
/// * The expected end-of-wal LSN.
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)>;
|
||||
/// Generates WAL using the client `client`. Returns a vector of some valid
|
||||
/// "interesting" intermediate LSNs which one may start reading from.
|
||||
/// test_end_of_wal uses this to check various starting points.
|
||||
///
|
||||
/// Note that postgres is generally keen about writing some WAL. While we
|
||||
/// try to disable it (autovacuum, big wal_writer_delay, etc) it is always
|
||||
/// possible, e.g. xl_running_xacts are dumped each 15s. So checks about
|
||||
/// stable WAL end would be flaky unless postgres is shut down. For this
|
||||
/// reason returning potential end of WAL here is pointless. Most of the
|
||||
/// time this doesn't happen though, so it is reasonable to create needed
|
||||
/// WAL structure and immediately kill postgres like test_end_of_wal does.
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>>;
|
||||
}
|
||||
|
||||
/// Wraps some WAL craft function, providing current LSN to it before the
|
||||
/// insertion and flushing WAL afterwards. Also pushes initial LSN to the
|
||||
/// result.
|
||||
fn craft_internal<C: postgres::GenericClient>(
|
||||
client: &mut C,
|
||||
f: impl Fn(&mut C, PgLsn) -> anyhow::Result<(Vec<PgLsn>, Option<PgLsn>)>,
|
||||
) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
|
||||
f: impl Fn(&mut C, PgLsn) -> anyhow::Result<Vec<PgLsn>>,
|
||||
) -> anyhow::Result<Vec<PgLsn>> {
|
||||
ensure_server_config(client)?;
|
||||
|
||||
let initial_lsn = client.pg_current_wal_insert_lsn()?;
|
||||
info!("LSN initial = {}", initial_lsn);
|
||||
|
||||
let (mut intermediate_lsns, last_lsn) = f(client, initial_lsn)?;
|
||||
let last_lsn = match last_lsn {
|
||||
None => client.pg_current_wal_insert_lsn()?,
|
||||
Some(last_lsn) => {
|
||||
let insert_lsn = client.pg_current_wal_insert_lsn()?;
|
||||
match last_lsn.cmp(&insert_lsn) {
|
||||
Ordering::Less => bail!(
|
||||
"Some records were inserted after the crafted WAL: {} vs {}",
|
||||
last_lsn,
|
||||
insert_lsn
|
||||
),
|
||||
Ordering::Equal => last_lsn,
|
||||
Ordering::Greater => bail!("Reported LSN is greater than insert_lsn"),
|
||||
}
|
||||
}
|
||||
};
|
||||
let mut intermediate_lsns = f(client, initial_lsn)?;
|
||||
if !intermediate_lsns.starts_with(&[initial_lsn]) {
|
||||
intermediate_lsns.insert(0, initial_lsn);
|
||||
}
|
||||
|
||||
// Some records may be not flushed, e.g. non-transactional logical messages.
|
||||
//
|
||||
// Note: this is broken if pg_current_wal_insert_lsn is at page boundary
|
||||
// because pg_current_wal_insert_lsn skips page headers.
|
||||
client.execute("select neon_xlogflush(pg_current_wal_insert_lsn())", &[])?;
|
||||
match last_lsn.cmp(&client.pg_current_wal_flush_lsn()?) {
|
||||
Ordering::Less => bail!("Some records were flushed after the crafted WAL"),
|
||||
Ordering::Equal => {}
|
||||
Ordering::Greater => bail!("Reported LSN is greater than flush_lsn"),
|
||||
}
|
||||
Ok((intermediate_lsns, last_lsn))
|
||||
Ok(intermediate_lsns)
|
||||
}
|
||||
|
||||
pub struct Simple;
|
||||
impl Crafter for Simple {
|
||||
const NAME: &'static str = "simple";
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
|
||||
craft_internal(client, |client, _| {
|
||||
client.execute("CREATE table t(x int)", &[])?;
|
||||
Ok((Vec::new(), None))
|
||||
Ok(Vec::new())
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -292,29 +284,36 @@ impl Crafter for Simple {
|
||||
pub struct LastWalRecordXlogSwitch;
|
||||
impl Crafter for LastWalRecordXlogSwitch {
|
||||
const NAME: &'static str = "last_wal_record_xlog_switch";
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
|
||||
// Do not use generate_internal because here we end up with flush_lsn exactly on
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
|
||||
// Do not use craft_internal because here we end up with flush_lsn exactly on
|
||||
// the segment boundary and insert_lsn after the initial page header, which is unusual.
|
||||
ensure_server_config(client)?;
|
||||
|
||||
client.execute("CREATE table t(x int)", &[])?;
|
||||
let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
|
||||
let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
|
||||
let next_segment = PgLsn::from(0x0200_0000);
|
||||
// pg_switch_wal returns end of last record of the switched segment,
|
||||
// i.e. end of SWITCH itself.
|
||||
let xlog_switch_record_end: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
|
||||
let before_xlog_switch_u64 = u64::from(before_xlog_switch);
|
||||
let next_segment = PgLsn::from(
|
||||
before_xlog_switch_u64 - (before_xlog_switch_u64 % WAL_SEGMENT_SIZE as u64)
|
||||
+ WAL_SEGMENT_SIZE as u64,
|
||||
);
|
||||
ensure!(
|
||||
after_xlog_switch <= next_segment,
|
||||
"XLOG_SWITCH message ended after the expected segment boundary: {} > {}",
|
||||
after_xlog_switch,
|
||||
xlog_switch_record_end <= next_segment,
|
||||
"XLOG_SWITCH record ended after the expected segment boundary: {} > {}",
|
||||
xlog_switch_record_end,
|
||||
next_segment
|
||||
);
|
||||
Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
|
||||
Ok(vec![before_xlog_switch, xlog_switch_record_end])
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
|
||||
/// Craft xlog SWITCH record ending at page boundary.
|
||||
impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
|
||||
const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
|
||||
// Do not use generate_internal because here we end up with flush_lsn exactly on
|
||||
// the segment boundary and insert_lsn after the initial page header, which is unusual.
|
||||
ensure_server_config(client)?;
|
||||
@@ -361,28 +360,29 @@ impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
|
||||
|
||||
// Emit the XLOG_SWITCH
|
||||
let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
|
||||
let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
|
||||
let xlog_switch_record_end: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
|
||||
let next_segment = PgLsn::from(0x0200_0000);
|
||||
ensure!(
|
||||
after_xlog_switch < next_segment,
|
||||
"XLOG_SWITCH message ended on or after the expected segment boundary: {} > {}",
|
||||
after_xlog_switch,
|
||||
xlog_switch_record_end < next_segment,
|
||||
"XLOG_SWITCH record ended on or after the expected segment boundary: {} > {}",
|
||||
xlog_switch_record_end,
|
||||
next_segment
|
||||
);
|
||||
ensure!(
|
||||
u64::from(after_xlog_switch) as usize % XLOG_BLCKSZ == XLOG_SIZE_OF_XLOG_SHORT_PHD,
|
||||
u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ == XLOG_SIZE_OF_XLOG_SHORT_PHD,
|
||||
"XLOG_SWITCH message ended not on page boundary: {}, offset = {}",
|
||||
after_xlog_switch,
|
||||
u64::from(after_xlog_switch) as usize % XLOG_BLCKSZ
|
||||
xlog_switch_record_end,
|
||||
u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ
|
||||
);
|
||||
Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
|
||||
Ok(vec![before_xlog_switch, xlog_switch_record_end])
|
||||
}
|
||||
}
|
||||
|
||||
fn craft_single_logical_message(
|
||||
/// Write ~16MB logical message; it should cross WAL segment.
|
||||
fn craft_seg_size_logical_message(
|
||||
client: &mut impl postgres::GenericClient,
|
||||
transactional: bool,
|
||||
) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
|
||||
) -> anyhow::Result<Vec<PgLsn>> {
|
||||
craft_internal(client, |client, initial_lsn| {
|
||||
ensure!(
|
||||
initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
|
||||
@@ -405,34 +405,24 @@ fn craft_single_logical_message(
|
||||
"Logical message crossed two segments"
|
||||
);
|
||||
|
||||
if transactional {
|
||||
// Transactional logical messages are part of a transaction, so the one above is
|
||||
// followed by a small COMMIT record.
|
||||
|
||||
let after_message_lsn = client.pg_current_wal_insert_lsn()?;
|
||||
ensure!(
|
||||
message_lsn < after_message_lsn,
|
||||
"No record found after the emitted message"
|
||||
);
|
||||
Ok((vec![message_lsn], Some(after_message_lsn)))
|
||||
} else {
|
||||
Ok((Vec::new(), Some(message_lsn)))
|
||||
}
|
||||
Ok(vec![message_lsn])
|
||||
})
|
||||
}
|
||||
|
||||
pub struct WalRecordCrossingSegmentFollowedBySmallOne;
|
||||
impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
|
||||
const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
|
||||
craft_single_logical_message(client, true)
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
|
||||
// Transactional message crossing WAL segment will be followed by small
|
||||
// commit record.
|
||||
craft_seg_size_logical_message(client, true)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LastWalRecordCrossingSegment;
|
||||
impl Crafter for LastWalRecordCrossingSegment {
|
||||
const NAME: &'static str = "last_wal_record_crossing_segment";
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec<PgLsn>, PgLsn)> {
|
||||
craft_single_logical_message(client, false)
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
|
||||
craft_seg_size_logical_message(client, false)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,13 +11,15 @@ use utils::const_assert;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
fn init_logging() {
|
||||
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(
|
||||
format!("crate=info,postgres_ffi::{PG_MAJORVERSION}::xlog_utils=trace"),
|
||||
))
|
||||
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(format!(
|
||||
"crate=info,postgres_ffi::{PG_MAJORVERSION}::xlog_utils=trace"
|
||||
)))
|
||||
.is_test(true)
|
||||
.try_init();
|
||||
}
|
||||
|
||||
/// Test that find_end_of_wal returns the same results as pg_dump on various
|
||||
/// WALs created by Crafter.
|
||||
fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
|
||||
use crate::*;
|
||||
|
||||
@@ -38,13 +40,13 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
|
||||
}
|
||||
cfg.initdb().unwrap();
|
||||
let srv = cfg.start_server().unwrap();
|
||||
let (intermediate_lsns, expected_end_of_wal_partial) =
|
||||
C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap();
|
||||
let intermediate_lsns = C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap();
|
||||
let intermediate_lsns: Vec<Lsn> = intermediate_lsns
|
||||
.iter()
|
||||
.map(|&lsn| u64::from(lsn).into())
|
||||
.collect();
|
||||
let expected_end_of_wal: Lsn = u64::from(expected_end_of_wal_partial).into();
|
||||
// Kill postgres. Note that it might have inserted to WAL something after
|
||||
// 'craft' did its job.
|
||||
srv.kill();
|
||||
|
||||
// Check find_end_of_wal on the initial WAL
|
||||
@@ -56,7 +58,7 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
|
||||
.filter(|fname| IsXLogFileName(fname))
|
||||
.max()
|
||||
.unwrap();
|
||||
check_pg_waldump_end_of_wal(&cfg, &last_segment, expected_end_of_wal);
|
||||
let expected_end_of_wal = find_pg_waldump_end_of_wal(&cfg, &last_segment);
|
||||
for start_lsn in intermediate_lsns
|
||||
.iter()
|
||||
.chain(std::iter::once(&expected_end_of_wal))
|
||||
@@ -91,11 +93,7 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
|
||||
}
|
||||
}
|
||||
|
||||
fn check_pg_waldump_end_of_wal(
|
||||
cfg: &crate::Conf,
|
||||
last_segment: &str,
|
||||
expected_end_of_wal: Lsn,
|
||||
) {
|
||||
fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &str) -> Lsn {
|
||||
// Get the actual end of WAL by pg_waldump
|
||||
let waldump_output = cfg
|
||||
.pg_waldump("000000010000000000000001", last_segment)
|
||||
@@ -113,11 +111,8 @@ fn check_pg_waldump_end_of_wal(
|
||||
}
|
||||
};
|
||||
let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap();
|
||||
info!(
|
||||
"waldump erred on {}, expected wal end at {}",
|
||||
waldump_wal_end, expected_end_of_wal
|
||||
);
|
||||
assert_eq!(waldump_wal_end, expected_end_of_wal);
|
||||
info!("waldump erred on {}", waldump_wal_end);
|
||||
waldump_wal_end
|
||||
}
|
||||
|
||||
fn check_end_of_wal(
|
||||
@@ -210,9 +205,9 @@ pub fn test_update_next_xid() {
|
||||
#[test]
|
||||
pub fn test_encode_logical_message() {
|
||||
let expected = [
|
||||
64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255,
|
||||
38, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114,
|
||||
101, 102, 105, 120, 0, 109, 101, 115, 115, 97, 103, 101,
|
||||
64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255, 38,
|
||||
0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114, 101, 102,
|
||||
105, 120, 0, 109, 101, 115, 115, 97, 103, 101,
|
||||
];
|
||||
let actual = encode_logical_message("prefix", "message");
|
||||
assert_eq!(expected, actual[..]);
|
||||
|
||||
@@ -247,7 +247,7 @@ fn scenario_4() {
|
||||
//
|
||||
// This is in total 5000 + 1000 + 5000 + 1000 = 12000
|
||||
//
|
||||
// (If we used the the method from the previous scenario, and
|
||||
// (If we used the method from the previous scenario, and
|
||||
// kept only snapshot at the branch point, we'd need to keep
|
||||
// all the WAL between 10000-18000 on the main branch, so
|
||||
// the total size would be 5000 + 1000 + 8000 = 14000. The
|
||||
|
||||
@@ -69,7 +69,7 @@ pub struct Config {
|
||||
/// should be removed once we have a better solution there.
|
||||
sys_buffer_bytes: u64,
|
||||
|
||||
/// Minimum fraction of total system memory reserved *before* the the cgroup threshold; in
|
||||
/// Minimum fraction of total system memory reserved *before* the cgroup threshold; in
|
||||
/// other words, providing a ceiling for the highest value of the threshold by enforcing that
|
||||
/// there's at least `cgroup_min_overhead_fraction` of the total memory remaining beyond the
|
||||
/// threshold.
|
||||
|
||||
@@ -615,6 +615,7 @@ fn start_pageserver(
|
||||
|
||||
pageserver::consumption_metrics::collect_metrics(
|
||||
metric_collection_endpoint,
|
||||
&conf.metric_collection_bucket,
|
||||
conf.metric_collection_interval,
|
||||
conf.cached_metric_collection_interval,
|
||||
conf.synthetic_size_calculation_interval,
|
||||
|
||||
@@ -234,6 +234,7 @@ pub struct PageServerConf {
|
||||
// How often to send unchanged cached metrics to the metrics endpoint.
|
||||
pub cached_metric_collection_interval: Duration,
|
||||
pub metric_collection_endpoint: Option<Url>,
|
||||
pub metric_collection_bucket: Option<RemoteStorageConfig>,
|
||||
pub synthetic_size_calculation_interval: Duration,
|
||||
|
||||
pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,
|
||||
@@ -373,6 +374,7 @@ struct PageServerConfigBuilder {
|
||||
cached_metric_collection_interval: BuilderValue<Duration>,
|
||||
metric_collection_endpoint: BuilderValue<Option<Url>>,
|
||||
synthetic_size_calculation_interval: BuilderValue<Duration>,
|
||||
metric_collection_bucket: BuilderValue<Option<RemoteStorageConfig>>,
|
||||
|
||||
disk_usage_based_eviction: BuilderValue<Option<DiskUsageEvictionTaskConfig>>,
|
||||
|
||||
@@ -455,6 +457,8 @@ impl PageServerConfigBuilder {
|
||||
.expect("cannot parse default synthetic size calculation interval")),
|
||||
metric_collection_endpoint: Set(DEFAULT_METRIC_COLLECTION_ENDPOINT),
|
||||
|
||||
metric_collection_bucket: Set(None),
|
||||
|
||||
disk_usage_based_eviction: Set(None),
|
||||
|
||||
test_remote_failures: Set(0),
|
||||
@@ -586,6 +590,13 @@ impl PageServerConfigBuilder {
|
||||
self.metric_collection_endpoint = BuilderValue::Set(metric_collection_endpoint)
|
||||
}
|
||||
|
||||
pub fn metric_collection_bucket(
|
||||
&mut self,
|
||||
metric_collection_bucket: Option<RemoteStorageConfig>,
|
||||
) {
|
||||
self.metric_collection_bucket = BuilderValue::Set(metric_collection_bucket)
|
||||
}
|
||||
|
||||
pub fn synthetic_size_calculation_interval(
|
||||
&mut self,
|
||||
synthetic_size_calculation_interval: Duration,
|
||||
@@ -694,6 +705,7 @@ impl PageServerConfigBuilder {
|
||||
metric_collection_interval,
|
||||
cached_metric_collection_interval,
|
||||
metric_collection_endpoint,
|
||||
metric_collection_bucket,
|
||||
synthetic_size_calculation_interval,
|
||||
disk_usage_based_eviction,
|
||||
test_remote_failures,
|
||||
@@ -942,6 +954,9 @@ impl PageServerConf {
|
||||
let endpoint = parse_toml_string(key, item)?.parse().context("failed to parse metric_collection_endpoint")?;
|
||||
builder.metric_collection_endpoint(Some(endpoint));
|
||||
},
|
||||
"metric_collection_bucket" => {
|
||||
builder.metric_collection_bucket(RemoteStorageConfig::from_toml(item)?)
|
||||
}
|
||||
"synthetic_size_calculation_interval" =>
|
||||
builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?),
|
||||
"test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?),
|
||||
@@ -1057,6 +1072,7 @@ impl PageServerConf {
|
||||
metric_collection_interval: Duration::from_secs(60),
|
||||
cached_metric_collection_interval: Duration::from_secs(60 * 60),
|
||||
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
|
||||
metric_collection_bucket: None,
|
||||
synthetic_size_calculation_interval: Duration::from_secs(60),
|
||||
disk_usage_based_eviction: None,
|
||||
test_remote_failures: 0,
|
||||
@@ -1289,6 +1305,7 @@ background_task_maximum_delay = '334 s'
|
||||
defaults::DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL
|
||||
)?,
|
||||
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
|
||||
metric_collection_bucket: None,
|
||||
synthetic_size_calculation_interval: humantime::parse_duration(
|
||||
defaults::DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL
|
||||
)?,
|
||||
@@ -1363,6 +1380,7 @@ background_task_maximum_delay = '334 s'
|
||||
metric_collection_interval: Duration::from_secs(222),
|
||||
cached_metric_collection_interval: Duration::from_secs(22200),
|
||||
metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
|
||||
metric_collection_bucket: None,
|
||||
synthetic_size_calculation_interval: Duration::from_secs(333),
|
||||
disk_usage_based_eviction: None,
|
||||
test_remote_failures: 0,
|
||||
|
||||
@@ -7,6 +7,7 @@ use crate::tenant::{mgr, LogicalSizeCalculationCause, PageReconstructError, Tena
|
||||
use camino::Utf8PathBuf;
|
||||
use consumption_metrics::EventType;
|
||||
use pageserver_api::models::TenantState;
|
||||
use remote_storage::{GenericRemoteStorage, RemoteStorageConfig};
|
||||
use reqwest::Url;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
@@ -41,6 +42,7 @@ type Cache = HashMap<MetricsKey, (EventType, u64)>;
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn collect_metrics(
|
||||
metric_collection_endpoint: &Url,
|
||||
metric_collection_bucket: &Option<RemoteStorageConfig>,
|
||||
metric_collection_interval: Duration,
|
||||
_cached_metric_collection_interval: Duration,
|
||||
synthetic_size_calculation_interval: Duration,
|
||||
@@ -94,6 +96,20 @@ pub async fn collect_metrics(
|
||||
.build()
|
||||
.expect("Failed to create http client with timeout");
|
||||
|
||||
let bucket_client = if let Some(bucket_config) = metric_collection_bucket {
|
||||
match GenericRemoteStorage::from_config(bucket_config) {
|
||||
Ok(client) => Some(client),
|
||||
Err(e) => {
|
||||
// Non-fatal error: if we were given an invalid config, we will proceed
|
||||
// with sending metrics over the network, but not to S3.
|
||||
tracing::warn!("Invalid configuration for metric_collection_bucket: {e}");
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let node_id = node_id.to_string();
|
||||
|
||||
loop {
|
||||
@@ -118,10 +134,18 @@ pub async fn collect_metrics(
|
||||
tracing::error!("failed to persist metrics to {path:?}: {e:#}");
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(bucket_client) = &bucket_client {
|
||||
let res =
|
||||
upload::upload_metrics_bucket(bucket_client, &cancel, &node_id, &metrics).await;
|
||||
if let Err(e) = res {
|
||||
tracing::error!("failed to upload to S3: {e:#}");
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let upload = async {
|
||||
let res = upload::upload_metrics(
|
||||
let res = upload::upload_metrics_http(
|
||||
&client,
|
||||
metric_collection_endpoint,
|
||||
&cancel,
|
||||
@@ -132,7 +156,7 @@ pub async fn collect_metrics(
|
||||
.await;
|
||||
if let Err(e) = res {
|
||||
// serialization error which should never happen
|
||||
tracing::error!("failed to upload due to {e:#}");
|
||||
tracing::error!("failed to upload via HTTP due to {e:#}");
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -1,4 +1,9 @@
|
||||
use std::time::SystemTime;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use consumption_metrics::{Event, EventChunk, IdempotencyKey, CHUNK_SIZE};
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Instrument;
|
||||
|
||||
@@ -13,8 +18,9 @@ struct Ids {
|
||||
pub(super) timeline_id: Option<TimelineId>,
|
||||
}
|
||||
|
||||
/// Serialize and write metrics to an HTTP endpoint
|
||||
#[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
|
||||
pub(super) async fn upload_metrics(
|
||||
pub(super) async fn upload_metrics_http(
|
||||
client: &reqwest::Client,
|
||||
metric_collection_endpoint: &reqwest::Url,
|
||||
cancel: &CancellationToken,
|
||||
@@ -74,6 +80,60 @@ pub(super) async fn upload_metrics(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Serialize and write metrics to a remote storage object
|
||||
#[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
|
||||
pub(super) async fn upload_metrics_bucket(
|
||||
client: &GenericRemoteStorage,
|
||||
cancel: &CancellationToken,
|
||||
node_id: &str,
|
||||
metrics: &[RawMetric],
|
||||
) -> anyhow::Result<()> {
|
||||
if metrics.is_empty() {
|
||||
// Skip uploads if we have no metrics, so that readers don't have to handle the edge case
|
||||
// of an empty object.
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Compose object path
|
||||
let datetime: DateTime<Utc> = SystemTime::now().into();
|
||||
let ts_prefix = datetime.format("year=%Y/month=%m/day=%d/%H:%M:%SZ");
|
||||
let path = RemotePath::from_string(&format!("{ts_prefix}_{node_id}.ndjson.gz"))?;
|
||||
|
||||
// Set up a gzip writer into a buffer
|
||||
let mut compressed_bytes: Vec<u8> = Vec::new();
|
||||
let compressed_writer = std::io::Cursor::new(&mut compressed_bytes);
|
||||
let mut gzip_writer = async_compression::tokio::write::GzipEncoder::new(compressed_writer);
|
||||
|
||||
// Serialize and write into compressed buffer
|
||||
let started_at = std::time::Instant::now();
|
||||
for res in serialize_in_chunks(CHUNK_SIZE, metrics, node_id) {
|
||||
let (_chunk, body) = res?;
|
||||
gzip_writer.write_all(&body).await?;
|
||||
}
|
||||
gzip_writer.flush().await?;
|
||||
gzip_writer.shutdown().await?;
|
||||
let compressed_length = compressed_bytes.len();
|
||||
|
||||
// Write to remote storage
|
||||
client
|
||||
.upload_storage_object(
|
||||
futures::stream::once(futures::future::ready(Ok(compressed_bytes.into()))),
|
||||
compressed_length,
|
||||
&path,
|
||||
cancel,
|
||||
)
|
||||
.await?;
|
||||
let elapsed = started_at.elapsed();
|
||||
|
||||
tracing::info!(
|
||||
compressed_length,
|
||||
elapsed_ms = elapsed.as_millis(),
|
||||
"write metrics bucket at {path}",
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// The return type is quite ugly, but we gain testability in isolation
|
||||
fn serialize_in_chunks<'a, F>(
|
||||
chunk_size: usize,
|
||||
|
||||
@@ -435,7 +435,7 @@ pub(crate) static RESIDENT_PHYSICAL_SIZE_GLOBAL: Lazy<UIntGauge> = Lazy::new(||
|
||||
static REMOTE_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_remote_physical_size",
|
||||
"The size of the layer files present in the remote storage that are listed in the the remote index_part.json.",
|
||||
"The size of the layer files present in the remote storage that are listed in the remote index_part.json.",
|
||||
// Corollary: If any files are missing from the index part, they won't be included here.
|
||||
&["tenant_id", "shard_id", "timeline_id"]
|
||||
)
|
||||
@@ -699,6 +699,14 @@ pub static STARTUP_IS_LOADING: Lazy<UIntGauge> = Lazy::new(|| {
|
||||
.expect("Failed to register pageserver_startup_is_loading")
|
||||
});
|
||||
|
||||
pub(crate) static TIMELINE_EPHEMERAL_BYTES: Lazy<UIntGauge> = Lazy::new(|| {
|
||||
register_uint_gauge!(
|
||||
"pageserver_timeline_ephemeral_bytes",
|
||||
"Total number of bytes in ephemeral layers, summed for all timelines. Approximate, lazily updated."
|
||||
)
|
||||
.expect("Failed to register metric")
|
||||
});
|
||||
|
||||
/// Metrics related to the lifecycle of a [`crate::tenant::Tenant`] object: things
|
||||
/// like how long it took to load.
|
||||
///
|
||||
|
||||
@@ -1156,6 +1156,8 @@ impl PageServerHandler {
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
rel: req.rel,
|
||||
blkno: req.blkno,
|
||||
page,
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -2141,7 +2141,7 @@ impl Tenant {
|
||||
|
||||
// Shut down the timeline's remote client: this means that the indices we write
|
||||
// for child shards will not be invalidated by the parent shard deleting layers.
|
||||
tl_client.shutdown().await?;
|
||||
tl_client.shutdown().await;
|
||||
|
||||
// Download methods can still be used after shutdown, as they don't flow through the remote client's
|
||||
// queue. In principle the RemoteTimelineClient could provide this without downloading it, but this
|
||||
|
||||
@@ -217,7 +217,7 @@ use crate::task_mgr::shutdown_token;
|
||||
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::remote_timeline_client::download::download_retry;
|
||||
use crate::tenant::storage_layer::AsLayerDesc;
|
||||
use crate::tenant::upload_queue::Delete;
|
||||
use crate::tenant::upload_queue::{Delete, UploadQueueStoppedDeletable};
|
||||
use crate::tenant::TIMELINES_SEGMENT_NAME;
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
@@ -266,15 +266,6 @@ pub enum MaybeDeletedIndexPart {
|
||||
Deleted(IndexPart),
|
||||
}
|
||||
|
||||
/// Errors that can arise when calling [`RemoteTimelineClient::stop`].
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum StopError {
|
||||
/// Returned if the upload queue was never initialized.
|
||||
/// See [`RemoteTimelineClient::init_upload_queue`] and [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
|
||||
#[error("queue is not initialized")]
|
||||
QueueUninitialized,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum PersistIndexPartWithDeletedFlagError {
|
||||
#[error("another task is already setting the deleted_flag, started at {0:?}")]
|
||||
@@ -399,15 +390,10 @@ impl RemoteTimelineClient {
|
||||
"bug: it is responsibility of the caller to provide index part from MaybeDeletedIndexPart::Deleted"
|
||||
))?;
|
||||
|
||||
{
|
||||
let mut upload_queue = self.upload_queue.lock().unwrap();
|
||||
upload_queue.initialize_with_current_remote_index_part(index_part)?;
|
||||
self.update_remote_physical_size_gauge(Some(index_part));
|
||||
}
|
||||
// also locks upload queue, without dropping the guard above it will be a deadlock
|
||||
self.stop().expect("initialized line above");
|
||||
|
||||
let mut upload_queue = self.upload_queue.lock().unwrap();
|
||||
upload_queue.initialize_with_current_remote_index_part(index_part)?;
|
||||
self.update_remote_physical_size_gauge(Some(index_part));
|
||||
self.stop_impl(&mut upload_queue);
|
||||
|
||||
upload_queue
|
||||
.stopped_mut()
|
||||
@@ -421,7 +407,8 @@ impl RemoteTimelineClient {
|
||||
match &mut *self.upload_queue.lock().unwrap() {
|
||||
UploadQueue::Uninitialized => None,
|
||||
UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_projected(),
|
||||
UploadQueue::Stopped(q) => q
|
||||
UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
|
||||
UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => q
|
||||
.upload_queue_for_deletion
|
||||
.get_last_remote_consistent_lsn_projected(),
|
||||
}
|
||||
@@ -431,7 +418,8 @@ impl RemoteTimelineClient {
|
||||
match &mut *self.upload_queue.lock().unwrap() {
|
||||
UploadQueue::Uninitialized => None,
|
||||
UploadQueue::Initialized(q) => Some(q.get_last_remote_consistent_lsn_visible()),
|
||||
UploadQueue::Stopped(q) => Some(
|
||||
UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
|
||||
UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => Some(
|
||||
q.upload_queue_for_deletion
|
||||
.get_last_remote_consistent_lsn_visible(),
|
||||
),
|
||||
@@ -898,7 +886,7 @@ impl RemoteTimelineClient {
|
||||
/// Wait for all previously scheduled operations to complete, and then stop.
|
||||
///
|
||||
/// Not cancellation safe
|
||||
pub(crate) async fn shutdown(self: &Arc<Self>) -> Result<(), StopError> {
|
||||
pub(crate) async fn shutdown(self: &Arc<Self>) {
|
||||
// On cancellation the queue is left in an awkward state of refusing new operations but
|
||||
// proper stop is yet to be called. On cancel the original or some later task must call
|
||||
// `stop` or `shutdown`.
|
||||
@@ -909,8 +897,12 @@ impl RemoteTimelineClient {
|
||||
let fut = {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = match &mut *guard {
|
||||
UploadQueue::Stopped(_) => return Ok(()),
|
||||
UploadQueue::Uninitialized => return Err(StopError::QueueUninitialized),
|
||||
UploadQueue::Stopped(_) => return,
|
||||
UploadQueue::Uninitialized => {
|
||||
// transition into Stopped state
|
||||
self.stop_impl(&mut guard);
|
||||
return;
|
||||
}
|
||||
UploadQueue::Initialized(ref mut init) => init,
|
||||
};
|
||||
|
||||
@@ -942,7 +934,7 @@ impl RemoteTimelineClient {
|
||||
}
|
||||
}
|
||||
|
||||
self.stop()
|
||||
self.stop();
|
||||
}
|
||||
|
||||
/// Set the deleted_at field in the remote index file.
|
||||
@@ -1324,12 +1316,7 @@ impl RemoteTimelineClient {
|
||||
// upload finishes or times out soon enough.
|
||||
if cancel.is_cancelled() {
|
||||
info!("upload task cancelled by shutdown request");
|
||||
match self.stop() {
|
||||
Ok(()) => {}
|
||||
Err(StopError::QueueUninitialized) => {
|
||||
unreachable!("we never launch an upload task if the queue is uninitialized, and once it is initialized, we never go back")
|
||||
}
|
||||
}
|
||||
self.stop();
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1584,17 +1571,23 @@ impl RemoteTimelineClient {
|
||||
/// In-progress operations will still be running after this function returns.
|
||||
/// Use `task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id))`
|
||||
/// to wait for them to complete, after calling this function.
|
||||
pub(crate) fn stop(&self) -> Result<(), StopError> {
|
||||
pub(crate) fn stop(&self) {
|
||||
// Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
|
||||
// into stopped state, thereby dropping all of the queued *ops* which haven't become *tasks* yet.
|
||||
// The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
match &mut *guard {
|
||||
UploadQueue::Uninitialized => Err(StopError::QueueUninitialized),
|
||||
self.stop_impl(&mut guard);
|
||||
}
|
||||
|
||||
fn stop_impl(&self, guard: &mut std::sync::MutexGuard<UploadQueue>) {
|
||||
match &mut **guard {
|
||||
UploadQueue::Uninitialized => {
|
||||
info!("UploadQueue is in state Uninitialized, nothing to do");
|
||||
**guard = UploadQueue::Stopped(UploadQueueStopped::Uninitialized);
|
||||
}
|
||||
UploadQueue::Stopped(_) => {
|
||||
// nothing to do
|
||||
info!("another concurrent task already shut down the queue");
|
||||
Ok(())
|
||||
}
|
||||
UploadQueue::Initialized(initialized) => {
|
||||
info!("shutting down upload queue");
|
||||
@@ -1627,11 +1620,13 @@ impl RemoteTimelineClient {
|
||||
};
|
||||
|
||||
let upload_queue = std::mem::replace(
|
||||
&mut *guard,
|
||||
UploadQueue::Stopped(UploadQueueStopped {
|
||||
upload_queue_for_deletion,
|
||||
deleted_at: SetDeletedFlagProgress::NotRunning,
|
||||
}),
|
||||
&mut **guard,
|
||||
UploadQueue::Stopped(UploadQueueStopped::Deletable(
|
||||
UploadQueueStoppedDeletable {
|
||||
upload_queue_for_deletion,
|
||||
deleted_at: SetDeletedFlagProgress::NotRunning,
|
||||
},
|
||||
)),
|
||||
);
|
||||
if let UploadQueue::Initialized(qi) = upload_queue {
|
||||
qi
|
||||
@@ -1660,10 +1655,6 @@ impl RemoteTimelineClient {
|
||||
// which is exactly what we want to happen.
|
||||
drop(op);
|
||||
}
|
||||
|
||||
// We're done.
|
||||
drop(guard);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,8 +23,12 @@ use tracing::*;
|
||||
use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap};
|
||||
// avoid binding to Write (conflicts with std::io::Write)
|
||||
// while being able to use std::fmt::Write's methods
|
||||
use crate::metrics::TIMELINE_EPHEMERAL_BYTES;
|
||||
use std::cmp::Ordering;
|
||||
use std::fmt::Write as _;
|
||||
use std::ops::Range;
|
||||
use std::sync::atomic::Ordering as AtomicOrdering;
|
||||
use std::sync::atomic::{AtomicU64, AtomicUsize};
|
||||
use tokio::sync::{RwLock, RwLockWriteGuard};
|
||||
|
||||
use super::{
|
||||
@@ -70,6 +74,8 @@ pub struct InMemoryLayerInner {
|
||||
/// Each serialized Value is preceded by a 'u32' length field.
|
||||
/// PerSeg::page_versions map stores offsets into this file.
|
||||
file: EphemeralFile,
|
||||
|
||||
resource_units: GlobalResourceUnits,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for InMemoryLayerInner {
|
||||
@@ -78,6 +84,101 @@ impl std::fmt::Debug for InMemoryLayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
/// State shared by all in-memory (ephemeral) layers. Updated infrequently during background ticks in Timeline,
|
||||
/// to minimize contention.
|
||||
///
|
||||
/// This global state is used to implement behaviors that require a global view of the system, e.g.
|
||||
/// rolling layers proactively to limit the total amount of dirty data.
|
||||
struct GlobalResources {
|
||||
// How many bytes are in all EphemeralFile objects
|
||||
dirty_bytes: AtomicU64,
|
||||
// How many layers are contributing to dirty_bytes
|
||||
dirty_layers: AtomicUsize,
|
||||
}
|
||||
|
||||
// Per-timeline RAII struct for its contribution to [`GlobalResources`]
|
||||
struct GlobalResourceUnits {
|
||||
// How many dirty bytes have I added to the global dirty_bytes: this guard object is responsible
|
||||
// for decrementing the global counter by this many bytes when dropped.
|
||||
dirty_bytes: u64,
|
||||
}
|
||||
|
||||
impl GlobalResourceUnits {
|
||||
// Hint for the layer append path to update us when the layer size differs from the last
|
||||
// call to update_size by this much. If we don't reach this threshold, we'll still get
|
||||
// updated when the Timeline "ticks" in the background.
|
||||
const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;
|
||||
|
||||
fn new() -> Self {
|
||||
GLOBAL_RESOURCES
|
||||
.dirty_layers
|
||||
.fetch_add(1, AtomicOrdering::Relaxed);
|
||||
Self { dirty_bytes: 0 }
|
||||
}
|
||||
|
||||
/// Do not call this frequently: all timelines will write to these same global atomics,
|
||||
/// so this is a relatively expensive operation. Wait at least a few seconds between calls.
|
||||
fn publish_size(&mut self, size: u64) {
|
||||
let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
|
||||
Ordering::Equal => {
|
||||
return;
|
||||
}
|
||||
Ordering::Greater => {
|
||||
let delta = size - self.dirty_bytes;
|
||||
let old = GLOBAL_RESOURCES
|
||||
.dirty_bytes
|
||||
.fetch_add(delta, AtomicOrdering::Relaxed);
|
||||
old + delta
|
||||
}
|
||||
Ordering::Less => {
|
||||
let delta = self.dirty_bytes - size;
|
||||
let old = GLOBAL_RESOURCES
|
||||
.dirty_bytes
|
||||
.fetch_sub(delta, AtomicOrdering::Relaxed);
|
||||
old - delta
|
||||
}
|
||||
};
|
||||
|
||||
// This is a sloppy update: concurrent updates to the counter will race, and the exact
|
||||
// value of the metric might not be the exact latest value of GLOBAL_RESOURCES::dirty_bytes.
|
||||
// That's okay: as long as the metric contains some recent value, it doesn't have to always
|
||||
// be literally the last update.
|
||||
TIMELINE_EPHEMERAL_BYTES.set(new_global_dirty_bytes);
|
||||
|
||||
self.dirty_bytes = size;
|
||||
}
|
||||
|
||||
// Call publish_size if the input size differs from last published size by more than
|
||||
// the drift limit
|
||||
fn maybe_publish_size(&mut self, size: u64) {
|
||||
let publish = match size.cmp(&self.dirty_bytes) {
|
||||
Ordering::Equal => false,
|
||||
Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
|
||||
Ordering::Less => self.dirty_bytes - size > Self::MAX_SIZE_DRIFT,
|
||||
};
|
||||
|
||||
if publish {
|
||||
self.publish_size(size);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for GlobalResourceUnits {
|
||||
fn drop(&mut self) {
|
||||
GLOBAL_RESOURCES
|
||||
.dirty_layers
|
||||
.fetch_sub(1, AtomicOrdering::Relaxed);
|
||||
|
||||
// Subtract our contribution to the global total dirty bytes
|
||||
self.publish_size(0);
|
||||
}
|
||||
}
|
||||
|
||||
static GLOBAL_RESOURCES: GlobalResources = GlobalResources {
|
||||
dirty_bytes: AtomicU64::new(0),
|
||||
dirty_layers: AtomicUsize::new(0),
|
||||
};
|
||||
|
||||
impl InMemoryLayer {
|
||||
pub(crate) fn get_timeline_id(&self) -> TimelineId {
|
||||
self.timeline_id
|
||||
@@ -328,6 +429,7 @@ impl InMemoryLayer {
|
||||
inner: RwLock::new(InMemoryLayerInner {
|
||||
index: HashMap::new(),
|
||||
file,
|
||||
resource_units: GlobalResourceUnits::new(),
|
||||
}),
|
||||
})
|
||||
}
|
||||
@@ -378,9 +480,18 @@ impl InMemoryLayer {
|
||||
warn!("Key {} at {} already exists", key, lsn);
|
||||
}
|
||||
|
||||
let size = locked_inner.file.len();
|
||||
locked_inner.resource_units.maybe_publish_size(size);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn tick(&self) {
|
||||
let mut inner = self.inner.write().await;
|
||||
let size = inner.file.len();
|
||||
inner.resource_units.publish_size(size);
|
||||
}
|
||||
|
||||
pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
|
||||
// TODO: Currently, we just leak the storage for any deleted keys
|
||||
Ok(())
|
||||
|
||||
@@ -54,6 +54,7 @@ use std::{
|
||||
ops::ControlFlow,
|
||||
};
|
||||
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
|
||||
use crate::tenant::{
|
||||
layer_map::{LayerMap, SearchResult},
|
||||
@@ -64,7 +65,6 @@ use crate::{
|
||||
disk_usage_eviction_task::DiskUsageEvictionInfo,
|
||||
pgdatadir_mapping::CollectKeySpaceError,
|
||||
};
|
||||
use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
|
||||
use crate::{
|
||||
disk_usage_eviction_task::finite_f32,
|
||||
tenant::storage_layer::{
|
||||
@@ -1241,11 +1241,7 @@ impl Timeline {
|
||||
// what is problematic is the shutting down of RemoteTimelineClient, because
|
||||
// obviously it does not make sense to stop while we wait for it, but what
|
||||
// about corner cases like s3 suddenly hanging up?
|
||||
if let Err(e) = client.shutdown().await {
|
||||
// Non-fatal. Shutdown is infallible. Failures to flush just mean that
|
||||
// we have some extra WAL replay to do next time the timeline starts.
|
||||
warn!("failed to flush to remote storage: {e:#}");
|
||||
}
|
||||
client.shutdown().await;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -1282,12 +1278,7 @@ impl Timeline {
|
||||
// Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
|
||||
// case our caller wants to use that for a deletion
|
||||
if let Some(remote_client) = self.remote_client.as_ref() {
|
||||
match remote_client.stop() {
|
||||
Ok(()) => {}
|
||||
Err(StopError::QueueUninitialized) => {
|
||||
// Shutting down during initialization is legal
|
||||
}
|
||||
}
|
||||
remote_client.stop();
|
||||
}
|
||||
|
||||
tracing::debug!("Waiting for tasks...");
|
||||
@@ -2855,7 +2846,15 @@ impl Timeline {
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Option<(Lsn, Bytes)> {
|
||||
return None;
|
||||
let cache = page_cache::get();
|
||||
|
||||
// FIXME: It's pointless to check the cache for things that are not 8kB pages.
|
||||
// We should look at the key to determine if it's a cacheable object
|
||||
let (lsn, read_guard) = cache
|
||||
.lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
|
||||
.await?;
|
||||
let img = Bytes::from(read_guard.to_vec());
|
||||
Some((lsn, img))
|
||||
}
|
||||
|
||||
async fn get_ready_ancestor_timeline(
|
||||
@@ -4461,6 +4460,9 @@ impl<'a> TimelineWriter<'a> {
|
||||
let action = self.get_open_layer_action(last_record_lsn, 0);
|
||||
if action == OpenLayerAction::Roll {
|
||||
self.roll_layer(last_record_lsn).await?;
|
||||
} else if let Some(writer_state) = &mut *self.write_guard {
|
||||
// Periodic update of statistics
|
||||
writer_state.open_layer.tick().await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -16,9 +16,7 @@ use crate::{
|
||||
tenant::{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id,
|
||||
metadata::TimelineMetadata,
|
||||
remote_timeline_client::{
|
||||
self, PersistIndexPartWithDeletedFlagError, RemoteTimelineClient,
|
||||
},
|
||||
remote_timeline_client::{PersistIndexPartWithDeletedFlagError, RemoteTimelineClient},
|
||||
CreateTimelineCause, DeleteTimelineError, Tenant,
|
||||
},
|
||||
};
|
||||
@@ -50,19 +48,7 @@ async fn stop_tasks(timeline: &Timeline) -> Result<(), DeleteTimelineError> {
|
||||
|
||||
// Prevent new uploads from starting.
|
||||
if let Some(remote_client) = timeline.remote_client.as_ref() {
|
||||
let res = remote_client.stop();
|
||||
match res {
|
||||
Ok(()) => {}
|
||||
Err(e) => match e {
|
||||
remote_timeline_client::StopError::QueueUninitialized => {
|
||||
// This case shouldn't happen currently because the
|
||||
// load and attach code bails out if _any_ of the timeline fails to fetch its IndexPart.
|
||||
// That is, before we declare the Tenant as Active.
|
||||
// But we only allow calls to delete_timeline on Active tenants.
|
||||
return Err(DeleteTimelineError::Other(anyhow::anyhow!("upload queue is uninitialized, likely the timeline was in Broken state prior to this call because it failed to fetch IndexPart during load or attach, check the logs")));
|
||||
}
|
||||
},
|
||||
}
|
||||
remote_client.stop();
|
||||
}
|
||||
|
||||
// Stop & wait for the remaining timeline tasks, including upload tasks.
|
||||
|
||||
@@ -121,11 +121,16 @@ pub(super) enum SetDeletedFlagProgress {
|
||||
Successful(NaiveDateTime),
|
||||
}
|
||||
|
||||
pub(super) struct UploadQueueStopped {
|
||||
pub(super) struct UploadQueueStoppedDeletable {
|
||||
pub(super) upload_queue_for_deletion: UploadQueueInitialized,
|
||||
pub(super) deleted_at: SetDeletedFlagProgress,
|
||||
}
|
||||
|
||||
pub(super) enum UploadQueueStopped {
|
||||
Deletable(UploadQueueStoppedDeletable),
|
||||
Uninitialized,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum NotInitialized {
|
||||
#[error("queue is in state Uninitialized")]
|
||||
@@ -249,12 +254,15 @@ impl UploadQueue {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStopped> {
|
||||
pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStoppedDeletable> {
|
||||
match self {
|
||||
UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
|
||||
anyhow::bail!("queue is in state {}", self.as_str())
|
||||
}
|
||||
UploadQueue::Stopped(stopped) => Ok(stopped),
|
||||
UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => {
|
||||
anyhow::bail!("queue is in state Stopped(Uninitialized)")
|
||||
}
|
||||
UploadQueue::Stopped(UploadQueueStopped::Deletable(deletable)) => Ok(deletable),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -782,7 +782,7 @@ where
|
||||
}
|
||||
}
|
||||
// NB: don't use `buf.is_empty()` here; it is from the
|
||||
// `impl Deref for Slice { Target = [u8] }`; the the &[u8]
|
||||
// `impl Deref for Slice { Target = [u8] }`; the &[u8]
|
||||
// returned by it only covers the initialized portion of `buf`.
|
||||
// Whereas we're interested in ensuring that we filled the entire
|
||||
// buffer that the user passed in.
|
||||
|
||||
@@ -111,6 +111,7 @@ static PageServer page_servers[MAX_SHARDS];
|
||||
|
||||
static bool pageserver_flush(shardno_t shard_no);
|
||||
static void pageserver_disconnect(shardno_t shard_no);
|
||||
static void pageserver_disconnect_shard(shardno_t shard_no);
|
||||
|
||||
static bool
|
||||
PagestoreShmemIsValid(void)
|
||||
@@ -487,9 +488,31 @@ retry:
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Reset prefetch and drop connection to the shard.
|
||||
* It also drops connection to all other shards involved in prefetch.
|
||||
*/
|
||||
static void
|
||||
pageserver_disconnect(shardno_t shard_no)
|
||||
{
|
||||
if (page_servers[shard_no].conn)
|
||||
{
|
||||
/*
|
||||
* If the connection to any pageserver is lost, we throw away the
|
||||
* whole prefetch queue, even for other pageservers. It should not
|
||||
* cause big problems, because connection loss is supposed to be a
|
||||
* rare event.
|
||||
*/
|
||||
prefetch_on_ps_disconnect();
|
||||
}
|
||||
pageserver_disconnect_shard(shard_no);
|
||||
}
|
||||
|
||||
/*
|
||||
* Disconnect from specified shard
|
||||
*/
|
||||
static void
|
||||
pageserver_disconnect_shard(shardno_t shard_no)
|
||||
{
|
||||
/*
|
||||
* If anything goes wrong while we were sending a request, it's not clear
|
||||
@@ -503,14 +526,6 @@ pageserver_disconnect(shardno_t shard_no)
|
||||
neon_shard_log(shard_no, LOG, "dropping connection to page server due to error");
|
||||
PQfinish(page_servers[shard_no].conn);
|
||||
page_servers[shard_no].conn = NULL;
|
||||
|
||||
/*
|
||||
* If the connection to any pageserver is lost, we throw away the
|
||||
* whole prefetch queue, even for other pageservers. It should not
|
||||
* cause big problems, because connection loss is supposed to be a
|
||||
* rare event.
|
||||
*/
|
||||
prefetch_on_ps_disconnect();
|
||||
}
|
||||
if (page_servers[shard_no].wes != NULL)
|
||||
{
|
||||
@@ -676,7 +691,8 @@ page_server_api api =
|
||||
{
|
||||
.send = pageserver_send,
|
||||
.flush = pageserver_flush,
|
||||
.receive = pageserver_receive
|
||||
.receive = pageserver_receive,
|
||||
.disconnect = pageserver_disconnect_shard
|
||||
};
|
||||
|
||||
static bool
|
||||
|
||||
@@ -312,7 +312,7 @@ pg_cluster_size(PG_FUNCTION_ARGS)
|
||||
{
|
||||
int64 size;
|
||||
|
||||
size = GetZenithCurrentClusterSize();
|
||||
size = GetNeonCurrentClusterSize();
|
||||
|
||||
if (size == 0)
|
||||
PG_RETURN_NULL();
|
||||
|
||||
@@ -26,6 +26,8 @@ extern void pg_init_libpagestore(void);
|
||||
extern void pg_init_walproposer(void);
|
||||
|
||||
extern uint64 BackpressureThrottlingTime(void);
|
||||
extern void SetNeonCurrentClusterSize(uint64 size);
|
||||
extern uint64 GetNeonCurrentClusterSize(void);
|
||||
extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
|
||||
|
||||
extern void PGDLLEXPORT WalProposerSync(int argc, char *argv[]);
|
||||
|
||||
@@ -139,6 +139,9 @@ typedef struct
|
||||
typedef struct
|
||||
{
|
||||
NeonMessageTag tag;
|
||||
NRelFileInfo rinfo;
|
||||
ForkNumber forknum;
|
||||
BlockNumber blkno;
|
||||
char page[FLEXIBLE_ARRAY_MEMBER];
|
||||
} NeonGetPageResponse;
|
||||
|
||||
@@ -180,6 +183,7 @@ typedef struct
|
||||
bool (*send) (shardno_t shard_no, NeonRequest * request);
|
||||
NeonResponse *(*receive) (shardno_t shard_no);
|
||||
bool (*flush) (shardno_t shard_no);
|
||||
void (*disconnect) (shardno_t shard_no);
|
||||
} page_server_api;
|
||||
|
||||
extern void prefetch_on_ps_disconnect(void);
|
||||
|
||||
@@ -613,6 +613,14 @@ prefetch_on_ps_disconnect(void)
|
||||
Assert(slot->status == PRFS_REQUESTED);
|
||||
Assert(slot->my_ring_index == ring_index);
|
||||
|
||||
/*
|
||||
* Drop connection to all shards which have prefetch requests.
|
||||
* It is not a problem to call disconnect multiple times on the same connection
|
||||
* because disconnect implementation in libpagestore.c will check if connection
|
||||
* is alive and do nothing if the connection was already dropped.
|
||||
*/
|
||||
page_server->disconnect(slot->shard_no);
|
||||
|
||||
/* clean up the request */
|
||||
slot->status = PRFS_TAG_REMAINS;
|
||||
MyPState->n_requests_inflight -= 1;
|
||||
@@ -633,13 +641,12 @@ prefetch_on_ps_disconnect(void)
|
||||
static inline void
|
||||
prefetch_set_unused(uint64 ring_index)
|
||||
{
|
||||
PrefetchRequest *slot = GetPrfSlot(ring_index);
|
||||
PrefetchRequest *slot;
|
||||
|
||||
if (ring_index < MyPState->ring_last)
|
||||
return; /* Should already be unused */
|
||||
|
||||
Assert(MyPState->ring_unused > ring_index);
|
||||
|
||||
slot = GetPrfSlot(ring_index);
|
||||
if (slot->status == PRFS_UNUSED)
|
||||
return;
|
||||
|
||||
@@ -798,7 +805,8 @@ Retry:
|
||||
{
|
||||
if (*force_lsn > slot->effective_request_lsn)
|
||||
{
|
||||
prefetch_wait_for(ring_index);
|
||||
if (!prefetch_wait_for(ring_index))
|
||||
goto Retry;
|
||||
prefetch_set_unused(ring_index);
|
||||
entry = NULL;
|
||||
}
|
||||
@@ -813,7 +821,8 @@ Retry:
|
||||
{
|
||||
if (*force_lsn != slot->effective_request_lsn)
|
||||
{
|
||||
prefetch_wait_for(ring_index);
|
||||
if (!prefetch_wait_for(ring_index))
|
||||
goto Retry;
|
||||
prefetch_set_unused(ring_index);
|
||||
entry = NULL;
|
||||
}
|
||||
@@ -879,7 +888,8 @@ Retry:
|
||||
{
|
||||
case PRFS_REQUESTED:
|
||||
Assert(MyPState->ring_receive == cleanup_index);
|
||||
prefetch_wait_for(cleanup_index);
|
||||
if (!prefetch_wait_for(cleanup_index))
|
||||
goto Retry;
|
||||
prefetch_set_unused(cleanup_index);
|
||||
break;
|
||||
case PRFS_RECEIVED:
|
||||
@@ -1108,6 +1118,11 @@ nm_unpack_response(StringInfo s)
|
||||
|
||||
msg_resp = MemoryContextAllocZero(MyPState->bufctx, PS_GETPAGERESPONSE_SIZE);
|
||||
msg_resp->tag = tag;
|
||||
NInfoGetSpcOid(msg_resp->rinfo) = pq_getmsgint(s, 4);
|
||||
NInfoGetDbOid(msg_resp->rinfo) = pq_getmsgint(s, 4);
|
||||
NInfoGetRelNumber(msg_resp->rinfo) = pq_getmsgint(s, 4);
|
||||
msg_resp->forknum = pq_getmsgbyte(s);
|
||||
msg_resp->blkno = pq_getmsgint(s, 4);
|
||||
/* XXX: should be varlena */
|
||||
memcpy(msg_resp->page, pq_getmsgbytes(s, BLCKSZ), BLCKSZ);
|
||||
pq_getmsgend(s);
|
||||
@@ -1831,7 +1846,7 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
|
||||
reln->smgr_relpersistence == RELPERSISTENCE_PERMANENT &&
|
||||
!IsAutoVacuumWorkerProcess())
|
||||
{
|
||||
uint64 current_size = GetZenithCurrentClusterSize();
|
||||
uint64 current_size = GetNeonCurrentClusterSize();
|
||||
|
||||
if (current_size >= ((uint64) max_cluster_size) * 1024 * 1024)
|
||||
ereport(ERROR,
|
||||
@@ -1912,7 +1927,7 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
|
||||
reln->smgr_relpersistence == RELPERSISTENCE_PERMANENT &&
|
||||
!IsAutoVacuumWorkerProcess())
|
||||
{
|
||||
uint64 current_size = GetZenithCurrentClusterSize();
|
||||
uint64 current_size = GetNeonCurrentClusterSize();
|
||||
|
||||
if (current_size >= ((uint64) max_cluster_size) * 1024 * 1024)
|
||||
ereport(ERROR,
|
||||
@@ -2132,6 +2147,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
/*
|
||||
* Try to find prefetched page in the list of received pages.
|
||||
*/
|
||||
Retry:
|
||||
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &buftag);
|
||||
|
||||
if (entry != NULL)
|
||||
@@ -2153,7 +2169,8 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
*/
|
||||
if (slot->status == PRFS_REQUESTED)
|
||||
{
|
||||
prefetch_wait_for(slot->my_ring_index);
|
||||
if (!prefetch_wait_for(slot->my_ring_index))
|
||||
goto Retry;
|
||||
}
|
||||
/* drop caches */
|
||||
prefetch_set_unused(slot->my_ring_index);
|
||||
@@ -2200,10 +2217,20 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
switch (resp->tag)
|
||||
{
|
||||
case T_NeonGetPageResponse:
|
||||
memcpy(buffer, ((NeonGetPageResponse *) resp)->page, BLCKSZ);
|
||||
{
|
||||
NeonGetPageResponse* r = (NeonGetPageResponse *) resp;
|
||||
memcpy(buffer, r->page, BLCKSZ);
|
||||
if (memcmp(&r->rinfo, &rinfo, sizeof rinfo) != 0 && forkNum != r->forknum || blkno != r->blkno)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_IO_ERROR),
|
||||
errmsg(NEON_TAG "[shard %d] get unexpected get page resonse for block %u in rel %u/%u/%u.%u instead of block block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
|
||||
slot->shard_no,
|
||||
r->blkno, RelFileInfoFmt(r->rinfo), r->forknum,
|
||||
blkno, RelFileInfoFmt(rinfo), forkNum,
|
||||
(uint32) (request_lsn >> 32), (uint32) request_lsn)));
|
||||
lfc_write(rinfo, forkNum, blkno, buffer);
|
||||
break;
|
||||
|
||||
}
|
||||
case T_NeonErrorResponse:
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_IO_ERROR),
|
||||
|
||||
@@ -287,6 +287,7 @@ typedef struct WalproposerShmemState
|
||||
slock_t mutex;
|
||||
term_t mineLastElectedTerm;
|
||||
pg_atomic_uint64 backpressureThrottlingTime;
|
||||
pg_atomic_uint64 currentClusterSize;
|
||||
|
||||
/* last feedback from each shard */
|
||||
PageserverFeedback shard_ps_feedback[MAX_SHARDS];
|
||||
|
||||
@@ -282,6 +282,7 @@ WalproposerShmemInit(void)
|
||||
memset(walprop_shared, 0, WalproposerShmemSize());
|
||||
SpinLockInit(&walprop_shared->mutex);
|
||||
pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0);
|
||||
pg_atomic_init_u64(&walprop_shared->currentClusterSize, 0);
|
||||
}
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
|
||||
@@ -1972,7 +1973,7 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
|
||||
|
||||
/* Only one main shard sends non-zero currentClusterSize */
|
||||
if (sk->appendResponse.ps_feedback.currentClusterSize > 0)
|
||||
SetZenithCurrentClusterSize(sk->appendResponse.ps_feedback.currentClusterSize);
|
||||
SetNeonCurrentClusterSize(sk->appendResponse.ps_feedback.currentClusterSize);
|
||||
|
||||
if (min_feedback.disk_consistent_lsn != standby_apply_lsn)
|
||||
{
|
||||
@@ -2094,6 +2095,18 @@ GetLogRepRestartLSN(WalProposer *wp)
|
||||
return lrRestartLsn;
|
||||
}
|
||||
|
||||
/* Atomically store `size` into walprop_shared->currentClusterSize. */
|
||||
void SetNeonCurrentClusterSize(uint64 size)
|
||||
{
|
||||
pg_atomic_write_u64(&walprop_shared->currentClusterSize, size);
|
||||
}
|
||||
|
||||
/* Atomically read and return walprop_shared->currentClusterSize. */
|
||||
uint64 GetNeonCurrentClusterSize(void)
|
||||
{
|
||||
return pg_atomic_read_u64(&walprop_shared->currentClusterSize);
|
||||
}
|
||||
uint64 GetNeonCurrentClusterSize(void);
|
||||
|
||||
|
||||
static const walproposer_api walprop_pg = {
|
||||
.get_shmem_state = walprop_pg_get_shmem_state,
|
||||
.start_streaming = walprop_pg_start_streaming,
|
||||
|
||||
@@ -97,6 +97,7 @@ workspace_hack.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
camino-tempfile.workspace = true
|
||||
fallible-iterator.workspace = true
|
||||
rcgen.workspace = true
|
||||
rstest.workspace = true
|
||||
tokio-postgres-rustls.workspace = true
|
||||
|
||||
@@ -408,3 +408,228 @@ impl ComputeConnectBackend for BackendType<'_, ComputeCredentials, &()> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use bytes::BytesMut;
|
||||
use fallible_iterator::FallibleIterator;
|
||||
use postgres_protocol::{
|
||||
authentication::sasl::{ChannelBinding, ScramSha256},
|
||||
message::{backend::Message as PgMessage, frontend},
|
||||
};
|
||||
use provider::AuthSecret;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
use crate::{
|
||||
auth::{ComputeUserInfoMaybeEndpoint, IpPattern},
|
||||
config::AuthenticationConfig,
|
||||
console::{
|
||||
self,
|
||||
provider::{self, CachedAllowedIps, CachedRoleSecret},
|
||||
CachedNodeInfo,
|
||||
},
|
||||
context::RequestMonitoring,
|
||||
proxy::NeonOptions,
|
||||
scram::ServerSecret,
|
||||
stream::{PqStream, Stream},
|
||||
};
|
||||
|
||||
use super::auth_quirks;
|
||||
|
||||
struct Auth {
|
||||
ips: Vec<IpPattern>,
|
||||
secret: AuthSecret,
|
||||
}
|
||||
|
||||
impl console::Api for Auth {
|
||||
async fn get_role_secret(
|
||||
&self,
|
||||
_ctx: &mut RequestMonitoring,
|
||||
_user_info: &super::ComputeUserInfo,
|
||||
) -> Result<CachedRoleSecret, console::errors::GetAuthInfoError> {
|
||||
Ok(CachedRoleSecret::new_uncached(Some(self.secret.clone())))
|
||||
}
|
||||
|
||||
async fn get_allowed_ips_and_secret(
|
||||
&self,
|
||||
_ctx: &mut RequestMonitoring,
|
||||
_user_info: &super::ComputeUserInfo,
|
||||
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), console::errors::GetAuthInfoError>
|
||||
{
|
||||
Ok((
|
||||
CachedAllowedIps::new_uncached(Arc::new(self.ips.clone())),
|
||||
Some(CachedRoleSecret::new_uncached(Some(self.secret.clone()))),
|
||||
))
|
||||
}
|
||||
|
||||
async fn wake_compute(
|
||||
&self,
|
||||
_ctx: &mut RequestMonitoring,
|
||||
_user_info: &super::ComputeUserInfo,
|
||||
) -> Result<CachedNodeInfo, console::errors::WakeComputeError> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
static CONFIG: &AuthenticationConfig = &AuthenticationConfig {
|
||||
scram_protocol_timeout: std::time::Duration::from_secs(5),
|
||||
};
|
||||
|
||||
async fn read_message(r: &mut (impl AsyncRead + Unpin), b: &mut BytesMut) -> PgMessage {
|
||||
loop {
|
||||
r.read_buf(&mut *b).await.unwrap();
|
||||
if let Some(m) = PgMessage::parse(&mut *b).unwrap() {
|
||||
break m;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn auth_quirks_scram() {
|
||||
let (mut client, server) = tokio::io::duplex(1024);
|
||||
let mut stream = PqStream::new(Stream::from_raw(server));
|
||||
|
||||
let mut ctx = RequestMonitoring::test();
|
||||
let api = Auth {
|
||||
ips: vec![],
|
||||
secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()),
|
||||
};
|
||||
|
||||
let user_info = ComputeUserInfoMaybeEndpoint {
|
||||
user: "conrad".into(),
|
||||
endpoint_id: Some("endpoint".into()),
|
||||
options: NeonOptions::default(),
|
||||
};
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
let mut scram = ScramSha256::new(b"my-secret-password", ChannelBinding::unsupported());
|
||||
|
||||
let mut read = BytesMut::new();
|
||||
|
||||
// server should offer scram
|
||||
match read_message(&mut client, &mut read).await {
|
||||
PgMessage::AuthenticationSasl(a) => {
|
||||
let options: Vec<&str> = a.mechanisms().collect().unwrap();
|
||||
assert_eq!(options, ["SCRAM-SHA-256"]);
|
||||
}
|
||||
_ => panic!("wrong message"),
|
||||
}
|
||||
|
||||
// client sends client-first-message
|
||||
let mut write = BytesMut::new();
|
||||
frontend::sasl_initial_response("SCRAM-SHA-256", scram.message(), &mut write).unwrap();
|
||||
client.write_all(&write).await.unwrap();
|
||||
|
||||
// server responds with server-first-message
|
||||
match read_message(&mut client, &mut read).await {
|
||||
PgMessage::AuthenticationSaslContinue(a) => {
|
||||
scram.update(a.data()).await.unwrap();
|
||||
}
|
||||
_ => panic!("wrong message"),
|
||||
}
|
||||
|
||||
// client responds with client-final-message
|
||||
write.clear();
|
||||
frontend::sasl_response(scram.message(), &mut write).unwrap();
|
||||
client.write_all(&write).await.unwrap();
|
||||
|
||||
// server responds with server-final-message
|
||||
match read_message(&mut client, &mut read).await {
|
||||
PgMessage::AuthenticationSaslFinal(a) => {
|
||||
scram.finish(a.data()).unwrap();
|
||||
}
|
||||
_ => panic!("wrong message"),
|
||||
}
|
||||
});
|
||||
|
||||
let _creds = auth_quirks(&mut ctx, &api, user_info, &mut stream, false, CONFIG)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
handle.await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn auth_quirks_cleartext() {
|
||||
let (mut client, server) = tokio::io::duplex(1024);
|
||||
let mut stream = PqStream::new(Stream::from_raw(server));
|
||||
|
||||
let mut ctx = RequestMonitoring::test();
|
||||
let api = Auth {
|
||||
ips: vec![],
|
||||
secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()),
|
||||
};
|
||||
|
||||
let user_info = ComputeUserInfoMaybeEndpoint {
|
||||
user: "conrad".into(),
|
||||
endpoint_id: Some("endpoint".into()),
|
||||
options: NeonOptions::default(),
|
||||
};
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
let mut read = BytesMut::new();
|
||||
let mut write = BytesMut::new();
|
||||
|
||||
// server should offer cleartext
|
||||
match read_message(&mut client, &mut read).await {
|
||||
PgMessage::AuthenticationCleartextPassword => {}
|
||||
_ => panic!("wrong message"),
|
||||
}
|
||||
|
||||
// client responds with password
|
||||
write.clear();
|
||||
frontend::password_message(b"my-secret-password", &mut write).unwrap();
|
||||
client.write_all(&write).await.unwrap();
|
||||
});
|
||||
|
||||
let _creds = auth_quirks(&mut ctx, &api, user_info, &mut stream, true, CONFIG)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
handle.await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn auth_quirks_password_hack() {
|
||||
let (mut client, server) = tokio::io::duplex(1024);
|
||||
let mut stream = PqStream::new(Stream::from_raw(server));
|
||||
|
||||
let mut ctx = RequestMonitoring::test();
|
||||
let api = Auth {
|
||||
ips: vec![],
|
||||
secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()),
|
||||
};
|
||||
|
||||
let user_info = ComputeUserInfoMaybeEndpoint {
|
||||
user: "conrad".into(),
|
||||
endpoint_id: None,
|
||||
options: NeonOptions::default(),
|
||||
};
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
let mut read = BytesMut::new();
|
||||
|
||||
// server should offer cleartext
|
||||
match read_message(&mut client, &mut read).await {
|
||||
PgMessage::AuthenticationCleartextPassword => {}
|
||||
_ => panic!("wrong message"),
|
||||
}
|
||||
|
||||
// client responds with password
|
||||
let mut write = BytesMut::new();
|
||||
frontend::password_message(b"endpoint=my-endpoint;my-secret-password", &mut write)
|
||||
.unwrap();
|
||||
client.write_all(&write).await.unwrap();
|
||||
});
|
||||
|
||||
let creds = auth_quirks(&mut ctx, &api, user_info, &mut stream, true, CONFIG)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(creds.info.endpoint, "my-endpoint");
|
||||
|
||||
handle.await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -211,4 +211,19 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cancel_session_noop_regression() {
|
||||
let handler = CancellationHandler::<()>::new(Default::default(), "local");
|
||||
handler
|
||||
.cancel_session(
|
||||
CancelKeyData {
|
||||
backend_pid: 0,
|
||||
cancel_key: 0,
|
||||
},
|
||||
Uuid::new_v4(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -82,14 +82,13 @@ pub type ScramKeys = tokio_postgres::config::ScramKeys<32>;
|
||||
/// A config for establishing a connection to compute node.
|
||||
/// Eventually, `tokio_postgres` will be replaced with something better.
|
||||
/// Newtype allows us to implement methods on top of it.
|
||||
#[derive(Clone)]
|
||||
#[repr(transparent)]
|
||||
#[derive(Clone, Default)]
|
||||
pub struct ConnCfg(Box<tokio_postgres::Config>);
|
||||
|
||||
/// Creation and initialization routines.
|
||||
impl ConnCfg {
|
||||
pub fn new() -> Self {
|
||||
Self(Default::default())
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Reuse password or auth keys from the other config.
|
||||
@@ -165,12 +164,6 @@ impl std::ops::DerefMut for ConnCfg {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ConnCfg {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl ConnCfg {
|
||||
/// Establish a raw TCP connection to the compute node.
|
||||
async fn connect_raw(&self, timeout: Duration) -> io::Result<(SocketAddr, TcpStream, &str)> {
|
||||
|
||||
@@ -6,7 +6,7 @@ pub mod messages;
|
||||
|
||||
/// Wrappers for console APIs and their mocks.
|
||||
pub mod provider;
|
||||
pub use provider::{errors, Api, AuthSecret, CachedNodeInfo, NodeInfo};
|
||||
pub(crate) use provider::{errors, Api, AuthSecret, CachedNodeInfo, NodeInfo};
|
||||
|
||||
/// Various cache-related types.
|
||||
pub mod caches {
|
||||
|
||||
@@ -14,7 +14,6 @@ use crate::{
|
||||
context::RequestMonitoring,
|
||||
scram, EndpointCacheKey, ProjectId,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use dashmap::DashMap;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
||||
@@ -326,8 +325,7 @@ pub type CachedAllowedIps = Cached<&'static ProjectInfoCacheImpl, Arc<Vec<IpPatt
|
||||
|
||||
/// This will allocate per each call, but the http requests alone
|
||||
/// already require a few allocations, so it should be fine.
|
||||
#[async_trait]
|
||||
pub trait Api {
|
||||
pub(crate) trait Api {
|
||||
/// Get the client's auth secret for authentication.
|
||||
/// Returns option because user not found situation is special.
|
||||
/// We still have to mock the scram to avoid leaking information that user doesn't exist.
|
||||
@@ -363,7 +361,6 @@ pub enum ConsoleBackend {
|
||||
Test(Box<dyn crate::auth::backend::TestBackend>),
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Api for ConsoleBackend {
|
||||
async fn get_role_secret(
|
||||
&self,
|
||||
|
||||
@@ -8,7 +8,6 @@ use crate::console::provider::{CachedAllowedIps, CachedRoleSecret};
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::{auth::backend::ComputeUserInfo, compute, error::io_error, scram, url::ApiUrl};
|
||||
use crate::{auth::IpPattern, cache::Cached};
|
||||
use async_trait::async_trait;
|
||||
use futures::TryFutureExt;
|
||||
use std::{str::FromStr, sync::Arc};
|
||||
use thiserror::Error;
|
||||
@@ -144,7 +143,6 @@ async fn get_execute_postgres_query(
|
||||
Ok(Some(entry))
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl super::Api for Api {
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn get_role_secret(
|
||||
|
||||
@@ -14,7 +14,6 @@ use crate::{
|
||||
context::RequestMonitoring,
|
||||
metrics::{ALLOWED_IPS_BY_CACHE_OUTCOME, ALLOWED_IPS_NUMBER},
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use futures::TryFutureExt;
|
||||
use std::sync::Arc;
|
||||
use tokio::time::Instant;
|
||||
@@ -56,7 +55,7 @@ impl Api {
|
||||
ctx: &mut RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<AuthInfo, GetAuthInfoError> {
|
||||
let request_id = uuid::Uuid::new_v4().to_string();
|
||||
let request_id = ctx.session_id.to_string();
|
||||
let application_name = ctx.console_application_name();
|
||||
async {
|
||||
let request = self
|
||||
@@ -113,7 +112,7 @@ impl Api {
|
||||
ctx: &mut RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<NodeInfo, WakeComputeError> {
|
||||
let request_id = uuid::Uuid::new_v4().to_string();
|
||||
let request_id = ctx.session_id.to_string();
|
||||
let application_name = ctx.console_application_name();
|
||||
async {
|
||||
let mut request_builder = self
|
||||
@@ -168,7 +167,6 @@ impl Api {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl super::Api for Api {
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn get_role_secret(
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use pq_proto::CancelKeyData;
|
||||
use redis::AsyncCommands;
|
||||
use tokio::sync::Mutex;
|
||||
@@ -13,8 +12,8 @@ use super::{
|
||||
notifications::{CancelSession, Notification, PROXY_CHANNEL_NAME},
|
||||
};
|
||||
|
||||
#[async_trait]
|
||||
pub trait CancellationPublisherMut: Send + Sync + 'static {
|
||||
#[allow(async_fn_in_trait)]
|
||||
async fn try_publish(
|
||||
&mut self,
|
||||
cancel_key_data: CancelKeyData,
|
||||
@@ -22,8 +21,8 @@ pub trait CancellationPublisherMut: Send + Sync + 'static {
|
||||
) -> anyhow::Result<()>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait CancellationPublisher: Send + Sync + 'static {
|
||||
#[allow(async_fn_in_trait)]
|
||||
async fn try_publish(
|
||||
&self,
|
||||
cancel_key_data: CancelKeyData,
|
||||
@@ -31,10 +30,9 @@ pub trait CancellationPublisher: Send + Sync + 'static {
|
||||
) -> anyhow::Result<()>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl CancellationPublisherMut for () {
|
||||
impl CancellationPublisher for () {
|
||||
async fn try_publish(
|
||||
&mut self,
|
||||
&self,
|
||||
_cancel_key_data: CancelKeyData,
|
||||
_session_id: Uuid,
|
||||
) -> anyhow::Result<()> {
|
||||
@@ -42,18 +40,16 @@ impl CancellationPublisherMut for () {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<P: CancellationPublisherMut> CancellationPublisher for P {
|
||||
impl<P: CancellationPublisher> CancellationPublisherMut for P {
|
||||
async fn try_publish(
|
||||
&self,
|
||||
_cancel_key_data: CancelKeyData,
|
||||
_session_id: Uuid,
|
||||
&mut self,
|
||||
cancel_key_data: CancelKeyData,
|
||||
session_id: Uuid,
|
||||
) -> anyhow::Result<()> {
|
||||
self.try_publish(_cancel_key_data, _session_id).await
|
||||
<P as CancellationPublisher>::try_publish(self, cancel_key_data, session_id).await
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<P: CancellationPublisher> CancellationPublisher for Option<P> {
|
||||
async fn try_publish(
|
||||
&self,
|
||||
@@ -68,7 +64,6 @@ impl<P: CancellationPublisher> CancellationPublisher for Option<P> {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<P: CancellationPublisherMut> CancellationPublisher for Arc<Mutex<P>> {
|
||||
async fn try_publish(
|
||||
&self,
|
||||
@@ -145,7 +140,6 @@ impl RedisPublisherClient {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl CancellationPublisherMut for RedisPublisherClient {
|
||||
async fn try_publish(
|
||||
&mut self,
|
||||
|
||||
@@ -3,9 +3,7 @@
|
||||
use std::convert::Infallible;
|
||||
|
||||
use hmac::{Hmac, Mac};
|
||||
use sha2::digest::FixedOutput;
|
||||
use sha2::{Digest, Sha256};
|
||||
use subtle::{Choice, ConstantTimeEq};
|
||||
use sha2::Sha256;
|
||||
use tokio::task::yield_now;
|
||||
|
||||
use super::messages::{
|
||||
@@ -13,6 +11,7 @@ use super::messages::{
|
||||
};
|
||||
use super::secret::ServerSecret;
|
||||
use super::signature::SignatureBuilder;
|
||||
use super::ScramKey;
|
||||
use crate::config;
|
||||
use crate::sasl::{self, ChannelBinding, Error as SaslError};
|
||||
|
||||
@@ -104,7 +103,7 @@ async fn pbkdf2(str: &[u8], salt: &[u8], iterations: u32) -> [u8; 32] {
|
||||
}
|
||||
|
||||
// copied from <https://github.com/neondatabase/rust-postgres/blob/20031d7a9ee1addeae6e0968e3899ae6bf01cee2/postgres-protocol/src/authentication/sasl.rs#L236-L248>
|
||||
async fn derive_keys(password: &[u8], salt: &[u8], iterations: u32) -> ([u8; 32], [u8; 32]) {
|
||||
async fn derive_client_key(password: &[u8], salt: &[u8], iterations: u32) -> ScramKey {
|
||||
let salted_password = pbkdf2(password, salt, iterations).await;
|
||||
|
||||
let make_key = |name| {
|
||||
@@ -116,7 +115,7 @@ async fn derive_keys(password: &[u8], salt: &[u8], iterations: u32) -> ([u8; 32]
|
||||
<[u8; 32]>::from(key.into_bytes())
|
||||
};
|
||||
|
||||
(make_key(b"Client Key"), make_key(b"Server Key"))
|
||||
make_key(b"Client Key").into()
|
||||
}
|
||||
|
||||
pub async fn exchange(
|
||||
@@ -124,21 +123,12 @@ pub async fn exchange(
|
||||
password: &[u8],
|
||||
) -> sasl::Result<sasl::Outcome<super::ScramKey>> {
|
||||
let salt = base64::decode(&secret.salt_base64)?;
|
||||
let (client_key, server_key) = derive_keys(password, &salt, secret.iterations).await;
|
||||
let stored_key: [u8; 32] = Sha256::default()
|
||||
.chain_update(client_key)
|
||||
.finalize_fixed()
|
||||
.into();
|
||||
let client_key = derive_client_key(password, &salt, secret.iterations).await;
|
||||
|
||||
// constant time to not leak partial key match
|
||||
let valid = stored_key.ct_eq(&secret.stored_key.as_bytes())
|
||||
| server_key.ct_eq(&secret.server_key.as_bytes())
|
||||
| Choice::from(secret.doomed as u8);
|
||||
|
||||
if valid.into() {
|
||||
Ok(sasl::Outcome::Success(super::ScramKey::from(client_key)))
|
||||
} else {
|
||||
if secret.is_password_invalid(&client_key).into() {
|
||||
Ok(sasl::Outcome::Failure("password doesn't match"))
|
||||
} else {
|
||||
Ok(sasl::Outcome::Success(client_key))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -220,7 +210,7 @@ impl SaslSentInner {
|
||||
.derive_client_key(&client_final_message.proof);
|
||||
|
||||
// Auth fails either if keys don't match or it's pre-determined to fail.
|
||||
if client_key.sha256() != secret.stored_key || secret.doomed {
|
||||
if secret.is_password_invalid(&client_key).into() {
|
||||
return Ok(sasl::Step::Failure("password doesn't match"));
|
||||
}
|
||||
|
||||
|
||||
@@ -1,17 +1,31 @@
|
||||
//! Tools for client/server/stored key management.
|
||||
|
||||
use subtle::ConstantTimeEq;
|
||||
|
||||
/// Faithfully taken from PostgreSQL.
|
||||
pub const SCRAM_KEY_LEN: usize = 32;
|
||||
|
||||
/// One of the keys derived from the user's password.
|
||||
/// We use the same structure for all keys, i.e.
|
||||
/// `ClientKey`, `StoredKey`, and `ServerKey`.
|
||||
#[derive(Clone, Default, PartialEq, Eq, Debug)]
|
||||
#[derive(Clone, Default, Eq, Debug)]
|
||||
#[repr(transparent)]
|
||||
pub struct ScramKey {
|
||||
bytes: [u8; SCRAM_KEY_LEN],
|
||||
}
|
||||
|
||||
/// Equality (`==`) on keys is defined in terms of [`ConstantTimeEq::ct_eq`],
|
||||
/// converting the resulting `Choice` into a `bool`.
|
||||
impl PartialEq for ScramKey {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.ct_eq(other).into()
|
||||
}
|
||||
}
|
||||
|
||||
/// Compares two keys by delegating to `ct_eq` on their underlying byte arrays.
|
||||
impl ConstantTimeEq for ScramKey {
|
||||
fn ct_eq(&self, other: &Self) -> subtle::Choice {
|
||||
self.bytes.ct_eq(&other.bytes)
|
||||
}
|
||||
}
|
||||
|
||||
impl ScramKey {
|
||||
pub fn sha256(&self) -> Self {
|
||||
super::sha256([self.as_ref()]).into()
|
||||
|
||||
@@ -206,6 +206,28 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_client_first_message_with_invalid_gs2_authz() {
|
||||
assert!(ClientFirstMessage::parse("n,authzid,n=user,r=nonce").is_none())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_client_first_message_with_extra_params() {
|
||||
let msg = ClientFirstMessage::parse("n,,n=user,r=nonce,a=foo,b=bar,c=baz").unwrap();
|
||||
assert_eq!(msg.bare, "n=user,r=nonce,a=foo,b=bar,c=baz");
|
||||
assert_eq!(msg.username, "user");
|
||||
assert_eq!(msg.nonce, "nonce");
|
||||
assert_eq!(msg.cbind_flag, ChannelBinding::NotSupportedClient);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_client_first_message_with_extra_params_invalid() {
|
||||
// must be of the form `<ascii letter>=<...>`
|
||||
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,abc=foo").is_none());
|
||||
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,1=foo").is_none());
|
||||
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,a").is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_client_final_message() {
|
||||
let input = [
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
//! Tools for SCRAM server secret management.
|
||||
|
||||
use subtle::{Choice, ConstantTimeEq};
|
||||
|
||||
use super::base64_decode_array;
|
||||
use super::key::ScramKey;
|
||||
|
||||
@@ -40,6 +42,11 @@ impl ServerSecret {
|
||||
Some(secret)
|
||||
}
|
||||
|
||||
/// Checks a client key against this secret.
|
||||
///
|
||||
/// The returned [`Choice`] is set when the SHA-256 of `client_key` differs
|
||||
/// from `self.stored_key`, or when this secret is marked as `doomed`.
|
||||
pub fn is_password_invalid(&self, client_key: &ScramKey) -> Choice {
|
||||
// constant time to not leak partial key match
|
||||
|
||||
client_key.sha256().ct_ne(&self.stored_key) | Choice::from(self.doomed as u8)
|
||||
}
|
||||
|
||||
/// To avoid revealing information to an attacker, we use a
|
||||
/// mocked server secret even if the user doesn't exist.
|
||||
/// See `auth-scram.c : mock_scram_secret` for details.
|
||||
|
||||
@@ -244,6 +244,7 @@ impl SimulationApi {
|
||||
mutex: 0,
|
||||
mineLastElectedTerm: 0,
|
||||
backpressureThrottlingTime: pg_atomic_uint64 { value: 0 },
|
||||
currentClusterSize: pg_atomic_uint64 { value: 0 },
|
||||
shard_ps_feedback: [empty_feedback; 128],
|
||||
num_shards: 0,
|
||||
min_ps_feedback: empty_feedback,
|
||||
|
||||
@@ -1155,13 +1155,17 @@ class NeonEnv:
|
||||
After this method returns, there should be no child processes running.
|
||||
"""
|
||||
self.endpoints.stop_all()
|
||||
|
||||
# Stop storage controller before pageservers: we don't want it to spuriously
|
||||
# detect a pageserver "failure" during test teardown
|
||||
self.storage_controller.stop(immediate=immediate)
|
||||
|
||||
for sk in self.safekeepers:
|
||||
sk.stop(immediate=immediate)
|
||||
for pageserver in self.pageservers:
|
||||
if ps_assert_metric_no_errors:
|
||||
pageserver.assert_no_metric_errors()
|
||||
pageserver.stop(immediate=immediate)
|
||||
self.storage_controller.stop(immediate=immediate)
|
||||
self.broker.stop(immediate=immediate)
|
||||
|
||||
@property
|
||||
|
||||
@@ -96,6 +96,8 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [
|
||||
".*Call to node.*management API.*failed.*ReceiveBody.*",
|
||||
# Many tests will start up with a node offline
|
||||
".*startup_reconcile: Could not scan node.*",
|
||||
# Tests run in dev mode
|
||||
".*Starting in dev mode.*",
|
||||
]
|
||||
|
||||
|
||||
|
||||
@@ -62,9 +62,7 @@ def wait_for_upload(
|
||||
)
|
||||
time.sleep(1)
|
||||
raise Exception(
|
||||
"timed out while waiting for remote_consistent_lsn to reach {}, was {}".format(
|
||||
lsn, current_lsn
|
||||
)
|
||||
f"timed out while waiting for {tenant}/{timeline} remote_consistent_lsn to reach {lsn}, was {current_lsn}"
|
||||
)
|
||||
|
||||
|
||||
|
||||
2
test_runner/regress/select.sql
Normal file
2
test_runner/regress/select.sql
Normal file
@@ -0,0 +1,2 @@
|
||||
select sum(abalance) from pgbench_accounts;
|
||||
|
||||
@@ -105,7 +105,7 @@ def test_pageserver_multiple_keys(neon_env_builder: NeonEnvBuilder):
|
||||
# The neon_local tool generates one key pair at a hardcoded path by default.
|
||||
# As a preparation for our test, move the public key of the key pair into a
|
||||
# directory at the same location as the hardcoded path by:
|
||||
# 1. moving the the file at `configured_pub_key_path` to a temporary location
|
||||
# 1. moving the file at `configured_pub_key_path` to a temporary location
|
||||
# 2. creating a new directory at `configured_pub_key_path`
|
||||
# 3. moving the file from the temporary location into the newly created directory
|
||||
configured_pub_key_path = Path(env.repo_dir) / "auth_public_key.pem"
|
||||
|
||||
@@ -267,9 +267,10 @@ def test_forward_compatibility(
|
||||
|
||||
def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, repo_dir: Path):
|
||||
ep = env.endpoints.create_start("main")
|
||||
connstr = ep.connstr()
|
||||
|
||||
pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
|
||||
|
||||
connstr = ep.connstr()
|
||||
pg_bin.run_capture(
|
||||
["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump.sql'}"]
|
||||
)
|
||||
@@ -286,6 +287,9 @@ def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, r
|
||||
timeline_id = env.initial_timeline
|
||||
pg_version = env.pg_version
|
||||
|
||||
# Stop endpoint while we recreate timeline
|
||||
ep.stop()
|
||||
|
||||
try:
|
||||
pageserver_http.timeline_preserve_initdb_archive(tenant_id, timeline_id)
|
||||
except PageserverApiException as e:
|
||||
@@ -310,6 +314,9 @@ def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, r
|
||||
existing_initdb_timeline_id=timeline_id,
|
||||
)
|
||||
|
||||
# Timeline exists again: restart the endpoint
|
||||
ep.start()
|
||||
|
||||
pg_bin.run_capture(
|
||||
["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump-from-wal.sql'}"]
|
||||
)
|
||||
|
||||
@@ -84,3 +84,21 @@ def test_hot_standby(neon_simple_env: NeonEnv):
|
||||
# clean up
|
||||
if slow_down_send:
|
||||
sk_http.configure_failpoints(("sk-send-wal-replica-sleep", "off"))
|
||||
|
||||
|
||||
def test_2_replicas_start(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
|
||||
with env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
endpoint_id="primary",
|
||||
) as primary:
|
||||
time.sleep(1)
|
||||
with env.endpoints.new_replica_start(
|
||||
origin=primary, endpoint_id="secondary1"
|
||||
) as secondary1:
|
||||
with env.endpoints.new_replica_start(
|
||||
origin=primary, endpoint_id="secondary2"
|
||||
) as secondary2:
|
||||
wait_replica_caughtup(primary, secondary1)
|
||||
wait_replica_caughtup(primary, secondary2)
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import gzip
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
@@ -10,7 +12,11 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.remote_storage import RemoteStorageKind
|
||||
from fixtures.remote_storage import (
|
||||
LocalFsStorage,
|
||||
RemoteStorageKind,
|
||||
remote_storage_to_toml_inline_table,
|
||||
)
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
from pytest_httpserver import HTTPServer
|
||||
from werkzeug.wrappers.request import Request
|
||||
@@ -40,6 +46,9 @@ def test_metric_collection(
|
||||
uploads.put((events, is_last == "true"))
|
||||
return Response(status=200)
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
assert neon_env_builder.pageserver_remote_storage is not None
|
||||
|
||||
# Require collecting metrics frequently, since we change
|
||||
# the timeline and want something to be logged about it.
|
||||
#
|
||||
@@ -48,12 +57,11 @@ def test_metric_collection(
|
||||
neon_env_builder.pageserver_config_override = f"""
|
||||
metric_collection_interval="1s"
|
||||
metric_collection_endpoint="{metric_collection_endpoint}"
|
||||
metric_collection_bucket={remote_storage_to_toml_inline_table(neon_env_builder.pageserver_remote_storage)}
|
||||
cached_metric_collection_interval="0s"
|
||||
synthetic_size_calculation_interval="3s"
|
||||
"""
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
|
||||
|
||||
# mock http server that returns OK for the metrics
|
||||
@@ -167,6 +175,20 @@ def test_metric_collection(
|
||||
|
||||
httpserver.check()
|
||||
|
||||
# Check that at least one bucket output object is present, and that all
|
||||
# can be decompressed and decoded.
|
||||
bucket_dumps = {}
|
||||
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
|
||||
for dirpath, _dirs, files in os.walk(env.pageserver_remote_storage.root):
|
||||
for file in files:
|
||||
file_path = os.path.join(dirpath, file)
|
||||
log.info(file_path)
|
||||
if file.endswith(".gz"):
|
||||
bucket_dumps[file_path] = json.load(gzip.open(file_path))
|
||||
|
||||
assert len(bucket_dumps) >= 1
|
||||
assert all("events" in data for data in bucket_dumps.values())
|
||||
|
||||
|
||||
def test_metric_collection_cleans_up_tempfile(
|
||||
httpserver: HTTPServer,
|
||||
|
||||
@@ -13,14 +13,17 @@ from fixtures.neon_fixtures import NeonEnv, PgBin
|
||||
def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgBin):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_pageserver_restarts")
|
||||
endpoint = env.endpoints.create_start("test_pageserver_restarts")
|
||||
n_restarts = 10
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_pageserver_restarts", config_lines=["effective_io_concurrency=100"]
|
||||
)
|
||||
|
||||
n_restarts = 100
|
||||
scale = 10
|
||||
|
||||
def run_pgbench(connstr: str):
|
||||
log.info(f"Start a pgbench workload on pg {connstr}")
|
||||
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
|
||||
pg_bin.run_capture(["pgbench", f"-T{n_restarts}", connstr])
|
||||
pg_bin.run_capture(["pgbench", "-c", "10", "-f", "test_runner/regress/select.sql", f"-T{n_restarts}", connstr])
|
||||
|
||||
thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True)
|
||||
thread.start()
|
||||
|
||||
@@ -11,6 +11,7 @@ from fixtures.pageserver.utils import (
|
||||
assert_prefix_empty,
|
||||
poll_for_remote_storage_iterations,
|
||||
tenant_delete_wait_completed,
|
||||
wait_for_upload_queue_empty,
|
||||
)
|
||||
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, S3Storage
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
@@ -472,6 +473,10 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
log.info("Synchronizing after initial write...")
|
||||
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
|
||||
|
||||
# Ensure that everything which appears in the heatmap is also present in S3: heatmap writers
|
||||
# are allowed to upload heatmaps that reference layers which are only enqueued for upload
|
||||
wait_for_upload_queue_empty(ps_attached.http_client(), tenant_id, timeline_id)
|
||||
|
||||
ps_secondary.http_client().tenant_secondary_download(tenant_id)
|
||||
|
||||
assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers(
|
||||
@@ -484,6 +489,11 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
workload.churn_rows(128, ps_attached.id)
|
||||
|
||||
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
|
||||
|
||||
# Ensure that everything which appears in the heatmap is also present in S3: heatmap writers
|
||||
# are allowed to upload heatmaps that reference layers which are only enqueued for upload
|
||||
wait_for_upload_queue_empty(ps_attached.http_client(), tenant_id, timeline_id)
|
||||
|
||||
ps_secondary.http_client().tenant_secondary_download(tenant_id)
|
||||
|
||||
assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers(
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import asyncio
|
||||
import time
|
||||
from typing import Tuple
|
||||
|
||||
import pytest
|
||||
@@ -10,7 +9,7 @@ from fixtures.neon_fixtures import (
|
||||
tenant_get_shards,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pageserver.utils import wait_for_last_record_lsn
|
||||
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
@@ -61,6 +60,15 @@ def wait_until_pageserver_is_caught_up(
|
||||
assert waited >= last_flush_lsn
|
||||
|
||||
|
||||
def wait_until_pageserver_has_uploaded(
|
||||
env: NeonEnv, last_flush_lsns: list[Tuple[TenantId, TimelineId, Lsn]]
|
||||
):
|
||||
for tenant, timeline, last_flush_lsn in last_flush_lsns:
|
||||
shards = tenant_get_shards(env, tenant)
|
||||
for tenant_shard_id, pageserver in shards:
|
||||
wait_for_upload(pageserver.http_client(), tenant_shard_id, timeline, last_flush_lsn)
|
||||
|
||||
|
||||
def wait_for_wal_ingest_metric(pageserver_http: PageserverHttpClient) -> float:
|
||||
def query():
|
||||
value = pageserver_http.get_metric_value("pageserver_wal_ingest_records_received_total")
|
||||
@@ -86,25 +94,50 @@ def test_pageserver_small_inmemory_layers(
|
||||
The workload creates a number of timelines and writes some data to each,
|
||||
but not enough to trigger flushes via the `checkpoint_distance` config.
|
||||
"""
|
||||
|
||||
def get_dirty_bytes():
|
||||
v = (
|
||||
env.pageserver.http_client().get_metric_value("pageserver_timeline_ephemeral_bytes")
|
||||
or 0
|
||||
)
|
||||
log.info(f"dirty_bytes: {v}")
|
||||
return v
|
||||
|
||||
def assert_dirty_bytes(v):
|
||||
assert get_dirty_bytes() == v
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
last_flush_lsns = asyncio.run(workload(env, TIMELINE_COUNT, ENTRIES_PER_TIMELINE))
|
||||
wait_until_pageserver_is_caught_up(env, last_flush_lsns)
|
||||
|
||||
# We didn't write enough data to trigger a size-based checkpoint
|
||||
assert get_dirty_bytes() > 0
|
||||
|
||||
ps_http_client = env.pageserver.http_client()
|
||||
total_wal_ingested_before_restart = wait_for_wal_ingest_metric(ps_http_client)
|
||||
|
||||
log.info("Sleeping for checkpoint timeout ...")
|
||||
time.sleep(CHECKPOINT_TIMEOUT_SECONDS + 5)
|
||||
# Within ~ the checkpoint interval, all the ephemeral layers should be frozen and flushed,
|
||||
# such that there are zero bytes of ephemeral layer left on the pageserver
|
||||
log.info("Waiting for background checkpoints...")
|
||||
wait_until(CHECKPOINT_TIMEOUT_SECONDS * 2, 1, lambda: assert_dirty_bytes(0)) # type: ignore
|
||||
|
||||
# Zero ephemeral layer bytes does not imply that all the frozen layers were uploaded: they
|
||||
# must be uploaded to remain visible to the pageserver after restart.
|
||||
wait_until_pageserver_has_uploaded(env, last_flush_lsns)
|
||||
|
||||
env.pageserver.restart(immediate=immediate_shutdown)
|
||||
wait_until_pageserver_is_caught_up(env, last_flush_lsns)
|
||||
|
||||
# Catching up with WAL ingest should have resulted in zero bytes of ephemeral layers, since
|
||||
# we froze, flushed and uploaded everything before restarting. There can be no more WAL writes
|
||||
# because we shut down compute endpoints before flushing.
|
||||
assert get_dirty_bytes() == 0
|
||||
|
||||
total_wal_ingested_after_restart = wait_for_wal_ingest_metric(ps_http_client)
|
||||
|
||||
log.info(f"WAL ingested before restart: {total_wal_ingested_before_restart}")
|
||||
log.info(f"WAL ingested after restart: {total_wal_ingested_after_restart}")
|
||||
|
||||
leeway = total_wal_ingested_before_restart * 5 / 100
|
||||
assert total_wal_ingested_after_restart <= leeway
|
||||
assert total_wal_ingested_after_restart == 0
|
||||
|
||||
@@ -15,6 +15,13 @@ def test_pageserver_recovery(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
env.pageserver.is_testing_enabled_or_skip()
|
||||
|
||||
# We expect the pageserver to exit, which will cause storage controller
|
||||
# requests to fail and warn.
|
||||
env.storage_controller.allowed_errors.append(".*management API still failed.*")
|
||||
env.storage_controller.allowed_errors.append(
|
||||
".*Reconcile error.*error sending request for url.*"
|
||||
)
|
||||
|
||||
# Create a branch for us
|
||||
env.neon_cli.create_branch("test_pageserver_recovery", "main")
|
||||
|
||||
|
||||
@@ -838,7 +838,7 @@ def test_compaction_waits_for_upload(
|
||||
# upload_stuck_layers and the original initdb L0
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
# as uploads are paused, the the upload_stuck_layers should still be with us
|
||||
# as uploads are paused, the upload_stuck_layers should still be with us
|
||||
for name in upload_stuck_layers:
|
||||
path = env.pageserver.timeline_dir(tenant_id, timeline_id) / name
|
||||
assert path.exists(), "uploads are stuck still over compaction"
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
|
||||
|
||||
|
||||
@pytest.mark.xfail
|
||||
def test_replication_start(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
|
||||
|
||||
@@ -874,11 +874,17 @@ def test_sharding_split_failures(
|
||||
workload.validate()
|
||||
|
||||
if failure.expect_available():
|
||||
# Even though the split failed partway through, this should not have interrupted
|
||||
# clients. Disable waiting for pageservers in the workload helper, because our
|
||||
# failpoints may prevent API access.
|
||||
# This only applies for failure modes that leave pageserver page_service API available.
|
||||
workload.churn_rows(10, upload=False, ingest=False)
|
||||
# Even though the split failed partway through, this should not leave the tenant in
|
||||
# an unavailable state.
|
||||
# - Disable waiting for pageservers in the workload helper, because our
|
||||
# failpoints may prevent API access. This only applies for failure modes that
|
||||
# leave pageserver page_service API available.
|
||||
# - This is a wait_until because clients may see transient errors in some split error cases,
|
||||
# e.g. while waiting for a storage controller to re-attach a parent shard if we failed
|
||||
# inside the pageserver and the storage controller responds by detaching children and attaching
|
||||
# parents concurrently (https://github.com/neondatabase/neon/issues/7148)
|
||||
wait_until(10, 1, lambda: workload.churn_rows(10, upload=False, ingest=False)) # type: ignore
|
||||
|
||||
workload.validate()
|
||||
|
||||
if failure.fails_forward(env):
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 3b09894ddb...748643b468
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 80cef885ad...e7651e79c0
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 9007894722...3946b2e2ea
6
vendor/revisions.json
vendored
6
vendor/revisions.json
vendored
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"postgres-v16": "90078947229aa7f9ac5f7ed4527b2c7386d5332b",
|
||||
"postgres-v15": "80cef885add1af6741aa31944c7d2c84d8f9098f",
|
||||
"postgres-v14": "3b09894ddb8825b50c963942059eab1a2a0b0a89"
|
||||
"postgres-v16": "3946b2e2ea71d07af092099cb5bcae76a69b90d6",
|
||||
"postgres-v15": "e7651e79c0c27fbddc3c724f5b9553222c28e395",
|
||||
"postgres-v14": "748643b4683e9fe3b105011a6ba8a687d032cd65"
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user