Mirror of https://github.com/neondatabase/neon.git
Synced 2026-01-20 20:02:56 +00:00

Compare commits: sk-migrate...rustls (4 commits)
| Author | SHA1 | Date |
|---|---|---|
| | a5c1716edc | |
| | 0f36927a17 | |
| | b78a8c4d53 | |
| | dc109c42bc | |
.github/workflows/build_and_test.yml (vendored), 2 changed lines
```diff
@@ -857,7 +857,7 @@ jobs:
 run:
 shell: sh -eu {0}
 env:
-VM_BUILDER_VERSION: v0.21.0
+VM_BUILDER_VERSION: v0.19.0
 
 steps:
 - name: Checkout
```
Cargo.lock (generated), 155 changed lines
```diff
@@ -572,7 +572,7 @@ dependencies = [
  "once_cell",
  "pin-project-lite",
  "pin-utils",
- "rustls",
+ "rustls 0.21.9",
  "tokio",
  "tracing",
 ]
@@ -2278,10 +2278,10 @@ dependencies = [
  "http",
  "hyper",
  "log",
- "rustls",
+ "rustls 0.21.9",
  "rustls-native-certs",
  "tokio",
- "tokio-rustls",
+ "tokio-rustls 0.24.0",
 ]
 
 [[package]]
@@ -2493,7 +2493,7 @@ checksum = "5c7ea04a7c5c055c175f189b6dc6ba036fd62306b58c66c9f6389036c503a3f4"
 dependencies = [
  "base64 0.21.1",
  "js-sys",
- "pem 3.0.3",
+ "pem",
  "ring 0.17.6",
  "serde",
  "serde_json",
@@ -3290,16 +3290,6 @@ version = "0.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
 
-[[package]]
-name = "pem"
-version = "2.0.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b13fe415cdf3c8e44518e18a7c95a13431d9bdf6d15367d82b23c377fdd441a"
-dependencies = [
- "base64 0.21.1",
- "serde",
-]
-
 [[package]]
 name = "pem"
 version = "3.0.3"
@@ -3482,14 +3472,14 @@ dependencies = [
  "futures",
  "once_cell",
  "pq_proto",
- "rustls",
- "rustls-pemfile",
+ "ring 0.17.6",
+ "rustls 0.22.1",
+ "rustls-pemfile 2.0.0",
  "serde",
  "thiserror",
  "tokio",
  "tokio-postgres",
- "tokio-postgres-rustls",
- "tokio-rustls",
+ "tokio-rustls 0.25.0",
  "tracing",
  "workspace_hack",
 ]
@@ -3717,8 +3707,8 @@ dependencies = [
  "routerify",
  "rstest",
  "rustc-hash",
- "rustls",
- "rustls-pemfile",
+ "rustls 0.22.1",
+ "rustls-pemfile 2.0.0",
  "scopeguard",
  "serde",
  "serde_json",
@@ -3732,7 +3722,7 @@ dependencies = [
  "tokio",
  "tokio-postgres",
  "tokio-postgres-rustls",
- "tokio-rustls",
+ "tokio-rustls 0.25.0",
  "tokio-util",
  "tracing",
  "tracing-opentelemetry",
@@ -3741,7 +3731,7 @@ dependencies = [
  "url",
  "utils",
  "uuid",
- "webpki-roots 0.25.2",
+ "webpki-roots",
  "workspace_hack",
  "x509-parser",
 ]
@@ -3860,12 +3850,12 @@ dependencies = [
 
 [[package]]
 name = "rcgen"
-version = "0.11.1"
+version = "0.12.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4954fbc00dcd4d8282c987710e50ba513d351400dbdd00e803a05172a90d8976"
+checksum = "5d918c80c5a4c7560db726763020bd16db179e4d5b828078842274a443addb5d"
 dependencies = [
- "pem 2.0.1",
- "ring 0.16.20",
+ "pem",
+ "ring 0.17.6",
  "time",
  "yasna",
 ]
@@ -4003,14 +3993,14 @@ dependencies = [
  "once_cell",
  "percent-encoding",
  "pin-project-lite",
- "rustls",
- "rustls-pemfile",
+ "rustls 0.21.9",
+ "rustls-pemfile 1.0.2",
  "serde",
  "serde_json",
  "serde_urlencoded",
  "tokio",
  "tokio-native-tls",
- "tokio-rustls",
+ "tokio-rustls 0.24.0",
  "tokio-util",
  "tower-service",
  "url",
@@ -4018,7 +4008,7 @@ dependencies = [
  "wasm-bindgen-futures",
  "wasm-streams",
  "web-sys",
- "webpki-roots 0.25.2",
+ "webpki-roots",
  "winreg",
 ]
 
@@ -4250,6 +4240,20 @@ dependencies = [
  "sct",
 ]
 
+[[package]]
+name = "rustls"
+version = "0.22.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fe6b63262c9fcac8659abfaa96cac103d28166d3ff3eaf8f412e19f3ae9e5a48"
+dependencies = [
+ "log",
+ "ring 0.17.6",
+ "rustls-pki-types",
+ "rustls-webpki 0.102.0",
+ "subtle",
+ "zeroize",
+]
+
 [[package]]
 name = "rustls-native-certs"
 version = "0.6.2"
@@ -4257,7 +4261,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50"
 dependencies = [
  "openssl-probe",
- "rustls-pemfile",
+ "rustls-pemfile 1.0.2",
  "schannel",
  "security-framework",
 ]
@@ -4272,15 +4276,21 @@ dependencies = [
 ]
 
 [[package]]
-name = "rustls-webpki"
-version = "0.100.2"
+name = "rustls-pemfile"
+version = "2.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e98ff011474fa39949b7e5c0428f9b4937eda7da7848bbb947786b7be0b27dab"
+checksum = "35e4980fa29e4c4b212ffb3db068a564cbf560e51d3944b7c88bd8bf5bec64f4"
 dependencies = [
- "ring 0.16.20",
- "untrusted 0.7.1",
+ "base64 0.21.1",
+ "rustls-pki-types",
 ]
 
+[[package]]
+name = "rustls-pki-types"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e7673e0aa20ee4937c6aacfc12bb8341cfbf054cdd21df6bec5fd0629fe9339b"
+
 [[package]]
 name = "rustls-webpki"
 version = "0.101.7"
@@ -4291,6 +4301,17 @@ dependencies = [
  "untrusted 0.9.0",
 ]
 
+[[package]]
+name = "rustls-webpki"
+version = "0.102.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "de2635c8bc2b88d367767c5de8ea1d8db9af3f6219eba28442242d9ab81d1b89"
+dependencies = [
+ "ring 0.17.6",
+ "rustls-pki-types",
+ "untrusted 0.9.0",
+]
+
 [[package]]
 name = "rustversion"
 version = "1.0.12"
@@ -4331,7 +4352,7 @@ dependencies = [
  "serde_with",
  "thiserror",
  "tokio",
- "tokio-rustls",
+ "tokio-rustls 0.25.0",
  "tokio-stream",
  "tracing",
  "tracing-appender",
@@ -4495,7 +4516,7 @@ checksum = "2e95efd0cefa32028cdb9766c96de71d96671072f9fb494dc9fb84c0ef93e52b"
 dependencies = [
  "httpdate",
  "reqwest",
- "rustls",
+ "rustls 0.21.9",
  "sentry-backtrace",
  "sentry-contexts",
  "sentry-core",
@@ -4503,7 +4524,7 @@ dependencies = [
  "sentry-tracing",
  "tokio",
  "ureq",
- "webpki-roots 0.25.2",
+ "webpki-roots",
 ]
 
 [[package]]
@@ -5161,16 +5182,14 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 
 [[package]]
 name = "tls-listener"
-version = "0.7.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "81294c017957a1a69794f506723519255879e15a870507faf45dfed288b763dd"
+version = "0.9.0"
+source = "git+https://github.com/conradludgate/tls-listener?branch=main#4801141b5660613e77816044da6540aa64f388ec"
 dependencies = [
  "futures-util",
- "hyper",
  "pin-project-lite",
  "thiserror",
  "tokio",
- "tokio-rustls",
+ "tokio-rustls 0.25.0",
 ]
 
 [[package]]
@@ -5253,10 +5272,10 @@ checksum = "dd5831152cb0d3f79ef5523b357319ba154795d64c7078b2daa95a803b54057f"
 dependencies = [
  "futures",
  "ring 0.16.20",
- "rustls",
+ "rustls 0.21.9",
  "tokio",
  "tokio-postgres",
- "tokio-rustls",
+ "tokio-rustls 0.24.0",
 ]
 
 [[package]]
@@ -5265,7 +5284,18 @@ version = "0.24.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e0d409377ff5b1e3ca6437aa86c1eb7d40c134bfec254e44c830defa92669db5"
 dependencies = [
- "rustls",
+ "rustls 0.21.9",
  "tokio",
 ]
 
+[[package]]
+name = "tokio-rustls"
+version = "0.25.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f"
+dependencies = [
+ "rustls 0.22.1",
+ "rustls-pki-types",
+ "tokio",
+]
+
@@ -5412,9 +5442,9 @@ dependencies = [
  "pin-project",
  "prost",
  "rustls-native-certs",
- "rustls-pemfile",
+ "rustls-pemfile 1.0.2",
  "tokio",
- "tokio-rustls",
+ "tokio-rustls 0.24.0",
  "tokio-stream",
  "tower",
  "tower-layer",
@@ -5719,17 +5749,17 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
 
 [[package]]
 name = "ureq"
-version = "2.7.1"
+version = "2.9.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0b11c96ac7ee530603dcdf68ed1557050f374ce55a5a07193ebf8cbc9f8927e9"
+checksum = "f8cdd25c339e200129fe4de81451814e5228c9b771d57378817d6117cc2b3f97"
 dependencies = [
  "base64 0.21.1",
  "log",
  "once_cell",
- "rustls",
- "rustls-webpki 0.100.2",
+ "rustls 0.21.9",
+ "rustls-webpki 0.101.7",
  "url",
- "webpki-roots 0.23.1",
+ "webpki-roots",
 ]
 
 [[package]]
@@ -6038,15 +6068,6 @@ dependencies = [
  "wasm-bindgen",
 ]
 
-[[package]]
-name = "webpki-roots"
-version = "0.23.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b03058f88386e5ff5310d9111d53f48b17d732b401aeb83a8d5190f2ac459338"
-dependencies = [
- "rustls-webpki 0.100.2",
-]
-
 [[package]]
 name = "webpki-roots"
 version = "0.25.2"
@@ -6295,11 +6316,8 @@ dependencies = [
 "either",
 "fail",
 "futures",
 "futures-channel",
 "futures-core",
 "futures-executor",
 "futures-io",
 "futures-sink",
 "futures-util",
 "hex",
 "hmac",
@@ -6318,8 +6336,7 @@ dependencies = [
  "regex-automata 0.4.3",
  "regex-syntax 0.8.2",
  "reqwest",
- "ring 0.16.20",
- "rustls",
+ "rustls 0.21.9",
  "scopeguard",
  "serde",
  "serde_json",
@@ -6330,7 +6347,7 @@ dependencies = [
  "time",
  "time-macros",
  "tokio",
- "tokio-rustls",
+ "tokio-rustls 0.24.0",
  "tokio-util",
  "toml_datetime",
  "toml_edit",
```
Cargo.toml, 13 changed lines
```diff
@@ -115,11 +115,12 @@ reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
 reqwest-tracing = { version = "0.4.0", features = ["opentelemetry_0_19"] }
 reqwest-middleware = "0.2.0"
 reqwest-retry = "0.2.2"
+ring = "0.17"
 routerify = "3"
 rpds = "0.13"
 rustc-hash = "1.1.0"
-rustls = "0.21"
-rustls-pemfile = "1"
+rustls = "0.22.1"
+rustls-pemfile = "2.0.0"
 rustls-split = "0.3"
 scopeguard = "1.1"
 sysinfo = "0.29.2"
@@ -143,11 +144,11 @@ tar = "0.4"
 task-local-extensions = "0.1.4"
 test-context = "0.1"
 thiserror = "1.0"
-tls-listener = { version = "0.7", features = ["rustls", "hyper-h1"] }
+tls-listener = { version = "0.9.0", features = ["rustls"] }
 tokio = { version = "1.17", features = ["macros"] }
 tokio-io-timeout = "1.2.0"
 tokio-postgres-rustls = "0.10.0"
-tokio-rustls = "0.24"
+tokio-rustls = "0.25.0"
 tokio-stream = "0.1"
 tokio-tar = "0.3"
 tokio-util = { version = "0.7.10", features = ["io", "rt"] }
@@ -202,7 +203,7 @@ workspace_hack = { version = "0.1", path = "./workspace_hack/" }
 
 ## Build dependencies
 criterion = "0.5.1"
-rcgen = "0.11"
+rcgen = "0.12"
 rstest = "0.18"
 camino-tempfile = "1.0.2"
 tonic-build = "0.9"
@@ -213,6 +214,8 @@ tonic-build = "0.9"
 # TODO: we should probably fork `tokio-postgres-rustls` instead.
 tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
 
+tls-listener = { git = "https://github.com/conradludgate/tls-listener", branch="main" }
+
 ################# Binary contents sections
 
 [profile.release]
```
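The dependency bumps above are the heart of this branch: rustls 0.21 -> 0.22 changes both the config-builder API and the certificate/key types. A minimal sketch of server setup against rustls 0.22 (the function and its names are illustrative, not taken from the diff):

```rust
use std::sync::Arc;

use rustls::pki_types::{CertificateDer, PrivateKeyDer};

fn make_server_config(
    certs: Vec<CertificateDer<'static>>,
    key: PrivateKeyDer<'static>,
) -> Result<Arc<rustls::ServerConfig>, rustls::Error> {
    // rustls 0.22 drops with_safe_defaults(): builder() already implies the
    // safe default cipher suites, kx groups, and protocol versions. To pin
    // versions explicitly (as the proxy does further down for TLS 1.2
    // compatibility), use ServerConfig::builder_with_protocol_versions().
    let config = rustls::ServerConfig::builder()
        .with_no_client_auth()
        .with_single_cert(certs, key)?;
    Ok(Arc::new(config))
}
```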
```diff
@@ -370,49 +370,33 @@ pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client
     Ok(())
 }
 
-fn reassign_owned_objects_in_one_db(
-    conf: Config,
-    role_name: &PgIdent,
-    db_owner: &PgIdent,
-) -> Result<()> {
-    let mut client = conf.connect(NoTls)?;
-
-    // This will reassign all dependent objects to the db owner
-    let reassign_query = format!(
-        "REASSIGN OWNED BY {} TO {}",
-        role_name.pg_quote(),
-        db_owner.pg_quote()
-    );
-    info!(
-        "reassigning objects owned by '{}' in db '{}' to '{}'",
-        role_name,
-        conf.get_dbname().unwrap_or(""),
-        db_owner
-    );
-    client.simple_query(&reassign_query)?;
-
-    // This now will only drop privileges of the role
-    let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
-    client.simple_query(&drop_query)?;
-    Ok(())
-}
-
 // Reassign all owned objects in all databases to the owner of the database.
 fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
     for db in &spec.cluster.databases {
         if db.owner != *role_name {
             let mut conf = Config::from_str(connstr)?;
             conf.dbname(&db.name);
-            reassign_owned_objects_in_one_db(conf, role_name, &db.owner)?;
+            let mut client = conf.connect(NoTls)?;
+
+            // This will reassign all dependent objects to the db owner
+            let reassign_query = format!(
+                "REASSIGN OWNED BY {} TO {}",
+                role_name.pg_quote(),
+                db.owner.pg_quote()
+            );
+            info!(
+                "reassigning objects owned by '{}' in db '{}' to '{}'",
+                role_name, &db.name, &db.owner
+            );
+            client.simple_query(&reassign_query)?;
+
+            // This now will only drop privileges of the role
+            let drop_query = format!("DROP OWNED BY {}", role_name.pg_quote());
+            client.simple_query(&drop_query)?;
         }
     }
 
-    // Also handle case when there are no databases in the spec.
-    // In this case we need to reassign objects in the default database.
-    let conf = Config::from_str(connstr)?;
-    let db_owner = PgIdent::from_str("cloud_admin")?;
-    reassign_owned_objects_in_one_db(conf, role_name, &db_owner)?;
-
     Ok(())
 }
```
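The two statements issued per database are the standard PostgreSQL pair for retiring a role. A hedged, self-contained sketch of the same sequence (connection string and identifiers are placeholders; the real code quotes identifiers via pg_quote):

```rust
use postgres::{Client, NoTls};

// Retire `role` in one database: REASSIGN OWNED hands every object the role
// owns to `owner`, after which DROP OWNED only has remaining GRANTs to revoke.
fn retire_role(connstr: &str, role: &str, owner: &str) -> Result<(), postgres::Error> {
    let mut client = Client::connect(connstr, NoTls)?;
    // Simplified quoting for illustration only; use proper identifier quoting
    // (like pg_quote in the diff above) in real code.
    client.simple_query(&format!("REASSIGN OWNED BY \"{role}\" TO \"{owner}\""))?;
    client.simple_query(&format!("DROP OWNED BY \"{role}\""))?;
    Ok(())
}
```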
```diff
@@ -159,7 +159,7 @@ impl From<[u8; 18]> for TenantShardId {
 /// shard we're dealing with, but do not need to know the full ShardIdentity (because
 /// we won't be doing any page->shard mapping), and do not need to know the fully qualified
 /// TenantShardId.
-#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
+#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy)]
 pub struct ShardIndex {
     pub shard_number: ShardNumber,
     pub shard_count: ShardCount,
```
```diff
@@ -9,10 +9,12 @@ async-trait.workspace = true
 anyhow.workspace = true
 bytes.workspace = true
 futures.workspace = true
+ring.workspace = true
 rustls.workspace = true
 serde.workspace = true
 thiserror.workspace = true
 tokio.workspace = true
 tokio-postgres.workspace = true
 tokio-rustls.workspace = true
 tracing.workspace = true
@@ -22,5 +24,4 @@ workspace_hack.workspace = true
 [dev-dependencies]
 once_cell.workspace = true
 rustls-pemfile.workspace = true
 tokio-postgres.workspace = true
-tokio-postgres-rustls.workspace = true
+# tokio-postgres-rustls.workspace = true
```
```diff
@@ -6,7 +6,7 @@
 #![deny(clippy::undocumented_unsafe_blocks)]
 use anyhow::Context;
 use bytes::Bytes;
-use futures::pin_mut;
+use futures::{pin_mut, TryFutureExt, FutureExt};
 use serde::{Deserialize, Serialize};
 use std::io::ErrorKind;
 use std::net::SocketAddr;
@@ -1030,3 +1030,115 @@ pub enum CopyStreamHandlerEnd {
     #[error(transparent)]
     Other(#[from] anyhow::Error),
 }
+
+#[derive(Clone)]
+pub struct MakeRustlsConnect {
+    config: Arc<rustls::ClientConfig>,
+}
+
+impl MakeRustlsConnect {
+    pub fn new(config: rustls::ClientConfig) -> Self {
+        Self {
+            config: Arc::new(config),
+        }
+    }
+}
+
+impl<S> tokio_postgres::tls::MakeTlsConnect<S> for MakeRustlsConnect
+where
+    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+{
+    type Stream = RustlsStream<S>;
+    type TlsConnect = RustlsConnect;
+    type Error = io::Error;
+
+    fn make_tls_connect(&mut self, hostname: &str) -> io::Result<RustlsConnect> {
+        rustls::pki_types::ServerName::try_from(hostname)
+            .map(|dns_name| {
+                RustlsConnect(Some(RustlsConnectData {
+                    hostname: dns_name.to_owned(),
+                    connector: Arc::clone(&self.config).into(),
+                }))
+            })
+            .or(Ok(RustlsConnect(None)))
+    }
+}
+
+pub struct RustlsConnect(Option<RustlsConnectData>);
+
+struct RustlsConnectData {
+    hostname: rustls::pki_types::ServerName<'static>,
+    connector: tokio_rustls::TlsConnector,
+}
+
+impl<S> tokio_postgres::tls::TlsConnect<S> for RustlsConnect
+where
+    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+{
+    type Stream = RustlsStream<S>;
+    type Error = io::Error;
+    type Future = Pin<Box<dyn Future<Output = io::Result<RustlsStream<S>>> + Send>>;
+
+    fn connect(self, stream: S) -> Self::Future {
+        match self.0 {
+            None => Box::pin(core::future::ready(Err(io::ErrorKind::InvalidInput.into()))),
+            Some(c) => c
+                .connector
+                .connect(c.hostname, stream)
+                .map_ok(|s| RustlsStream(Box::pin(s)))
+                .boxed(),
+        }
+    }
+}
+
+pub struct RustlsStream<S>(Pin<Box<tokio_rustls::client::TlsStream<S>>>);
+
+impl<S> tokio_postgres::tls::TlsStream for RustlsStream<S>
+where
+    S: AsyncRead + AsyncWrite + Unpin,
+{
+    fn channel_binding(&self) -> tokio_postgres::tls::ChannelBinding {
+        let (_, session) = self.0.get_ref();
+        match session.peer_certificates() {
+            Some(certs) if !certs.is_empty() => {
+                let sha256 = ring::digest::digest(&ring::digest::SHA256, certs[0].as_ref());
+                tokio_postgres::tls::ChannelBinding::tls_server_end_point(sha256.as_ref().into())
+            }
+            _ => tokio_postgres::tls::ChannelBinding::none(),
+        }
+    }
+}
+
+impl<S> AsyncRead for RustlsStream<S>
+where
+    S: AsyncRead + AsyncWrite + Unpin,
+{
+    fn poll_read(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context,
+        buf: &mut tokio::io::ReadBuf<'_>,
+    ) -> Poll<tokio::io::Result<()>> {
+        self.0.as_mut().poll_read(cx, buf)
+    }
+}
+
+impl<S> AsyncWrite for RustlsStream<S>
+where
+    S: AsyncRead + AsyncWrite + Unpin,
+{
+    fn poll_write(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context,
+        buf: &[u8],
+    ) -> Poll<tokio::io::Result<usize>> {
+        self.0.as_mut().poll_write(cx, buf)
+    }
+
+    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<tokio::io::Result<()>> {
+        self.0.as_mut().poll_flush(cx)
+    }
+
+    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<tokio::io::Result<()>> {
+        self.0.as_mut().poll_shutdown(cx)
+    }
+}
```
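The in-tree MakeRustlsConnect above replaces the tokio-postgres-rustls crate for this branch. A hedged usage sketch, assuming a rustls 0.22 ClientConfig and a placeholder connection string:

```rust
use postgres_backend::MakeRustlsConnect; // the type defined in the diff above

async fn connect_with_tls(
    ca: rustls::pki_types::CertificateDer<'static>,
) -> anyhow::Result<()> {
    let mut roots = rustls::RootCertStore::empty();
    roots.add(ca)?;
    let config = rustls::ClientConfig::builder()
        .with_root_certificates(roots)
        .with_no_client_auth();
    let tls = MakeRustlsConnect::new(config);

    // Placeholder connection string; sslmode=require forces the TLS path.
    let (client, connection) =
        tokio_postgres::connect("host=localhost user=postgres sslmode=require", tls).await?;
    tokio::spawn(connection); // the connection future must be driven
    let _rows = client.simple_query("SELECT 1").await?;
    Ok(())
}
```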
```diff
@@ -1,5 +1,6 @@
 /// Test postgres_backend_async with tokio_postgres
 use once_cell::sync::Lazy;
+use postgres_backend::MakeRustlsConnect;
 use postgres_backend::{AuthType, Handler, PostgresBackend, QueryError};
 use pq_proto::{BeMessage, RowDescriptor};
 use std::io::Cursor;
@@ -9,7 +10,6 @@ use tokio::net::{TcpListener, TcpStream};
 use tokio_postgres::config::SslMode;
 use tokio_postgres::tls::MakeTlsConnect;
 use tokio_postgres::{Config, NoTls, SimpleQueryMessage};
-use tokio_postgres_rustls::MakeRustlsConnect;
 
 // generate client, server test streams
 async fn make_tcp_pair() -> (TcpStream, TcpStream) {
@@ -72,14 +72,21 @@
     }
 }
 
-static KEY: Lazy<rustls::PrivateKey> = Lazy::new(|| {
+static KEY: Lazy<rustls::pki_types::PrivatePkcs1KeyDer<'static>> = Lazy::new(|| {
     let mut cursor = Cursor::new(include_bytes!("key.pem"));
-    rustls::PrivateKey(rustls_pemfile::rsa_private_keys(&mut cursor).unwrap()[0].clone())
+
+    let key = rustls_pemfile::rsa_private_keys(&mut cursor)
+        .next()
+        .unwrap()
+        .unwrap();
+    key.secret_pkcs1_der().to_owned().into()
 });
 
-static CERT: Lazy<rustls::Certificate> = Lazy::new(|| {
+static CERT: Lazy<rustls::pki_types::CertificateDer<'static>> = Lazy::new(|| {
     let mut cursor = Cursor::new(include_bytes!("cert.pem"));
-    rustls::Certificate(rustls_pemfile::certs(&mut cursor).unwrap()[0].clone())
+    let cert = rustls_pemfile::certs(&mut cursor).next().unwrap().unwrap();
+
+    cert.into_owned()
 });
 
 // test that basic select with ssl works
@@ -87,10 +94,10 @@ static CERT: Lazy<rustls::Certificate> = Lazy::new(|| {
 async fn simple_select_ssl() {
     let (client_sock, server_sock) = make_tcp_pair().await;
 
+    let key = rustls::pki_types::PrivateKeyDer::Pkcs1(KEY.secret_pkcs1_der().to_owned().into());
     let server_cfg = rustls::ServerConfig::builder()
-        .with_safe_defaults()
         .with_no_client_auth()
-        .with_single_cert(vec![CERT.clone()], KEY.clone())
+        .with_single_cert(vec![CERT.clone()], key)
         .unwrap();
     let tls_config = Some(Arc::new(server_cfg));
     let pgbackend =
@@ -102,14 +109,13 @@ async fn simple_select_ssl() {
     });
 
     let client_cfg = rustls::ClientConfig::builder()
-        .with_safe_defaults()
         .with_root_certificates({
             let mut store = rustls::RootCertStore::empty();
-            store.add(&CERT).unwrap();
+            store.add(CERT.clone()).unwrap();
             store
         })
         .with_no_client_auth();
-    let mut make_tls_connect = tokio_postgres_rustls::MakeRustlsConnect::new(client_cfg);
+    let mut make_tls_connect = MakeRustlsConnect::new(client_cfg);
     let tls_connect = <MakeRustlsConnect as MakeTlsConnect<TcpStream>>::make_tls_connect(
         &mut make_tls_connect,
         "localhost",
```
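A hedged sketch of the rustls-pemfile 1.x -> 2.0 shift visible in the test statics above: the 2.0 parsers return iterators of Results yielding typed pki_types DER values instead of Vec<Vec<u8>>. The function and its inputs are illustrative:

```rust
fn load_identity(
    cert_pem: &[u8],
    key_pem: &[u8],
) -> anyhow::Result<(
    Vec<rustls::pki_types::CertificateDer<'static>>,
    rustls::pki_types::PrivateKeyDer<'static>,
)> {
    // Each iterator item is already a CertificateDer<'static>.
    let certs = rustls_pemfile::certs(&mut &cert_pem[..]).collect::<Result<Vec<_>, _>>()?;
    let key = rustls_pemfile::pkcs8_private_keys(&mut &key_pem[..])
        .next()
        .ok_or_else(|| anyhow::anyhow!("no PKCS#8 key in input"))??;
    Ok((certs, rustls::pki_types::PrivateKeyDer::Pkcs8(key)))
}
```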
```diff
@@ -218,6 +218,14 @@ impl S3Bucket {
 
         let started_at = ScopeGuard::into_inner(started_at);
 
+        if get_object.is_err() {
+            metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
+                kind,
+                AttemptOutcome::Err,
+                started_at,
+            );
+        }
+
         match get_object {
             Ok(object_output) => {
                 let metadata = object_output.metadata().cloned().map(StorageMetadata);
@@ -233,27 +241,11 @@ impl S3Bucket {
                 })
             }
-            Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
-                // Count this in the AttemptOutcome::Ok bucket, because 404 is not
-                // an error: we expect to sometimes fetch an object and find it missing,
-                // e.g. when probing for timeline indices.
-                metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
-                    kind,
-                    AttemptOutcome::Ok,
-                    started_at,
-                );
-                Err(DownloadError::NotFound)
-            }
-            Err(e) => {
-                metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
-                    kind,
-                    AttemptOutcome::Err,
-                    started_at,
-                );
-
-                Err(DownloadError::Other(
-                    anyhow::Error::new(e).context("download s3 object"),
-                ))
-            }
+            Err(e) => Err(DownloadError::Other(
+                anyhow::Error::new(e).context("download s3 object"),
+            )),
         }
     }
 }
```
```diff
@@ -522,18 +522,14 @@ pub(crate) mod initial_logical_size {
     impl StartCalculation {
         pub(crate) fn first(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
             let circumstances_label: &'static str = circumstances.into();
-            self.0
-                .with_label_values(&["first", circumstances_label])
-                .inc();
+            self.0.with_label_values(&["first", circumstances_label]);
             OngoingCalculationGuard {
                 inc_drop_calculation: Some(DROP_CALCULATION.first.clone()),
             }
         }
         pub(crate) fn retry(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
             let circumstances_label: &'static str = circumstances.into();
-            self.0
-                .with_label_values(&["retry", circumstances_label])
-                .inc();
+            self.0.with_label_values(&["retry", circumstances_label]);
             OngoingCalculationGuard {
                 inc_drop_calculation: Some(DROP_CALCULATION.retry.clone()),
             }
```
```diff
@@ -514,7 +514,10 @@ pub async fn init_tenant_mgr(
                 &ctx,
             ) {
                 Ok(tenant) => {
-                    tenants.insert(tenant_shard_id, TenantSlot::Attached(tenant));
+                    tenants.insert(
+                        TenantShardId::unsharded(tenant.tenant_id()),
+                        TenantSlot::Attached(tenant),
+                    );
                 }
                 Err(e) => {
                     error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Failed to start tenant: {e:#}");
@@ -959,27 +962,35 @@ impl TenantManager {
         }
 
         let tenant_path = self.conf.tenant_path(&tenant_shard_id);
-        let timelines_path = self.conf.timelines_path(&tenant_shard_id);
-
-        // Directory structure is the same for attached and secondary modes:
-        // create it if it doesn't exist. Timeline load/creation expects the
-        // timelines/ subdir to already exist.
-        //
-        // Does not need to be fsync'd because local storage is just a cache.
-        tokio::fs::create_dir_all(&timelines_path)
-            .await
-            .with_context(|| format!("Creating {timelines_path}"))?;
-
-        // Before activating either secondary or attached mode, persist the
-        // configuration, so that on restart we will re-attach (or re-start
-        // secondary) on the tenant.
-        Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
-            .await
-            .map_err(SetNewTenantConfigError::Persist)?;
 
         let new_slot = match &new_location_config.mode {
-            LocationMode::Secondary(_) => TenantSlot::Secondary,
+            LocationMode::Secondary(_) => {
+                // Directory doesn't need to be fsync'd because if we crash it can
+                // safely be recreated next time this tenant location is configured.
+                tokio::fs::create_dir_all(&tenant_path)
+                    .await
+                    .with_context(|| format!("Creating {tenant_path}"))?;
+
+                Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
+                    .await
+                    .map_err(SetNewTenantConfigError::Persist)?;
+
+                TenantSlot::Secondary
+            }
             LocationMode::Attached(_attach_config) => {
+                let timelines_path = self.conf.timelines_path(&tenant_shard_id);
+
+                // Directory doesn't need to be fsync'd because we do not depend on
+                // it to exist after crashes: it may be recreated when tenant is
+                // re-attached, see https://github.com/neondatabase/neon/issues/5550
+                tokio::fs::create_dir_all(&tenant_path)
+                    .await
+                    .with_context(|| format!("Creating {timelines_path}"))?;
+
+                Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
+                    .await
+                    .map_err(SetNewTenantConfigError::Persist)?;
+
                 let shard_identity = new_location_config.shard;
                 let tenant = tenant_spawn(
                     self.conf,
```
```diff
@@ -878,23 +878,6 @@ impl LayerInner {
                 Ok(())
             }
             Err(e) => {
-                let consecutive_failures =
-                    this.consecutive_failures.fetch_add(1, Ordering::Relaxed);
-
-                let backoff = utils::backoff::exponential_backoff_duration_seconds(
-                    consecutive_failures.min(u32::MAX as usize) as u32,
-                    1.5,
-                    60.0,
-                );
-
-                let backoff = std::time::Duration::from_secs_f64(backoff);
-
-                tokio::select! {
-                    _ = tokio::time::sleep(backoff) => {},
-                    _ = crate::task_mgr::shutdown_token().cancelled_owned() => {},
-                    _ = timeline.cancel.cancelled() => {},
-                };
-
                 Err(e)
             }
         };
@@ -943,9 +926,21 @@
             Ok(permit)
         }
         Ok((Err(e), _permit)) => {
-            // sleep already happened in the spawned task, if it was not cancelled
-            let consecutive_failures = self.consecutive_failures.load(Ordering::Relaxed);
+            // FIXME: this should be with the spawned task and be cancellation sensitive
+            //
+            // while we should not need this, this backoff has turned out to be useful with
+            // a bug of unexpectedly deleted remote layer file (#5787).
+            let consecutive_failures =
+                self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
             tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
+            let backoff = utils::backoff::exponential_backoff_duration_seconds(
+                consecutive_failures.min(u32::MAX as usize) as u32,
+                1.5,
+                60.0,
+            );
+            let backoff = std::time::Duration::from_secs_f64(backoff);
+
+            tokio::time::sleep(backoff).await;
             Err(DownloadError::DownloadFailed)
         }
         Err(_gone) => Err(DownloadError::DownloadCancelled),
```
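The backoff math lives in utils::backoff, which this diff only calls. A hedged sketch of the usual shape behind a call like exponential_backoff_duration_seconds(n, 1.5, 60.0), assuming exponential growth capped at a maximum (the real helper may differ):

```rust
// Hypothetical reimplementation for illustration: base^n seconds, capped.
fn exponential_backoff_duration_seconds(n: u32, base: f64, max_seconds: f64) -> f64 {
    if n == 0 {
        0.0 // first retry happens immediately
    } else {
        base.powi(n as i32).min(max_seconds) // 1.5s, 2.25s, ... up to 60s
    }
}
```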
```diff
@@ -1712,9 +1712,9 @@ walprop_pg_after_election(WalProposer *wp)
 	f = fopen("restart.lsn", "rb");
 	if (f != NULL && !wp->config->syncSafekeepers)
 	{
-		size_t rc = fread(&lrRestartLsn, sizeof(lrRestartLsn), 1, f);
+		fread(&lrRestartLsn, sizeof(lrRestartLsn), 1, f);
 		fclose(f);
-		if (rc == 1 && lrRestartLsn != InvalidXLogRecPtr)
+		if (lrRestartLsn != InvalidXLogRecPtr)
 		{
 			elog(LOG, "Logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn));
```
```diff
@@ -6,7 +6,6 @@
 use std::{net::SocketAddr, sync::Arc};
 
 use futures::future::Either;
-use itertools::Itertools;
 use proxy::config::TlsServerEndPoint;
 use proxy::proxy::run_until_cancelled;
 use tokio::net::TcpListener;
@@ -76,10 +75,12 @@ async fn main() -> anyhow::Result<()> {
     let key = {
         let key_bytes = std::fs::read(key_path).context("TLS key file")?;
         let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..])
+            .collect::<Result<Vec<_>, _>>()
            .context(format!("Failed to read TLS keys at '{key_path}'"))?;
 
         ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len());
-        keys.pop().map(rustls::PrivateKey).unwrap()
+        let bytes = keys.pop().unwrap().secret_pkcs8_der().to_owned();
+        rustls::pki_types::PrivateKeyDer::Pkcs1(bytes.into())
     };
 
     let cert_chain_bytes = std::fs::read(cert_path)
@@ -87,25 +88,23 @@ async fn main() -> anyhow::Result<()> {
 
     let cert_chain = {
         rustls_pemfile::certs(&mut &cert_chain_bytes[..])
+            .collect::<Result<Vec<_>, _>>()
             .context(format!(
                 "Failed to read TLS certificate chain from bytes from file at '{cert_path}'."
             ))?
-            .into_iter()
-            .map(rustls::Certificate)
-            .collect_vec()
     };
 
     // needed for channel bindings
     let first_cert = cert_chain.first().context("missing certificate")?;
     let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
 
-    let tls_config = rustls::ServerConfig::builder()
-        .with_safe_default_cipher_suites()
-        .with_safe_default_kx_groups()
-        .with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])?
-        .with_no_client_auth()
-        .with_single_cert(cert_chain, key)?
-        .into();
+    let tls_config = rustls::ServerConfig::builder_with_protocol_versions(&[
+        &rustls::version::TLS13,
+        &rustls::version::TLS12,
+    ])
+    .with_no_client_auth()
+    .with_single_cert(cert_chain, key)?
+    .into();
 
     (tls_config, tls_server_end_point)
 }
```
```diff
@@ -1,6 +1,9 @@
 use crate::{auth, rate_limiter::RateBucketInfo};
 use anyhow::{bail, ensure, Context, Ok};
-use rustls::{sign, Certificate, PrivateKey};
+use rustls::{
+    crypto::ring::sign,
+    pki_types::{CertificateDer, PrivateKeyDer},
+};
 use sha2::{Digest, Sha256};
 use std::{
     collections::{HashMap, HashSet},
@@ -85,14 +88,14 @@ pub fn configure_tls(
 
     let cert_resolver = Arc::new(cert_resolver);
 
-    let config = rustls::ServerConfig::builder()
-        .with_safe_default_cipher_suites()
-        .with_safe_default_kx_groups()
-        // allow TLS 1.2 to be compatible with older client libraries
-        .with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])?
-        .with_no_client_auth()
-        .with_cert_resolver(cert_resolver.clone())
-        .into();
+    // allow TLS 1.2 to be compatible with older client libraries
+    let config = rustls::ServerConfig::builder_with_protocol_versions(&[
+        &rustls::version::TLS13,
+        &rustls::version::TLS12,
+    ])
+    .with_no_client_auth()
+    .with_cert_resolver(cert_resolver.clone())
+    .into();
 
     Ok(TlsConfig {
         config,
@@ -130,14 +133,14 @@ pub enum TlsServerEndPoint {
 }
 
 impl TlsServerEndPoint {
-    pub fn new(cert: &Certificate) -> anyhow::Result<Self> {
+    pub fn new(cert: &CertificateDer) -> anyhow::Result<Self> {
         let sha256_oids = [
             // I'm explicitly not adding MD5 or SHA1 here... They're bad.
             oid_registry::OID_SIG_ECDSA_WITH_SHA256,
             oid_registry::OID_PKCS1_SHA256WITHRSA,
         ];
 
-        let pem = x509_parser::parse_x509_certificate(&cert.0)
+        let pem = x509_parser::parse_x509_certificate(cert)
             .context("Failed to parse PEM object from cerficiate")?
             .1;
@@ -147,8 +150,7 @@ impl TlsServerEndPoint {
         let oid = pem.signature_algorithm.oid();
         let alg = reg.get(oid);
         if sha256_oids.contains(oid) {
-            let tls_server_end_point: [u8; 32] =
-                Sha256::new().chain_update(&cert.0).finalize().into();
+            let tls_server_end_point: [u8; 32] = Sha256::new().chain_update(cert).finalize().into();
             info!(subject = %pem.subject, signature_algorithm = alg.map(|a| a.description()), tls_server_end_point = %base64::encode(tls_server_end_point), "determined channel binding");
             Ok(Self::Sha256(tls_server_end_point))
         } else {
@@ -162,7 +164,7 @@ impl TlsServerEndPoint {
     }
 }
 
-#[derive(Default)]
+#[derive(Default, Debug)]
 pub struct CertResolver {
     certs: HashMap<String, (Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
     default: Option<(Arc<rustls::sign::CertifiedKey>, TlsServerEndPoint)>,
@@ -182,11 +184,12 @@ impl CertResolver {
         let priv_key = {
             let key_bytes = std::fs::read(key_path)
                 .context(format!("Failed to read TLS keys at '{key_path}'"))?;
-            let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..])
-                .context(format!("Failed to parse TLS keys at '{key_path}'"))?;
+            let keys: Result<Vec<_>, _> =
+                rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..]).collect();
+            let mut keys = keys.context(format!("Failed to parse TLS keys at '{key_path}'"))?;
 
             ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len());
-            keys.pop().map(rustls::PrivateKey).unwrap()
+            keys.pop().unwrap()
         };
 
         let cert_chain_bytes = std::fs::read(cert_path)
@@ -194,30 +197,28 @@ impl CertResolver {
 
         let cert_chain = {
             rustls_pemfile::certs(&mut &cert_chain_bytes[..])
+                .collect::<Result<Vec<_>, _>>()
                 .with_context(|| {
                     format!(
                         "Failed to read TLS certificate chain from bytes from file at '{cert_path}'."
                     )
                 })?
-                .into_iter()
-                .map(rustls::Certificate)
-                .collect()
         };
 
-        self.add_cert(priv_key, cert_chain, is_default)
+        self.add_cert(PrivateKeyDer::Pkcs8(priv_key), cert_chain, is_default)
     }
 
     pub fn add_cert(
         &mut self,
-        priv_key: PrivateKey,
-        cert_chain: Vec<Certificate>,
+        priv_key: PrivateKeyDer,
+        cert_chain: Vec<CertificateDer<'static>>,
         is_default: bool,
     ) -> anyhow::Result<()> {
         let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
 
         let first_cert = &cert_chain[0];
         let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
-        let pem = x509_parser::parse_x509_certificate(&first_cert.0)
+        let pem = x509_parser::parse_x509_certificate(first_cert)
            .context("Failed to parse PEM object from cerficiate")?
            .1;
```
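TlsServerEndPoint::new above implements the RFC 5929 tls-server-end-point channel binding: when the certificate's signature algorithm is in the SHA-256 family, the binding value is the SHA-256 digest of the raw DER certificate. A hedged, self-contained restatement of just the digest step:

```rust
use sha2::{Digest, Sha256};

// cert_der is the end-entity certificate exactly as sent on the wire (DER).
// The caller is responsible for first checking the signature algorithm, as
// the code above does with its sha256_oids allow-list.
fn tls_server_end_point(cert_der: &[u8]) -> [u8; 32] {
    Sha256::new().chain_update(cert_der).finalize().into()
}
```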
```diff
@@ -328,19 +328,23 @@ impl<T: AsyncRead> AsyncRead for WithClientIp<T> {
 
 impl AsyncAccept for ProxyProtocolAccept {
     type Connection = WithClientIp<AddrStream>;
+    type Address = std::net::SocketAddr;
     type Error = io::Error;
 
     fn poll_accept(
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
-    ) -> Poll<Option<Result<Self::Connection, Self::Error>>> {
+    ) -> Poll<Result<(Self::Connection, Self::Address), Self::Error>> {
         use hyper::server::accept::Accept;
         let conn = ready!(Pin::new(&mut self.incoming).poll_accept(cx)?);
         let Some(conn) = conn else {
-            return Poll::Ready(None);
+            return Poll::Ready(Err(io::Error::new(
+                io::ErrorKind::NotConnected,
+                "no incoming connection?",
+            )));
         };
 
-        Poll::Ready(Some(Ok(WithClientIp::new(conn))))
+        let addr = conn.remote_addr();
+        Poll::Ready(Ok((WithClientIp::new(conn), addr)))
     }
 }
```
```diff
@@ -11,16 +11,21 @@ use crate::console::{CachedNodeInfo, NodeInfo};
 use crate::proxy::retry::{retry_after, NUM_RETRIES_CONNECT};
 use crate::{auth, http, sasl, scram};
 use async_trait::async_trait;
+use postgres_backend::{MakeRustlsConnect, RustlsStream};
 use rstest::rstest;
+use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs1KeyDer};
 use tokio_postgres::config::SslMode;
 use tokio_postgres::tls::{MakeTlsConnect, NoTls};
-use tokio_postgres_rustls::{MakeRustlsConnect, RustlsStream};
 
 /// Generate a set of TLS certificates: CA + server.
 fn generate_certs(
     hostname: &str,
     common_name: &str,
-) -> anyhow::Result<(rustls::Certificate, rustls::Certificate, rustls::PrivateKey)> {
+) -> anyhow::Result<(
+    CertificateDer<'static>,
+    CertificateDer<'static>,
+    PrivateKeyDer<'static>,
+)> {
     let ca = rcgen::Certificate::from_params({
         let mut params = rcgen::CertificateParams::default();
         params.is_ca = rcgen::IsCa::Ca(rcgen::BasicConstraints::Unconstrained);
@@ -37,9 +42,9 @@ fn generate_certs(
     })?;
 
     Ok((
-        rustls::Certificate(ca.serialize_der()?),
-        rustls::Certificate(cert.serialize_der_with_signer(&ca)?),
-        rustls::PrivateKey(cert.serialize_private_key_der()),
+        CertificateDer::from(ca.serialize_der()?),
+        CertificateDer::from(cert.serialize_der_with_signer(&ca)?),
+        PrivateKeyDer::Pkcs1(PrivatePkcs1KeyDer::from(cert.serialize_private_key_der())),
     ))
 }
@@ -73,10 +78,10 @@ fn generate_tls_config<'a>(
     let (ca, cert, key) = generate_certs(hostname, common_name)?;
 
     let tls_config = {
+        let key_clone = rustls::pki_types::PrivateKeyDer::Pkcs1(key.secret_der().to_owned().into());
         let config = rustls::ServerConfig::builder()
-            .with_safe_defaults()
             .with_no_client_auth()
-            .with_single_cert(vec![cert.clone()], key.clone())?
+            .with_single_cert(vec![cert.clone()], key_clone)?
             .into();
 
         let mut cert_resolver = CertResolver::new();
@@ -93,10 +98,9 @@ fn generate_tls_config<'a>(
 
     let client_config = {
         let config = rustls::ClientConfig::builder()
-            .with_safe_defaults()
             .with_root_certificates({
                 let mut store = rustls::RootCertStore::empty();
-                store.add(&ca)?;
+                store.add(ca)?;
                 store
             })
             .with_no_client_auth();
```
```diff
@@ -77,14 +77,19 @@ pub async fn task_main(
     let ws_connections = tokio_util::task::task_tracker::TaskTracker::new();
     ws_connections.close(); // allows `ws_connections.wait to complete`
 
-    let tls_listener = TlsListener::new(tls_acceptor, addr_incoming).filter(|conn| {
-        if let Err(err) = conn {
-            error!("failed to accept TLS connection for websockets: {err:?}");
-            ready(false)
-        } else {
-            ready(true)
-        }
-    });
+    let tls_listener = TlsListener::new(tls_acceptor, addr_incoming)
+        .map(|x| match x {
+            Ok((conn, _)) => Ok(conn),
+            Err(e) => Err(e),
+        })
+        .filter(|conn| {
+            if let Err(err) = conn {
+                error!("failed to accept TLS connection for websockets: {err:?}");
+                ready(false)
+            } else {
+                ready(true)
+            }
+        });
 
     let make_svc = hyper::service::make_service_fn(
         |stream: &tokio_rustls::server::TlsStream<WithClientIp<AddrStream>>| {
```
```diff
@@ -1,12 +1,9 @@
-use std::collections::{HashMap, HashSet};
+use std::collections::HashSet;
 
 use anyhow::Context;
 use aws_sdk_s3::{types::ObjectIdentifier, Client};
 use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
-use pageserver_api::shard::ShardIndex;
 use tracing::{error, info, warn};
 use utils::generation::Generation;
 use utils::id::TimelineId;
 
 use crate::cloud_admin_api::BranchData;
 use crate::metadata_stream::stream_listing;
@@ -43,7 +40,7 @@ impl TimelineAnalysis {
 
 pub(crate) fn branch_cleanup_and_check_errors(
     id: &TenantShardTimelineId,
-    tenant_objects: &mut TenantObjectListing,
+    s3_root: &RootTarget,
     s3_active_branch: Option<&BranchData>,
     console_branch: Option<BranchData>,
     s3_data: Option<S3TimelineBlobData>,
@@ -75,8 +72,8 @@ pub(crate) fn branch_cleanup_and_check_errors(
         match s3_data.blob_data {
             BlobDataParseResult::Parsed {
                 index_part,
-                index_part_generation: _index_part_generation,
-                s3_layers: _s3_layers,
+                index_part_generation,
+                mut s3_layers,
             } => {
                 if !IndexPart::KNOWN_VERSIONS.contains(&index_part.get_version()) {
                     result.errors.push(format!(
@@ -114,19 +111,65 @@ pub(crate) fn branch_cleanup_and_check_errors(
                     ))
                 }
 
-                if !tenant_objects.check_ref(id.timeline_id, &layer, &metadata) {
+                let layer_map_key = (layer, metadata.generation);
+                if !s3_layers.remove(&layer_map_key) {
                     // FIXME: this will emit false positives if an index was
                     // uploaded concurrently with our scan. To make this check
                     // correct, we need to try sending a HEAD request for the
                     // layer we think is missing.
                     result.errors.push(format!(
-                        "index_part.json contains a layer {}{} (shard {}) that is not present in remote storage",
-                        layer.file_name(),
-                        metadata.generation.get_suffix(),
-                        metadata.shard
+                        "index_part.json contains a layer {}{} that is not present in remote storage",
+                        layer_map_key.0.file_name(),
+                        layer_map_key.1.get_suffix()
                     ))
                 }
+
+                let orphan_layers: Vec<(LayerFileName, Generation)> = s3_layers
+                    .into_iter()
+                    .filter(|(_layer_name, gen)|
+                        // A layer is only considered orphaned if it has a generation below
+                        // the index. If the generation is >= the index, then the layer may
+                        // be an upload from a running pageserver, or even an upload from
+                        // a new generation that didn't upload an index yet.
+                        //
+                        // Even so, a layer that is not referenced by the index could just
+                        // be something enqueued for deletion, so while this check is valid
+                        // for indicating that a layer is garbage, it is not an indicator
+                        // of a problem.
+                        gen < &index_part_generation)
+                    .collect();
+
+                if !orphan_layers.is_empty() {
+                    // An orphan layer is not an error: it's arguably not even a warning, but it is helpful to report
+                    // these as a hint that there is something worth cleaning up here.
+                    result.warnings.push(format!(
+                        "index_part.json does not contain layers from S3: {:?}",
+                        orphan_layers
+                            .iter()
+                            .map(|(layer_name, gen)| format!(
+                                "{}{}",
+                                layer_name.file_name(),
+                                gen.get_suffix()
+                            ))
+                            .collect::<Vec<_>>(),
+                    ));
+                    result.garbage_keys.extend(orphan_layers.iter().map(
+                        |(layer_name, layer_gen)| {
+                            let mut key = s3_root.timeline_root(id).prefix_in_bucket;
+                            let delimiter = s3_root.delimiter();
+                            if !key.ends_with(delimiter) {
+                                key.push_str(delimiter);
+                            }
+                            key.push_str(&format!(
+                                "{}{}",
+                                &layer_name.file_name(),
+                                layer_gen.get_suffix()
+                            ));
+                            key
+                        },
+                    ));
+                }
             }
             BlobDataParseResult::Relic => {}
             BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
@@ -161,83 +204,6 @@
     result
 }
 
-#[derive(Default)]
-pub(crate) struct LayerRef {
-    ref_count: usize,
-}
-
-/// Top-level index of objects in a tenant. This may be used by any shard-timeline within
-/// the tenant to query whether an object exists.
-#[derive(Default)]
-pub(crate) struct TenantObjectListing {
-    shard_timelines:
-        HashMap<(ShardIndex, TimelineId), HashMap<(LayerFileName, Generation), LayerRef>>,
-}
-
-impl TenantObjectListing {
-    /// Having done an S3 listing of the keys within a timeline prefix, merge them into the overall
-    /// list of layer keys for the Tenant.
-    pub(crate) fn push(
-        &mut self,
-        ttid: TenantShardTimelineId,
-        layers: HashSet<(LayerFileName, Generation)>,
-    ) {
-        let shard_index = ShardIndex::new(
-            ttid.tenant_shard_id.shard_number,
-            ttid.tenant_shard_id.shard_count,
-        );
-        let replaced = self.shard_timelines.insert(
-            (shard_index, ttid.timeline_id),
-            layers
-                .into_iter()
-                .map(|l| (l, LayerRef::default()))
-                .collect(),
-        );
-
-        assert!(
-            replaced.is_none(),
-            "Built from an S3 object listing, which should never repeat a key"
-        );
-    }
-
-    /// Having loaded a timeline index, check if a layer referenced by the index exists. If it does,
-    /// the layer's refcount will be incremented. Later, after calling this for all references in all indices
-    /// in a tenant, orphan layers may be detected by their zero refcounts.
-    ///
-    /// Returns true if the layer exists
-    pub(crate) fn check_ref(
-        &mut self,
-        timeline_id: TimelineId,
-        layer_file: &LayerFileName,
-        metadata: &IndexLayerMetadata,
-    ) -> bool {
-        let Some(shard_tl) = self.shard_timelines.get_mut(&(metadata.shard, timeline_id)) else {
-            return false;
-        };
-
-        let Some(layer_ref) = shard_tl.get_mut(&(layer_file.clone(), metadata.generation)) else {
-            return false;
-        };
-
-        layer_ref.ref_count += 1;
-
-        true
-    }
-
-    pub(crate) fn get_orphans(&self) -> Vec<(ShardIndex, TimelineId, LayerFileName, Generation)> {
-        let mut result = Vec::new();
-        for ((shard_index, timeline_id), layers) in &self.shard_timelines {
-            for ((layer_file, generation), layer_ref) in layers {
-                if layer_ref.ref_count == 0 {
-                    result.push((*shard_index, *timeline_id, layer_file.clone(), *generation))
-                }
-            }
-        }
-
-        result
-    }
-}
-
 #[derive(Debug)]
 pub(crate) struct S3TimelineBlobData {
     pub(crate) blob_data: BlobDataParseResult,
```
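The generation rule in the comments above is the crux of the orphan check. A hedged, self-contained restatement, with String/u32 standing in for the real LayerFileName/Generation types:

```rust
use std::collections::HashSet;

// A layer key is only an orphan *candidate* when its generation is strictly
// below the generation of the index that was read: anything at or above the
// index generation may simply not have been indexed yet (an in-flight upload
// or a newer generation that has not written its index).
fn orphan_candidates(
    s3_layers: HashSet<(String, u32)>, // (layer file name, generation)
    index_part_generation: u32,
) -> Vec<(String, u32)> {
    s3_layers
        .into_iter()
        .filter(|&(_, generation)| generation < index_part_generation)
        .collect()
}
```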
@@ -2,25 +2,22 @@ use std::collections::{HashMap, HashSet};
|
||||
|
||||
use crate::checks::{
|
||||
branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData,
|
||||
TenantObjectListing, TimelineAnalysis,
|
||||
TimelineAnalysis,
|
||||
};
|
||||
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
|
||||
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
|
||||
use aws_sdk_s3::Client;
|
||||
use futures_util::{pin_mut, StreamExt, TryStreamExt};
|
||||
use histogram::Histogram;
|
||||
use pageserver::tenant::remote_timeline_client::remote_layer_path;
|
||||
use pageserver::tenant::IndexPart;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use serde::Serialize;
|
||||
use utils::id::TenantId;
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct MetadataSummary {
|
||||
count: usize,
|
||||
with_errors: HashSet<TenantShardTimelineId>,
|
||||
with_warnings: HashSet<TenantShardTimelineId>,
|
||||
with_orphans: HashSet<TenantShardTimelineId>,
|
||||
with_garbage: HashSet<TenantShardTimelineId>,
|
||||
indices_by_version: HashMap<usize, usize>,
|
||||
|
||||
layer_count: MinMaxHisto,
|
||||
@@ -90,7 +87,7 @@ impl MetadataSummary {
|
||||
count: 0,
|
||||
with_errors: HashSet::new(),
|
||||
with_warnings: HashSet::new(),
|
||||
with_orphans: HashSet::new(),
|
||||
with_garbage: HashSet::new(),
|
||||
indices_by_version: HashMap::new(),
|
||||
layer_count: MinMaxHisto::new(),
|
||||
timeline_size_bytes: MinMaxHisto::new(),
|
||||
@@ -144,10 +141,6 @@ impl MetadataSummary {
|
||||
}
|
||||
}
|
||||
|
||||
fn notify_timeline_orphan(&mut self, ttid: &TenantShardTimelineId) {
|
||||
self.with_orphans.insert(*ttid);
|
||||
}
|
||||
|
||||
/// Long-form output for printing at end of a scan
|
||||
pub fn summary_string(&self) -> String {
|
||||
let version_summary: String = itertools::join(
|
||||
@@ -161,7 +154,7 @@ impl MetadataSummary {
|
||||
"Timelines: {0}
|
||||
With errors: {1}
|
||||
With warnings: {2}
|
||||
With orphan layers: {3}
|
||||
With garbage: {3}
|
||||
Index versions: {version_summary}
|
||||
Timeline size bytes: {4}
|
||||
Layer size bytes: {5}
|
||||
@@ -170,7 +163,7 @@ Timeline layer count: {6}
|
||||
self.count,
|
||||
self.with_errors.len(),
|
||||
self.with_warnings.len(),
|
||||
self.with_orphans.len(),
|
||||
self.with_garbage.len(),
|
||||
self.timeline_size_bytes.oneline(),
|
||||
self.layer_size_bytes.oneline(),
|
||||
self.layer_count.oneline(),
|
||||
@@ -198,7 +191,7 @@ pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<Metada
|
||||
|
||||
// Generate a stream of TenantTimelineId
|
||||
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
|
||||
let timelines = timelines.try_buffered(CONCURRENCY);
|
||||
let timelines = timelines.try_buffer_unordered(CONCURRENCY);
|
||||
let timelines = timelines.try_flatten();
|
||||
|
||||
// Generate a stream of S3TimelineBlobData
|
||||
@@ -211,118 +204,17 @@ pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<Metada
|
||||
Ok((ttid, data))
|
||||
}
|
||||
let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid));
|
||||
let timelines = timelines.try_buffered(CONCURRENCY);
|
||||
let timelines = timelines.try_buffer_unordered(CONCURRENCY);
|
||||
|
||||
// We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
|
||||
// shards in the same tenant might refer to one anothers' keys if a shard split has happened.
|
||||
|
||||
let mut tenant_id = None;
|
||||
let mut tenant_objects = TenantObjectListing::default();
|
||||
    let mut tenant_timeline_results = Vec::new();

    fn analyze_tenant(
        tenant_id: TenantId,
        summary: &mut MetadataSummary,
        mut tenant_objects: TenantObjectListing,
        timelines: Vec<(TenantShardTimelineId, S3TimelineBlobData)>,
    ) {
        let mut timeline_generations = HashMap::new();
        for (ttid, data) in timelines {
            // Stash the generation of each timeline, for later use identifying orphan layers
            if let BlobDataParseResult::Parsed {
                index_part: _index_part,
                index_part_generation,
                s3_layers: _s3_layers,
            } = &data.blob_data
            {
                timeline_generations.insert(ttid, *index_part_generation);
            }

            // Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
            // reference counts for layers across the tenant.
            let analysis =
                branch_cleanup_and_check_errors(&ttid, &mut tenant_objects, None, None, Some(data));
            summary.update_analysis(&ttid, &analysis);
        }

        // Identifying orphan layers must be done on a tenant-wide basis, because individual
        // shards' layers may be referenced by other shards.
        //
        // Orphan layers are not a corruption, and not an indication of a problem. They are just
        // consuming some space in remote storage, and may be cleaned up at leisure.
        for (shard_index, timeline_id, layer_file, generation) in tenant_objects.get_orphans() {
            let ttid = TenantShardTimelineId {
                tenant_shard_id: TenantShardId {
                    tenant_id,
                    shard_count: shard_index.shard_count,
                    shard_number: shard_index.shard_number,
                },
                timeline_id,
            };

            if let Some(timeline_generation) = timeline_generations.get(&ttid) {
                if &generation >= timeline_generation {
                    // Candidate orphan layer is in the current or future generation relative
                    // to the index we read for this timeline shard, so its absence from the index
                    // doesn't make it an orphan: more likely, it is a case where the layer was
                    // uploaded, but the index referencing the layer wasn't written yet.
                    continue;
                }
            }

            let orphan_path = remote_layer_path(
                &tenant_id,
                &timeline_id,
                shard_index,
                &layer_file,
                generation,
            );

            tracing::info!("Orphan layer detected: {orphan_path}");

            summary.notify_timeline_orphan(&ttid);
        }
    }

    // Iterate through all the timeline results. These are in key-order, so
    // all results for the same tenant will be adjacent. We accumulate these,
    // and then call `analyze_tenant` to flush, when we see the next tenant ID.
    let mut summary = MetadataSummary::new();
    pin_mut!(timelines);
    while let Some(i) = timelines.next().await {
        let (ttid, data) = i?;
        summary.update_data(&data);

        match tenant_id {
            None => tenant_id = Some(ttid.tenant_shard_id.tenant_id),
            Some(prev_tenant_id) => {
                if prev_tenant_id != ttid.tenant_shard_id.tenant_id {
                    let tenant_objects = std::mem::take(&mut tenant_objects);
                    let timelines = std::mem::take(&mut tenant_timeline_results);
                    analyze_tenant(prev_tenant_id, &mut summary, tenant_objects, timelines);
                    tenant_id = Some(ttid.tenant_shard_id.tenant_id);
                }
            }
        }
        let analysis = branch_cleanup_and_check_errors(&ttid, &target, None, None, Some(data));

        if let BlobDataParseResult::Parsed {
            index_part: _index_part,
            index_part_generation: _index_part_generation,
            s3_layers,
        } = &data.blob_data
        {
            tenant_objects.push(ttid, s3_layers.clone());
        }
        tenant_timeline_results.push((ttid, data));
    }

    if !tenant_timeline_results.is_empty() {
        analyze_tenant(
            tenant_id.expect("Must be set if results are present"),
            &mut summary,
            tenant_objects,
            tenant_timeline_results,
        );
        summary.update_analysis(&ttid, &analysis);
    }

    Ok(summary)
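The accumulation loop above depends on the listing stream being in key order: all shards and timelines of one tenant arrive contiguously, so a tenant can be flushed to `analyze_tenant` the moment the tenant id changes, without buffering the whole bucket. A minimal sketch of the same flush-on-key-change pattern, reduced to plain Python (names are illustrative, not part of the scrubber):

```
def group_adjacent(items):
    # items: iterable of (key, value) pairs that arrive sorted/grouped by key
    current_key = None
    batch = []
    for key, value in items:
        if batch and key != current_key:
            # Key changed: flush the accumulated batch (like calling analyze_tenant).
            yield current_key, batch
            batch = []
        current_key = key
        batch.append(value)
    if batch:
        # Final flush, mirroring the `if !tenant_timeline_results.is_empty()` tail above.
        yield current_key, batch

assert list(group_adjacent([(1, "a"), (1, "b"), (2, "c")])) == [(1, ["a", "b"]), (2, ["c"])]
```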
2
scripts/sk_collect_dumps/.gitignore
vendored
@@ -1,4 +1,2 @@
result
*.json
hosts
poetry.lock
@@ -1,11 +0,0 @@
[defaults]
host_key_checking = False
inventory=./hosts
remote_tmp=/tmp
remote_user=developer
callbacks_enabled = profile_tasks

[ssh_connection]
scp_if_ssh = True
ssh_args = -F ./ssh.cfg
pipelining = True
@@ -1,16 +0,0 @@
[tool.poetry]
name = "sk-collect-dumps"
version = "0.1.0"
description = ""
authors = ["Arseny Sher <sher-ars@yandex.ru>"]
readme = "README.md"
packages = [{include = "sk_collect_dumps"}]

[tool.poetry.dependencies]
python = "^3.11"
ansible = "^9.1.0"


[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
@@ -1,43 +1,25 @@
# Collect /v1/debug_dump from all safekeeper nodes

3. Issue an admin token (add/remove .stage from the url for staging/prod and set the proper API key):
```
# staging:
AUTH_TOKEN=$(curl https://console.stage.neon.tech/regions/console/api/v1/admin/issue_token -H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer $NEON_STAGING_KEY" -X POST -d '{"ttl_seconds": 43200, "scope": "safekeeperdata"}' 2>/dev/null | jq --raw-output '.jwt')
# prod:
AUTH_TOKEN=$(curl https://console.neon.tech/regions/console/api/v1/admin/issue_token -H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer $NEON_PROD_KEY" -X POST -d '{"ttl_seconds": 43200, "scope": "safekeeperdata"}' 2>/dev/null | jq --raw-output '.jwt')
# check
echo $AUTH_TOKEN
```
2. Run ansible playbooks to collect .json dumps from all safekeepers and store them in the `./result` directory.
1. Run ansible playbooks to collect .json dumps from all safekeepers and store them in the `./result` directory.
2. Run `DB_CONNSTR=... ./upload.sh prod_feb30` to upload dumps to the `prod_feb30` table in the specified postgres database.

There are two ways to do that, with ssm or tsh. ssm:
```
# in the aws repo, cd .github/ansible and run e.g. (adjusting profile and region in vars and limit):
AWS_DEFAULT_PROFILE=dev ansible-playbook -i inventory_aws_ec2.yaml -i staging.us-east-2.vars.yaml -e @ssm_config -l 'safekeeper:&us_east_2' -e "auth_token=${AUTH_TOKEN}" ~/neon/neon/scripts/sk_collect_dumps/remote.yaml
```
It will put the results into the `.results` directory *near the playbook*.
## How to use ansible (staging)

tsh:

Update the inventory, if needed, selecting .build/.tech and optionally a region:
```
rm -f hosts && echo '[safekeeper]' >> hosts
# staging:
tsh ls | awk '{print $1}' | grep safekeeper | grep "neon.build" | grep us-east-2 >> hosts
# prod:
tsh ls | awk '{print $1}' | grep safekeeper | grep "neon.tech" | grep us-east-2 >> hosts
AWS_DEFAULT_PROFILE=dev ansible-playbook -i ../../.github/ansible/staging.us-east-2.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml

AWS_DEFAULT_PROFILE=dev ansible-playbook -i ../../.github/ansible/staging.eu-west-1.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
```

Test the ansible connection:
## How to use ansible (prod)

```
ansible all -m ping -v
AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.us-west-2.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml

AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.us-east-2.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml

AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.eu-central-1.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml

AWS_DEFAULT_PROFILE=prod ansible-playbook -i ../../.github/ansible/prod.ap-southeast-1.hosts.yaml -e @../../.github/ansible/ssm_config remote.yaml
```

Download the dumps:
```
mkdir -p result && rm -f result/*
ansible-playbook -e "auth_token=${AUTH_TOKEN}" remote.yaml
```

3. Run `DB_CONNSTR=... ./upload.sh prod_feb30` to upload dumps to the `prod_feb30` table in the specified postgres database.
@@ -1,37 +1,18 @@
- name: Fetch state dumps from safekeepers
  hosts: safekeeper
  hosts: safekeepers
  gather_facts: False
  remote_user: "{{ remote_user }}"

  tasks:
    - name: Dump file
    - name: Download file
      get_url:
        url: "http://{{ inventory_hostname }}:7676/v1/debug_dump?dump_all=true&dump_disk_content=false"
        dest: "/tmp/{{ inventory_hostname }}-dump.json"
        headers:
          Authorization: "Bearer {{ auth_token }}"
        dest: "/tmp/{{ inventory_hostname }}.json"

    - name: install rsync
      ansible.builtin.apt:
        name: rsync
        update_cache: yes
      become: yes
      ignore_errors: true # it can be already installed and we don't always have sudo

    - name: Fetch file from remote hosts (works only with ssm)
    - name: Fetch file from remote hosts
      fetch:
        src: "/tmp/{{ inventory_hostname }}-dump.json"
        dest: "./result/{{ inventory_hostname }}-dump.json"
        src: "/tmp/{{ inventory_hostname }}.json"
        dest: "./result/{{ inventory_hostname }}.json"
        flat: yes
        fail_on_missing: no
      when: ansible_connection == "aws_ssm"

    # xxx not sure how to make ansible 'synchronize' work with tsh
    - name: Fetch file from remote hosts
      shell: rsync -e 'tsh ssh' -azvP "developer@{{ inventory_hostname }}:/tmp/{{ inventory_hostname }}-dump.json" "./result/{{ inventory_hostname }}-dump.json"
      delegate_to: localhost
      when: ansible_connection != "aws_ssm"

    - name: remove remote dumps
      ansible.builtin.file:
        path: "/tmp/{{ inventory_hostname }}-dump.json"
        state: absent
@@ -1,13 +0,0 @@
# Begin generated Teleport configuration for teleport.aws.neon.tech by tsh

# Common flags for all teleport.aws.neon.tech hosts
Host *
    HostKeyAlgorithms rsa-sha2-512-cert-v01@openssh.com,rsa-sha2-256-cert-v01@openssh.com,ssh-rsa-cert-v01@openssh.com

# Flags for all teleport.aws.neon.tech hosts except the proxy
Host * !teleport.aws.neon.tech
    Port 3022
    ProxyCommand "/usr/local/bin/tsh" proxy ssh --cluster=teleport.aws.neon.tech --proxy=teleport.aws.neon.tech:443 %r@%h:%p
    User developer

# End generated Teleport configuration
@@ -31,22 +31,22 @@ SELECT
  (data->>'tenant_id') AS tenant_id,
  (data->>'timeline_id') AS timeline_id,
  (data->'memory'->>'active')::bool AS active,
  (data->'memory'->>'flush_lsn')::pg_lsn AS flush_lsn,
  (data->'memory'->'mem_state'->>'backup_lsn')::pg_lsn AS backup_lsn,
  (data->'memory'->'mem_state'->>'commit_lsn')::pg_lsn AS commit_lsn,
  (data->'memory'->'mem_state'->>'peer_horizon_lsn')::pg_lsn AS peer_horizon_lsn,
  (data->'memory'->'mem_state'->>'remote_consistent_lsn')::pg_lsn AS remote_consistent_lsn,
  (data->'memory'->>'write_lsn')::pg_lsn AS write_lsn,
  (data->'memory'->>'flush_lsn')::bigint AS flush_lsn,
  (data->'memory'->'mem_state'->>'backup_lsn')::bigint AS backup_lsn,
  (data->'memory'->'mem_state'->>'commit_lsn')::bigint AS commit_lsn,
  (data->'memory'->'mem_state'->>'peer_horizon_lsn')::bigint AS peer_horizon_lsn,
  (data->'memory'->'mem_state'->>'remote_consistent_lsn')::bigint AS remote_consistent_lsn,
  (data->'memory'->>'write_lsn')::bigint AS write_lsn,
  (data->'memory'->>'num_computes')::bigint AS num_computes,
  (data->'memory'->>'epoch_start_lsn')::pg_lsn AS epoch_start_lsn,
  (data->'memory'->>'epoch_start_lsn')::bigint AS epoch_start_lsn,
  (data->'memory'->>'last_removed_segno')::bigint AS last_removed_segno,
  (data->'memory'->>'is_cancelled')::bool AS is_cancelled,
  (data->'control_file'->>'backup_lsn')::pg_lsn AS disk_backup_lsn,
  (data->'control_file'->>'commit_lsn')::pg_lsn AS disk_commit_lsn,
  (data->'control_file'->>'backup_lsn')::bigint AS disk_backup_lsn,
  (data->'control_file'->>'commit_lsn')::bigint AS disk_commit_lsn,
  (data->'control_file'->'acceptor_state'->>'term')::bigint AS disk_term,
  (data->'control_file'->>'local_start_lsn')::pg_lsn AS local_start_lsn,
  (data->'control_file'->>'peer_horizon_lsn')::pg_lsn AS disk_peer_horizon_lsn,
  (data->'control_file'->>'timeline_start_lsn')::pg_lsn AS timeline_start_lsn,
  (data->'control_file'->>'remote_consistent_lsn')::pg_lsn AS disk_remote_consistent_lsn
  (data->'control_file'->>'local_start_lsn')::bigint AS local_start_lsn,
  (data->'control_file'->>'peer_horizon_lsn')::bigint AS disk_peer_horizon_lsn,
  (data->'control_file'->>'timeline_start_lsn')::bigint AS timeline_start_lsn,
  (data->'control_file'->>'remote_consistent_lsn')::bigint AS disk_remote_consistent_lsn
FROM tmp_json
EOF
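A note on the `pg_lsn` → `bigint` switch above: the dumped LSNs are now treated as plain integers, presumably because the dump format carries them numerically rather than in the `hi/lo` text form. When the familiar PostgreSQL rendering is needed on the reading side, the integer splits into its high and low 32 bits; a small sketch (helper names are made up, not part of the script):

```
def lsn_to_text(lsn: int) -> str:
    # Render an integer LSN in PostgreSQL's hi/lo hex form, e.g. 23803720 -> "0/16B3748".
    return f"{lsn >> 32:X}/{lsn & 0xFFFFFFFF:X}"

def lsn_from_text(text: str) -> int:
    # Parse the "hi/lo" form back into an integer.
    hi, lo = text.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)

assert lsn_to_text(0x16B3748) == "0/16B3748"
assert lsn_from_text("1/10") == (1 << 32) | 0x10
```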
@@ -1,10 +0,0 @@
#!/bin/bash

# export NEON_API_KEY=

while IFS= read -r ENDPOINT
do
    echo "$ENDPOINT"
    # curl -X POST -H "Authorization: Bearer $NEON_PROD_KEY" -H "Accept: application/json" -H "Content-Type: application/json" https://console.neon.tech/regions/console/api/v1/admin/endpoints/$ENDPOINT/restart
    curl -X POST -H "Authorization: Bearer $NEON_API_KEY" -H "Accept: application/json" -H "Content-Type: application/json" https://console.neon.tech/regions/aws-us-east-2/api/v1/admin/endpoints/$ENDPOINT/restart
done < endpoints_cplane.txt
@@ -1,137 +0,0 @@
import argparse
import sys
import psycopg2
import psycopg2.extras
import os
import requests

def migrate_project(conn, from_sk: dict[str, any], to_sk: dict[str, any], project_id: str, dry_run=True):
    print("###############################################################")

    with conn.cursor(cursor_factory = psycopg2.extras.RealDictCursor) as cur:
        cur.execute("SELECT * FROM projects WHERE id = %s", (project_id,))
        project = cur.fetchone()

    if project is None:
        print("Project with id {} does not exist".format(project_id))
        return

    assert project['deleted'] == False, "Project with id {} is deleted".format(project_id)

    with conn.cursor() as cur:
        cur.execute("SELECT safekeeper_id FROM projects_safekeepers WHERE project_id = %s", (project_id, ))
        sk_ids = list(map(lambda x: x[0], cur.fetchall()))
        assert from_sk['id'] in sk_ids
        assert to_sk['id'] not in sk_ids

    with conn.cursor(cursor_factory = psycopg2.extras.RealDictCursor) as cur:
        cur.execute("SELECT * FROM branches WHERE project_id = %s AND deleted = 'f'", (project_id, ))
        branches = cur.fetchall()

    for branch in branches:
        if branch['deleted'] != False:
            continue

        tenant_id = project['tenant_id']
        timeline_id = branch['timeline_id']
        print("tenant_id: {}, timeline_id: {}".format(tenant_id, timeline_id))
        print(f"Migrating from {from_sk['host']} to {to_sk['host']}, project={project_id}, branch={branch['id']}, deleted={branch['deleted']}")

        print(list(sk_ids))

        sk_hosts = list(map(
            lambda x: f"http://{safekeepers[x]['host']}:{safekeepers[x]['http_port']}",
            filter(lambda x: x != from_sk['id'], sk_ids)
        ))

        # make HTTP request to /pull_timeline
        # url = f"http://{to_sk['host']}:{to_sk['http_port']}/v1/tenant/{tenant_id}/timeline/{timeline_id}"
        url = f"http://{to_sk['host']}:{to_sk['http_port']}/v1/pull_timeline"
        body = {
            "tenant_id": str(tenant_id),
            "timeline_id": str(timeline_id),
            "http_hosts": sk_hosts,
        }
        print(body)

        print("Making HTTP request to {}".format(url), flush=True)
        if not dry_run:
            response = requests.post(url, json=body)
            # response = requests.get(url)

            if response.status_code != 200 and f"error decoding response body: missing field `tenant_id` at line 1 column 104" in response.text:
                print(f"WARN: Skipping branch {branch['id']} because it's empty on all safekeepers")
                continue

            if response.status_code != 200 and f"Timeline {timeline_id} already exists" in response.text:
                print(f"WARN: Skipping timeline {timeline_id} because it already exists (was migrated earlier)")
                continue

            if response.status_code != 200:
                print("ERROR: {}".format(response.text))
                return
            print(response.text)

    print(f"Updating safekeeper {from_sk['id']} -> {to_sk['id']} for project={project_id} in the database")
    if not dry_run:
        with conn.cursor() as cur:
            cur.execute("UPDATE projects_safekeepers SET safekeeper_id = %s WHERE project_id = %s AND safekeeper_id = %s RETURNING *", (to_sk['id'], project_id, from_sk['id']))
            print(cur.fetchone())
        conn.commit()

def find_projects(sk_from_id: int):
    with conn.cursor() as cur:
        cur.execute("SELECT p.id FROM projects p, projects_safekeepers ps WHERE ps.project_id = p.id AND NOT p.deleted AND ps.safekeeper_id = %s", (sk_from_id, ))
        project_ids = list(map(lambda x: x[0], cur.fetchall()))
    return project_ids


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='migrate sk')
    parser.add_argument("-d", help="database URL", type=str, required=True)
    parser.add_argument("--from-sk", help="from sk id as in the cplane db", type=int, required=True)
    parser.add_argument("--to-sk", help="to sk id as in the cplane db", type=int, required=True)
    parser.add_argument("--not-dry-run", help="", action='store_true')
    parser.add_argument("--project-id", help="project to migrate", type=str, default=None)
    args = parser.parse_args()

    # Connect to the postgresql database
    conn = psycopg2.connect(args.d)

    safekeepers = dict()

    # We need to fetch all objects from the "safekeepers" table and store them in the "safekeepers" dict
    # Create cursor
    cur = conn.cursor(cursor_factory = psycopg2.extras.RealDictCursor)
    # Execute query
    cur.execute("SELECT * FROM safekeepers")
    # Fetch all rows
    rows = cur.fetchall()
    # Close cursor
    cur.close()

    # Iterate over rows
    for row in rows:
        safekeepers[row['id']] = row

    # Print all safekeepers
    # print(safekeepers)

    assert args.from_sk in safekeepers, "Safekeeper with id {} does not exist".format(args.from_sk)
    from_sk_hostname = safekeepers[args.from_sk]['host']
    assert safekeepers[args.from_sk]['active'] == False, "Safekeeper with id {} should be inactive".format(args.from_sk)

    assert args.to_sk in safekeepers, "Safekeeper with id {} does not exist".format(args.to_sk)
    to_sk_hostname = safekeepers[args.to_sk]['host']
    assert safekeepers[args.to_sk]['active'] == True, "Safekeeper with id {} should be active".format(args.to_sk)

    print(f"migrating from id {args.from_sk} {from_sk_hostname} to {args.to_sk} {to_sk_hostname}")

    if args.project_id is not None:
        project_ids = [args.project_id]
    else:
        project_ids = find_projects(args.from_sk)
        print(project_ids)

    for project_id in project_ids:
        migrate_project(conn, safekeepers[args.from_sk], safekeepers[args.to_sk], project_id)
@@ -3,12 +3,9 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use clap::Parser;

use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::{
    FilterTenantTimelineId, MessageType, SubscribeByFilterRequest,
    TenantTimelineId as ProtoTenantTimelineId, TypeSubscription, TypedMessage,
};
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};

use storage_broker::{BrokerClientChannel, DEFAULT_ENDPOINT};
use tokio::time;
@@ -94,23 +91,15 @@ async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>,
        None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(),
    };

    let ttid = ProtoTenantTimelineId {
    let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId {
        tenant_id: vec![0xFF; 16],
        timeline_id: tli_from_u64(i),
    });
    let request = SubscribeSafekeeperInfoRequest {
        subscription_key: Some(key),
    };

    let request = SubscribeByFilterRequest {
        types: vec![TypeSubscription {
            r#type: MessageType::SafekeeperTimelineInfo.into(),
        }],
        tenant_timeline_id: Some(FilterTenantTimelineId {
            enabled: true,
            tenant_timeline_id: Some(ttid),
        }),
    };

    let mut stream: tonic::Streaming<TypedMessage> = client
        .subscribe_by_filter(request)
    let mut stream = client
        .subscribe_safekeeper_info(request)
        .await
        .unwrap()
        .into_inner();
@@ -10,12 +10,6 @@ service BrokerService {

  // Publish safekeeper updates.
  rpc PublishSafekeeperInfo(stream SafekeeperTimelineInfo) returns (google.protobuf.Empty) {};

  // Subscribe to all messages, limited by a filter.
  rpc SubscribeByFilter(SubscribeByFilterRequest) returns (stream TypedMessage) {};

  // Publish one message.
  rpc PublishOne(TypedMessage) returns (google.protobuf.Empty) {};
}

message SubscribeSafekeeperInfoRequest {
@@ -54,55 +48,3 @@ message TenantTimelineId {
  bytes tenant_id = 1;
  bytes timeline_id = 2;
}

message FilterTenantTimelineId {
  // If true, only messages related to `tenant_timeline_id` will be emitted.
  // Otherwise, messages for all timelines will be emitted.
  bool enabled = 1;
  TenantTimelineId tenant_timeline_id = 2;
}

message TypeSubscription {
  MessageType type = 1;
}

message SubscribeByFilterRequest {
  // Subscription will emit messages only of the specified types. You need to specify
  // at least one type to receive any messages.
  repeated TypeSubscription types = 1;

  // If set and enabled, subscription will emit messages only for the specified tenant/timeline.
  optional FilterTenantTimelineId tenant_timeline_id = 2;
}

enum MessageType {
  UNKNOWN = 0;
  SAFEKEEPER_TIMELINE_INFO = 2;
  SAFEKEEPER_DISCOVERY_REQUEST = 3;
  SAFEKEEPER_DISCOVERY_RESPONSE = 4;
}

// A message with a type.
message TypedMessage {
  MessageType type = 1;

  optional SafekeeperTimelineInfo safekeeper_timeline_info = 2;
  optional SafekeeperDiscoveryRequest safekeeper_discovery_request = 3;
  optional SafekeeperDiscoveryResponse safekeeper_discovery_response = 4;
}

message SafekeeperDiscoveryRequest {
  TenantTimelineId tenant_timeline_id = 1;
}

// Shorter version of SafekeeperTimelineInfo, contains only necessary fields.
message SafekeeperDiscoveryResponse {
  uint64 safekeeper_id = 1;
  TenantTimelineId tenant_timeline_id = 2;
  // WAL available to download.
  uint64 commit_lsn = 3;
  // A connection string to use for WAL downloading.
  string safekeeper_connstr = 4;
  // Availability zone of a safekeeper.
  optional string availability_zone = 5;
}
@@ -35,16 +35,10 @@ use tracing::*;
use utils::signals::ShutdownSignals;

use metrics::{Encoder, TextEncoder};
use storage_broker::metrics::{
    BROADCASTED_MESSAGES_TOTAL, BROADCAST_DROPPED_MESSAGES_TOTAL, NUM_PUBS, NUM_SUBS_ALL,
    NUM_SUBS_TIMELINE, PROCESSED_MESSAGES_TOTAL, PUBLISHED_ONEOFF_MESSAGES_TOTAL,
};
use storage_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE};
use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer};
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
use storage_broker::proto::{
    FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
    SafekeeperTimelineInfo, SubscribeByFilterRequest, SubscribeSafekeeperInfoRequest, TypedMessage,
};
use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
use storage_broker::{
    parse_proto_ttid, EitherBody, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR,
};
@@ -79,103 +73,8 @@ struct Args {
    log_format: String,
}

/// Id of publisher for registering in maps
type PubId = u64;

/// Id of subscriber for registering in maps
type SubId = u64;

/// Single enum type for all messages.
#[derive(Clone, Debug, PartialEq)]
#[allow(clippy::enum_variant_names)]
enum Message {
    SafekeeperTimelineInfo(SafekeeperTimelineInfo),
    SafekeeperDiscoveryRequest(SafekeeperDiscoveryRequest),
    SafekeeperDiscoveryResponse(SafekeeperDiscoveryResponse),
}

impl Message {
    /// Convert proto message to internal message.
    pub fn from(proto_msg: TypedMessage) -> Result<Self, Status> {
        match proto_msg.r#type() {
            MessageType::SafekeeperTimelineInfo => Ok(Message::SafekeeperTimelineInfo(
                proto_msg.safekeeper_timeline_info.ok_or_else(|| {
                    Status::new(Code::InvalidArgument, "missing safekeeper_timeline_info")
                })?,
            )),
            MessageType::SafekeeperDiscoveryRequest => Ok(Message::SafekeeperDiscoveryRequest(
                proto_msg.safekeeper_discovery_request.ok_or_else(|| {
                    Status::new(
                        Code::InvalidArgument,
                        "missing safekeeper_discovery_request",
                    )
                })?,
            )),
            MessageType::SafekeeperDiscoveryResponse => Ok(Message::SafekeeperDiscoveryResponse(
                proto_msg.safekeeper_discovery_response.ok_or_else(|| {
                    Status::new(
                        Code::InvalidArgument,
                        "missing safekeeper_discovery_response",
                    )
                })?,
            )),
            MessageType::Unknown => Err(Status::new(
                Code::InvalidArgument,
                format!("invalid message type: {:?}", proto_msg.r#type),
            )),
        }
    }

    /// Get the tenant_timeline_id from the message.
    pub fn tenant_timeline_id(&self) -> Result<Option<TenantTimelineId>, Status> {
        match self {
            Message::SafekeeperTimelineInfo(msg) => Ok(msg
                .tenant_timeline_id
                .as_ref()
                .map(parse_proto_ttid)
                .transpose()?),
            Message::SafekeeperDiscoveryRequest(msg) => Ok(msg
                .tenant_timeline_id
                .as_ref()
                .map(parse_proto_ttid)
                .transpose()?),
            Message::SafekeeperDiscoveryResponse(msg) => Ok(msg
                .tenant_timeline_id
                .as_ref()
                .map(parse_proto_ttid)
                .transpose()?),
        }
    }

    /// Convert internal message to the protobuf struct.
    pub fn as_typed_message(&self) -> TypedMessage {
        let mut res = TypedMessage {
            r#type: self.message_type() as i32,
            ..Default::default()
        };
        match self {
            Message::SafekeeperTimelineInfo(msg) => {
                res.safekeeper_timeline_info = Some(msg.clone())
            }
            Message::SafekeeperDiscoveryRequest(msg) => {
                res.safekeeper_discovery_request = Some(msg.clone())
            }
            Message::SafekeeperDiscoveryResponse(msg) => {
                res.safekeeper_discovery_response = Some(msg.clone())
            }
        }
        res
    }

    /// Get the message type.
    pub fn message_type(&self) -> MessageType {
        match self {
            Message::SafekeeperTimelineInfo(_) => MessageType::SafekeeperTimelineInfo,
            Message::SafekeeperDiscoveryRequest(_) => MessageType::SafekeeperDiscoveryRequest,
            Message::SafekeeperDiscoveryResponse(_) => MessageType::SafekeeperDiscoveryResponse,
        }
    }
}
type PubId = u64; // id of publisher for registering in maps
type SubId = u64; // id of subscriber for registering in maps

#[derive(Copy, Clone, Debug)]
enum SubscriptionKey {
@@ -184,7 +83,7 @@ enum SubscriptionKey {
}

impl SubscriptionKey {
    /// Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors).
    // Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors).
    pub fn from_proto_subscription_key(key: ProtoSubscriptionKey) -> Result<Self, Status> {
        match key {
            ProtoSubscriptionKey::All(_) => Ok(SubscriptionKey::All),
@@ -193,29 +92,14 @@ impl SubscriptionKey {
        }
    }
}

    /// Parse from FilterTenantTimelineId
    pub fn from_proto_filter_tenant_timeline_id(
        f: &FilterTenantTimelineId,
    ) -> Result<Self, Status> {
        if !f.enabled {
            return Ok(SubscriptionKey::All);
        }

        let ttid =
            parse_proto_ttid(f.tenant_timeline_id.as_ref().ok_or_else(|| {
                Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
            })?)?;
        Ok(SubscriptionKey::Timeline(ttid))
    }
}

/// Channel to timeline subscribers.
// Channel to timeline subscribers.
struct ChanToTimelineSub {
    chan: broadcast::Sender<Message>,
    /// Tracked separately to know when to delete the shmem entry. receiver_count()
    /// is unhandy for that as unregistering and dropping the receiver side
    /// happens at different moments.
    chan: broadcast::Sender<SafekeeperTimelineInfo>,
    // Tracked separately to know when to delete the shmem entry. receiver_count()
    // is unhandy for that as unregistering and dropping the receiver side
    // happens at different moments.
    num_subscribers: u64,
}
@@ -226,7 +110,7 @@ struct SharedState {
    num_subs_to_timelines: i64,
    chans_to_timeline_subs: HashMap<TenantTimelineId, ChanToTimelineSub>,
    num_subs_to_all: i64,
    chan_to_all_subs: broadcast::Sender<Message>,
    chan_to_all_subs: broadcast::Sender<SafekeeperTimelineInfo>,
}

impl SharedState {
@@ -262,7 +146,7 @@ impl SharedState {
        &mut self,
        sub_key: SubscriptionKey,
        timeline_chan_size: usize,
    ) -> (SubId, broadcast::Receiver<Message>) {
    ) -> (SubId, broadcast::Receiver<SafekeeperTimelineInfo>) {
        let sub_id = self.next_sub_id;
        self.next_sub_id += 1;
        let sub_rx = match sub_key {
@@ -378,29 +262,6 @@ impl Registry {
            subscriber.id, subscriber.key, subscriber.remote_addr
        );
    }

    /// Send msg to relevant subscribers.
    pub fn send_msg(&self, msg: &Message) -> Result<(), Status> {
        PROCESSED_MESSAGES_TOTAL.inc();

        // send message to subscribers for everything
        let shared_state = self.shared_state.read();
        // Err means there are no subscribers, which is fine.
        shared_state.chan_to_all_subs.send(msg.clone()).ok();

        // send message to per timeline subscribers, if there is a ttid
        let ttid = msg.tenant_timeline_id()?;
        if let Some(ttid) = ttid {
            if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
                // Err can't happen here, as tx is destroyed only after removing
                // from the map the last subscriber along with tx.
                subs.chan
                    .send(msg.clone())
                    .expect("rx is still in the map with zero subscribers");
            }
        }
        Ok(())
    }
}

// Private subscriber state.
@@ -408,7 +269,7 @@ struct Subscriber {
    id: SubId,
    key: SubscriptionKey,
    // Subscriber receives messages from publishers here.
    sub_rx: broadcast::Receiver<Message>,
    sub_rx: broadcast::Receiver<SafekeeperTimelineInfo>,
    // to unregister itself from shared state in Drop
    registry: Registry,
    // for logging
@@ -430,9 +291,26 @@ struct Publisher {
}

impl Publisher {
    /// Send msg to relevant subscribers.
    pub fn send_msg(&mut self, msg: &Message) -> Result<(), Status> {
        self.registry.send_msg(msg)
    // Send msg to relevant subscribers.
    pub fn send_msg(&mut self, msg: &SafekeeperTimelineInfo) -> Result<(), Status> {
        // send message to subscribers for everything
        let shared_state = self.registry.shared_state.read();
        // Err means there are no subscribers, which is fine.
        shared_state.chan_to_all_subs.send(msg.clone()).ok();

        // send message to per timeline subscribers
        let ttid =
            parse_proto_ttid(msg.tenant_timeline_id.as_ref().ok_or_else(|| {
                Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
            })?)?;
        if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
            // Err can't happen here, as tx is destroyed only after removing
            // from the map the last subscriber along with tx.
            subs.chan
                .send(msg.clone())
                .expect("rx is still in the map with zero subscribers");
        }
        Ok(())
    }
}
@@ -461,7 +339,7 @@ impl BrokerService for Broker {

        loop {
            match stream.next().await {
                Some(Ok(msg)) => publisher.send_msg(&Message::SafekeeperTimelineInfo(msg))?,
                Some(Ok(msg)) => publisher.send_msg(&msg)?,
                Some(Err(e)) => return Err(e), // grpc error from the stream
                None => break, // closed stream
            }
@@ -493,15 +371,8 @@ impl BrokerService for Broker {
            let mut missed_msgs: u64 = 0;
            loop {
                match subscriber.sub_rx.recv().await {
                    Ok(info) => {
                        match info {
                            Message::SafekeeperTimelineInfo(info) => yield info,
                            _ => {},
                        }
                        BROADCASTED_MESSAGES_TOTAL.inc();
                    },
                    Ok(info) => yield info,
                    Err(RecvError::Lagged(skipped_msg)) => {
                        BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
                        missed_msgs += skipped_msg;
                        if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
                            warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
@@ -521,78 +392,6 @@ impl BrokerService for Broker {
            Box::pin(output) as Self::SubscribeSafekeeperInfoStream
        ))
    }

    type SubscribeByFilterStream =
        Pin<Box<dyn Stream<Item = Result<TypedMessage, Status>> + Send + 'static>>;

    /// Subscribe to all messages, limited by a filter.
    async fn subscribe_by_filter(
        &self,
        request: Request<SubscribeByFilterRequest>,
    ) -> std::result::Result<Response<Self::SubscribeByFilterStream>, Status> {
        let remote_addr = request
            .remote_addr()
            .expect("TCPConnectInfo inserted by handler");
        let proto_filter = request.into_inner();
        let ttid_filter = proto_filter
            .tenant_timeline_id
            .as_ref()
            .ok_or_else(|| Status::new(Code::InvalidArgument, "missing tenant_timeline_id"))?;

        let sub_key = SubscriptionKey::from_proto_filter_tenant_timeline_id(ttid_filter)?;
        let types_set = proto_filter
            .types
            .iter()
            .map(|t| t.r#type)
            .collect::<std::collections::HashSet<_>>();

        let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr);

        // transform rx into stream with item = Result, as method result demands
        let output = async_stream::try_stream! {
            let mut warn_interval = time::interval(Duration::from_millis(1000));
            let mut missed_msgs: u64 = 0;
            loop {
                match subscriber.sub_rx.recv().await {
                    Ok(msg) => {
                        let msg_type = msg.message_type() as i32;
                        if types_set.contains(&msg_type) {
                            yield msg.as_typed_message();
                            BROADCASTED_MESSAGES_TOTAL.inc();
                        }
                    },
                    Err(RecvError::Lagged(skipped_msg)) => {
                        BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
                        missed_msgs += skipped_msg;
                        if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
                            warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
                                subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
                            missed_msgs = 0;
                        }
                    }
                    Err(RecvError::Closed) => {
                        // can't happen, we never drop the channel while there is a subscriber
                        Err(Status::new(Code::Internal, "channel unexpectedly closed"))?;
                    }
                }
            }
        };

        Ok(Response::new(
            Box::pin(output) as Self::SubscribeByFilterStream
        ))
    }

    /// Publish one message.
    async fn publish_one(
        &self,
        request: Request<TypedMessage>,
    ) -> std::result::Result<Response<()>, Status> {
        let msg = Message::from(request.into_inner())?;
        PUBLISHED_ONEOFF_MESSAGES_TOTAL.inc();
        self.registry.send_msg(&msg)?;
        Ok(Response::new(()))
    }
}

// We serve only metrics and healthcheck through http1.
@@ -716,8 +515,8 @@ mod tests {
    use tokio::sync::broadcast::error::TryRecvError;
    use utils::id::{TenantId, TimelineId};

    fn msg(timeline_id: Vec<u8>) -> Message {
        Message::SafekeeperTimelineInfo(SafekeeperTimelineInfo {
    fn msg(timeline_id: Vec<u8>) -> SafekeeperTimelineInfo {
        SafekeeperTimelineInfo {
            safekeeper_id: 1,
            tenant_timeline_id: Some(ProtoTenantTimelineId {
                tenant_id: vec![0x00; 16],
@@ -734,7 +533,7 @@ mod tests {
            http_connstr: "neon-1-sk-1.local:7677".to_owned(),
            local_start_lsn: 0,
            availability_zone: None,
        })
        }
    }

    fn tli_from_u64(i: u64) -> Vec<u8> {
@@ -1,6 +1,6 @@
//! Broker metrics.

use metrics::{register_int_counter, register_int_gauge, IntCounter, IntGauge};
use metrics::{register_int_gauge, IntGauge};
use once_cell::sync::Lazy;

pub static NUM_PUBS: Lazy<IntGauge> = Lazy::new(|| {
@@ -23,35 +23,3 @@ pub static NUM_SUBS_ALL: Lazy<IntGauge> = Lazy::new(|| {
    )
    .expect("Failed to register metric")
});

pub static PROCESSED_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
    register_int_counter!(
        "storage_broker_processed_messages_total",
        "Number of messages received by storage broker, before routing and broadcasting"
    )
    .expect("Failed to register metric")
});

pub static BROADCASTED_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
    register_int_counter!(
        "storage_broker_broadcasted_messages_total",
        "Number of messages broadcasted (sent over network) to subscribers"
    )
    .expect("Failed to register metric")
});

pub static BROADCAST_DROPPED_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
    register_int_counter!(
        "storage_broker_broadcast_dropped_messages_total",
        "Number of messages dropped due to channel capacity overflow"
    )
    .expect("Failed to register metric")
});

pub static PUBLISHED_ONEOFF_MESSAGES_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
    register_int_counter!(
        "storage_broker_published_oneoff_messages_total",
        "Number of one-off messages sent via PublishOne method"
    )
    .expect("Failed to register metric")
});
@@ -457,6 +457,7 @@ class NeonEnvBuilder:
        self.preserve_database_files = preserve_database_files
        self.initial_tenant = initial_tenant or TenantId.generate()
        self.initial_timeline = initial_timeline or TimelineId.generate()
        self.enable_generations = True
        self.scrub_on_exit = False
        self.test_output_dir = test_output_dir

@@ -676,7 +677,8 @@ class NeonEnvBuilder:

            pageserver.stop(immediate=True)

        self.env.attachment_service.stop(immediate=True)
        if self.env.attachment_service is not None:
            self.env.attachment_service.stop(immediate=True)

        cleanup_error = None

@@ -770,9 +772,13 @@ class NeonEnv:
        self.initial_tenant = config.initial_tenant
        self.initial_timeline = config.initial_timeline

        attachment_service_port = self.port_distributor.get_port()
        self.control_plane_api: str = f"http://127.0.0.1:{attachment_service_port}"
        self.attachment_service: NeonAttachmentService = NeonAttachmentService(self)
        if config.enable_generations:
            attachment_service_port = self.port_distributor.get_port()
            self.control_plane_api: Optional[str] = f"http://127.0.0.1:{attachment_service_port}"
            self.attachment_service: Optional[NeonAttachmentService] = NeonAttachmentService(self)
        else:
            self.control_plane_api = None
            self.attachment_service = None

        # Create a config file corresponding to the options
        cfg: Dict[str, Any] = {
@@ -845,7 +851,8 @@ class NeonEnv:
        # Start up broker, pageserver and all safekeepers
        self.broker.try_start()

        self.attachment_service.start()
        if self.attachment_service is not None:
            self.attachment_service.start()

        for pageserver in self.pageservers:
            pageserver.start()
@@ -1827,19 +1834,20 @@ class NeonPageserver(PgProtocol):
        """
        client = self.http_client()
        return client.tenant_attach(
            tenant_id,
            config,
            config_null,
            generation=self.env.attachment_service.attach_hook_issue(tenant_id, self.id),
            tenant_id, config, config_null, generation=self.maybe_get_generation(tenant_id)
        )

    def tenant_detach(self, tenant_id: TenantId):
        self.env.attachment_service.attach_hook_drop(tenant_id)
        if self.env.attachment_service is not None:
            self.env.attachment_service.attach_hook_drop(tenant_id)

        client = self.http_client()
        return client.tenant_detach(tenant_id)

    def tenant_location_configure(self, tenant_id: TenantId, config: dict[str, Any], **kwargs):
        # This API is only for use when generations are enabled
        assert self.env.attachment_service is not None

        if config["mode"].startswith("Attached") and "generation" not in config:
            config["generation"] = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)

@@ -1865,15 +1873,26 @@ class NeonPageserver(PgProtocol):
        generation: Optional[int] = None,
    ) -> TenantId:
        if generation is None:
            generation = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
            generation = self.maybe_get_generation(tenant_id)
        client = self.http_client(auth_token=auth_token)
        return client.tenant_create(tenant_id, conf, generation=generation)

    def tenant_load(self, tenant_id: TenantId):
        client = self.http_client()
        return client.tenant_load(
            tenant_id, generation=self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
        )
        return client.tenant_load(tenant_id, generation=self.maybe_get_generation(tenant_id))

    def maybe_get_generation(self, tenant_id: TenantId):
        """
        For tests that would like to use an HTTP client directly instead of using
        the `tenant_attach` and `tenant_create` helpers here: issue a generation
        number for a tenant.

        Returns None if the attachment service is not enabled (legacy mode)
        """
        if self.env.attachment_service is not None:
            return self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
        else:
            return None


    def append_pageserver_param_overrides(
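The new `maybe_get_generation` helper lets direct-HTTP tests work whether or not the attachment service is running; a usage sketch in the spirit of the `test_no_config` change further below (the `env`, `ps_http`, and `tenant_id` objects are assumed to come from the fixtures):

```
body = {}
gen = env.pageserver.maybe_get_generation(tenant_id)
# In legacy mode (no attachment service) gen is None and we attach without it.
if gen is not None:
    body["generation"] = gen
ps_http.post(f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach", json=body)
```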
@@ -125,51 +125,3 @@ class TenantId(Id):
class TimelineId(Id):
    def __repr__(self) -> str:
        return f'TimelineId("{self.id.hex()}")'


# Workaround for compat with python 3.9, which does not have `typing.Self`
TTenantShardId = TypeVar("TTenantShardId", bound="TenantShardId")


class TenantShardId:
    def __init__(self, tenant_id: TenantId, shard_number: int, shard_count: int):
        self.tenant_id = tenant_id
        self.shard_number = shard_number
        self.shard_count = shard_count
        assert self.shard_number < self.shard_count or self.shard_count == 0

    @classmethod
    def parse(cls: Type[TTenantShardId], input) -> TTenantShardId:
        if len(input) == 32:
            return cls(
                tenant_id=TenantId(input),
                shard_number=0,
                shard_count=0,
            )
        elif len(input) == 37:
            return cls(
                tenant_id=TenantId(input[0:32]),
                shard_number=int(input[33:35], 16),
                shard_count=int(input[35:37], 16),
            )
        else:
            raise ValueError(f"Invalid TenantShardId '{input}'")

    def __str__(self):
        return f"{self.tenant_id}-{self.shard_number:02x}{self.shard_count:02x}"

    def _tuple(self) -> tuple[TenantId, int, int]:
        return (self.tenant_id, self.shard_number, self.shard_count)

    def __lt__(self, other) -> bool:
        if not isinstance(other, type(self)):
            return NotImplemented
        return self._tuple() < other._tuple()

    def __eq__(self, other) -> bool:
        if not isinstance(other, type(self)):
            return NotImplemented
        return self._tuple() == other._tuple()

    def __hash__(self) -> int:
        return hash(self._tuple())
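The two accepted encodings round-trip through `parse` and `__str__`; a short usage sketch (the `fixtures.types` import path is an assumption, and the ids are made up):

```
from fixtures.types import TenantId, TenantShardId  # import path assumed

# Bare 32-hex-digit form: unsharded, shard_number = shard_count = 0.
unsharded = TenantShardId.parse("3fa85f6457174562b3fc2c963f66afa6")
assert str(unsharded) == "3fa85f6457174562b3fc2c963f66afa6-0000"

# 37-character form: "<tenant_id>-<shard_number:02x><shard_count:02x>".
sharded = TenantShardId.parse("3fa85f6457174562b3fc2c963f66afa6-0104")
assert (sharded.shard_number, sharded.shard_count) == (1, 4)
assert str(TenantShardId.parse(str(sharded))) == str(sharded)
```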
@@ -61,6 +61,7 @@ def measure_recovery_time(env: NeonCompare):
    # of view, but the same as far as the safekeeper/WAL is concerned. To work around that,
    # we will explicitly create the tenant in the same generation that it was previously
    # attached in.
    assert env.env.attachment_service is not None
    attach_status = env.env.attachment_service.inspect(tenant_id=env.tenant)
    assert attach_status is not None
    (attach_gen, _) = attach_status

@@ -136,7 +136,10 @@ def test_no_config(positive_env: NeonEnv, content_type: Optional[str]):
    ps_http.tenant_detach(tenant_id)
    assert tenant_id not in [TenantId(t["id"]) for t in ps_http.tenant_list()]

    body = {"generation": env.attachment_service.attach_hook_issue(tenant_id, env.pageserver.id)}
    body = {}
    gen = env.pageserver.maybe_get_generation(tenant_id)
    if gen is not None:
        body["generation"] = gen

    ps_http.post(
        f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach",

@@ -87,6 +87,7 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
    #
    # Since we're dual-attached, we need to tip off the attachment service to treat the one we're
    # about to start as the attached pageserver
    assert env.attachment_service is not None
    env.attachment_service.attach_hook_issue(env.initial_tenant, env.pageservers[0].id)
    env.pageservers[0].start()
    env.pageservers[1].stop()

@@ -157,6 +157,7 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
    time.sleep(1.1)  # so that we can use change in pre_stat.st_mtime to detect overwrites

    def get_generation_number():
        assert env.attachment_service is not None
        attachment = env.attachment_service.inspect(tenant_id)
        assert attachment is not None
        return attachment[0]
@@ -72,9 +72,7 @@ def check_client(env: NeonEnv, client: PageserverHttpClient):

    # create new tenant and check it is also there
    tenant_id = TenantId.generate()
    client.tenant_create(
        tenant_id, generation=env.attachment_service.attach_hook_issue(tenant_id, env.pageserver.id)
    )
    client.tenant_create(tenant_id, generation=env.pageserver.maybe_get_generation(tenant_id))
    assert tenant_id in {TenantId(t["id"]) for t in client.tenant_list()}

    timelines = client.timeline_list(tenant_id)

@@ -187,6 +187,7 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
    - After upgrade, the bucket should contain a mixture.
    - In both cases, postgres I/O should work.
    """
    neon_env_builder.enable_generations = True
    neon_env_builder.enable_pageserver_remote_storage(
        RemoteStorageKind.MOCK_S3,
    )
@@ -195,6 +196,7 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
    env.broker.try_start()
    for sk in env.safekeepers:
        sk.start()
    assert env.attachment_service is not None
    env.attachment_service.start()

    env.pageserver.start(overrides=('--pageserver-config-override=control_plane_api=""',))
@@ -260,10 +262,12 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):


def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
    neon_env_builder.enable_generations = True
    neon_env_builder.enable_pageserver_remote_storage(
        RemoteStorageKind.MOCK_S3,
    )
    env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
    assert env.attachment_service is not None

    some_other_pageserver = 1234
    ps_http = env.pageserver.http_client()
@@ -337,6 +341,7 @@ def test_deletion_queue_recovery(
    :param validate_before: whether to wait for deletions to be validated before restart. This
    makes them eligible to be executed after restart, if the same node keeps the attachment.
    """
    neon_env_builder.enable_generations = True
    neon_env_builder.enable_pageserver_remote_storage(
        RemoteStorageKind.MOCK_S3,
    )
@@ -400,6 +405,7 @@ def test_deletion_queue_recovery(

    if keep_attachment == KeepAttachment.LOSE:
        some_other_pageserver = 101010
        assert env.attachment_service is not None
        env.attachment_service.attach_hook_issue(env.initial_tenant, some_other_pageserver)

    env.pageserver.start()
@@ -447,6 +453,7 @@ def test_deletion_queue_recovery(


def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
    neon_env_builder.enable_generations = True
    neon_env_builder.enable_pageserver_remote_storage(
        RemoteStorageKind.MOCK_S3,
    )
@@ -466,6 +473,7 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
    )

    # Simulate a major incident: the control plane goes offline
    assert env.attachment_service is not None
    env.attachment_service.stop()

    # Remember how many validations had happened before the control plane went offline
@@ -537,6 +545,7 @@ def test_eviction_across_generations(neon_env_builder: NeonEnvBuilder):
    and must be constructed using the proper generation for the layer, which may not be the same generation
    that the tenant is running in.
    """
    neon_env_builder.enable_generations = True
    neon_env_builder.enable_pageserver_remote_storage(
        RemoteStorageKind.MOCK_S3,
    )
@@ -566,6 +575,7 @@ def test_multi_attach(
    neon_env_builder: NeonEnvBuilder,
    pg_bin: PgBin,
):
    neon_env_builder.enable_generations = True
    neon_env_builder.num_pageservers = 3
    neon_env_builder.enable_pageserver_remote_storage(
        remote_storage_kind=RemoteStorageKind.MOCK_S3,
@@ -9,7 +9,9 @@ from fixtures.utils import wait_until

# Test restarting page server, while safekeeper and compute node keep
# running.
def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("generations", [True, False])
def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool):
    neon_env_builder.enable_generations = generations
    neon_env_builder.enable_pageserver_remote_storage(s3_storage())
    neon_env_builder.enable_scrub_on_exit()


@@ -57,11 +57,13 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
    states are valid, so that we may test it in this way: the API should always
    work as long as the tenant exists.
    """
    neon_env_builder.enable_generations = True
    neon_env_builder.num_pageservers = 3
    neon_env_builder.enable_pageserver_remote_storage(
        remote_storage_kind=RemoteStorageKind.MOCK_S3,
    )
    env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
    assert env.attachment_service is not None

    pageservers = env.pageservers
    list([p.http_client() for p in pageservers])
@@ -208,11 +210,13 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder):
    """
    Test the sequence of location states that are used in a live migration.
    """
    neon_env_builder.enable_generations = True
    neon_env_builder.num_pageservers = 2
    neon_env_builder.enable_pageserver_remote_storage(
        remote_storage_kind=RemoteStorageKind.MOCK_S3,
    )
    env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
    assert env.attachment_service is not None

    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline

@@ -60,6 +60,8 @@ def test_remote_storage_backup_and_restore(

    neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)

    neon_env_builder.enable_generations = generations

    # Exercise retry code path by making all uploads and downloads fail for the
    # first time. The retries print INFO-messages to the log; we will check
    # that they are present after the test.

@@ -263,6 +263,15 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
        ps_http, env.initial_tenant, timeline_id, iterations=iterations
    )

    if failpoint == "timeline-delete-after-index-delete":
        m = ps_http.get_metrics()
        assert (
            m.query_one(
                "remote_storage_s3_request_seconds_count",
                filter={"request_type": "get_object", "result": "ok"},
            ).value
            == 1  # index part for initial timeline
        )
    elif check is Check.RETRY_WITHOUT_RESTART:
        # this should succeed
        # this also checks that delete can be retried even when timeline is in Broken state
@@ -33,11 +33,8 @@ dashmap = { version = "5", default-features = false, features = ["raw-api"] }
either = { version = "1" }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
futures = { version = "0.3" }
futures-channel = { version = "0.3", features = ["sink"] }
futures-core = { version = "0.3" }
futures-executor = { version = "0.3" }
futures-io = { version = "0.3" }
futures-sink = { version = "0.3" }
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
hex = { version = "0.4", features = ["serde"] }
hmac = { version = "0.12", default-features = false, features = ["reset"] }
@@ -56,7 +53,6 @@ regex = { version = "1" }
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-syntax = { version = "0.8" }
reqwest = { version = "0.11", default-features = false, features = ["blocking", "default-tls", "json", "multipart", "rustls-tls", "stream"] }
ring = { version = "0.16" }
rustls = { version = "0.21", features = ["dangerous_configuration"] }
scopeguard = { version = "1" }
serde = { version = "1", features = ["alloc", "derive"] }