diff --git a/Cargo.lock b/Cargo.lock index b72911c7a8..d5a5135954 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1097,7 +1097,7 @@ checksum = "975982cdb7ad6a142be15bdf84aea7ec6a9e5d4d797c004d43185b24cfe4e684" dependencies = [ "clap", "heck 0.5.0", - "indexmap 2.9.0", + "indexmap 2.10.0", "log", "proc-macro2", "quote", @@ -1296,8 +1296,14 @@ dependencies = [ name = "communicator" version = "0.1.0" dependencies = [ + "axum", "cbindgen", - "neon-shmem", + "http 1.3.1", + "measured", + "tokio", + "tracing", + "tracing-subscriber", + "utils", "workspace_hack", ] @@ -1307,7 +1313,7 @@ version = "0.1.0" dependencies = [ "anyhow", "chrono", - "indexmap 2.9.0", + "indexmap 2.10.0", "jsonwebtoken", "regex", "remote_storage", @@ -1341,7 +1347,10 @@ dependencies = [ "futures", "hostname-validator", "http 1.3.1", - "indexmap 2.9.0", + "http-body-util", + "hyper 1.4.1", + "hyper-util", + "indexmap 2.10.0", "itertools 0.10.5", "jsonwebtoken", "metrics", @@ -1363,6 +1372,7 @@ dependencies = [ "ring", "rlimit", "rust-ini", + "scopeguard", "serde", "serde_json", "serde_with", @@ -1373,7 +1383,7 @@ dependencies = [ "tokio-postgres", "tokio-stream", "tokio-util", - "tonic 0.13.1", + "tonic", "tower 0.5.2", "tower-http", "tower-otel", @@ -2649,7 +2659,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.9", - "indexmap 2.9.0", + "indexmap 2.10.0", "slab", "tokio", "tokio-util", @@ -2668,7 +2678,7 @@ dependencies = [ "futures-sink", "futures-util", "http 1.3.1", - "indexmap 2.9.0", + "indexmap 2.10.0", "slab", "tokio", "tokio-util", @@ -2927,7 +2937,7 @@ dependencies = [ "pprof", "regex", "routerify", - "rustls 0.23.27", + "rustls 0.23.29", "rustls-pemfile 2.1.1", "serde", "serde_json", @@ -3264,9 +3274,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.9.0" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e" +checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" dependencies = [ "equivalent", "hashbrown 0.15.2", @@ -3292,7 +3302,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "232929e1d75fe899576a3d5c7416ad0d88dbfbb3c3d6aa00873a7408a50ddb88" dependencies = [ "ahash", - "indexmap 2.9.0", + "indexmap 2.10.0", "is-terminal", "itoa", "log", @@ -3315,7 +3325,7 @@ dependencies = [ "crossbeam-utils", "dashmap 6.1.0", "env_logger", - "indexmap 2.9.0", + "indexmap 2.10.0", "itoa", "log", "num-format", @@ -4152,23 +4162,23 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "opentelemetry" -version = "0.27.1" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab70038c28ed37b97d8ed414b6429d343a8bbf44c9f79ec854f3a643029ba6d7" +checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6" dependencies = [ "futures-core", "futures-sink", "js-sys", "pin-project-lite", - "thiserror 1.0.69", + "thiserror 2.0.11", "tracing", ] [[package]] name = "opentelemetry-http" -version = "0.27.0" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10a8a7f5f6ba7c1b286c2fbca0454eaba116f63bbe69ed250b642d36fbb04d80" +checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d" dependencies = [ "async-trait", "bytes", @@ -4179,12 +4189,10 @@ dependencies = [ [[package]] name = "opentelemetry-otlp" -version = "0.27.0" +version = "0.30.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "91cf61a1868dacc576bf2b2a1c3e9ab150af7272909e80085c3173384fe11f76" +checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b" dependencies = [ - "async-trait", - "futures-core", "http 1.3.1", "opentelemetry", "opentelemetry-http", @@ -4192,46 +4200,43 @@ dependencies = [ "opentelemetry_sdk", "prost 0.13.5", "reqwest", - "thiserror 1.0.69", + "thiserror 2.0.11", ] [[package]] name = "opentelemetry-proto" -version = "0.27.0" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6e05acbfada5ec79023c85368af14abd0b307c015e9064d249b2a950ef459a6" +checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc" dependencies = [ "opentelemetry", "opentelemetry_sdk", "prost 0.13.5", - "tonic 0.12.3", + "tonic", ] [[package]] name = "opentelemetry-semantic-conventions" -version = "0.27.0" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc1b6902ff63b32ef6c489e8048c5e253e2e4a803ea3ea7e783914536eb15c52" +checksum = "83d059a296a47436748557a353c5e6c5705b9470ef6c95cfc52c21a8814ddac2" [[package]] name = "opentelemetry_sdk" -version = "0.27.1" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "231e9d6ceef9b0b2546ddf52335785ce41252bc7474ee8ba05bfad277be13ab8" +checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b" dependencies = [ - "async-trait", "futures-channel", "futures-executor", "futures-util", - "glob", "opentelemetry", "percent-encoding", - "rand 0.8.5", + "rand 0.9.1", "serde_json", - "thiserror 1.0.69", + "thiserror 2.0.11", "tokio", "tokio-stream", - "tracing", ] [[package]] @@ -4358,7 +4363,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", - "tonic 0.13.1", + "tonic", "tracing", "url", "utils", @@ -4455,7 +4460,7 @@ dependencies = [ "reqwest", "rpds", "rstest", - "rustls 0.23.27", + "rustls 0.23.29", "scopeguard", "send-future", "serde", @@ -4479,7 +4484,7 @@ dependencies = [ "tokio-tar", "tokio-util", "toml_edit", - "tonic 0.13.1", + "tonic", "tonic-reflection", "tower 0.5.2", "tracing", @@ -4565,7 +4570,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", - "tonic 0.13.1", + "tonic", "tracing", "utils", "workspace_hack", @@ -4611,7 +4616,7 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tokio-util", - "tonic 0.13.1", + "tonic", "tonic-build", "utils", "workspace_hack", @@ -4993,7 +4998,7 @@ dependencies = [ "bytes", "once_cell", "pq_proto", - "rustls 0.23.27", + "rustls 0.23.29", "rustls-pemfile 2.1.1", "serde", "thiserror 1.0.69", @@ -5392,7 +5397,7 @@ dependencies = [ "hyper 0.14.30", "hyper 1.4.1", "hyper-util", - "indexmap 2.9.0", + "indexmap 2.10.0", "ipnet", "itertools 0.10.5", "itoa", @@ -5429,7 +5434,7 @@ dependencies = [ "rsa", "rstest", "rustc-hash 2.1.1", - "rustls 0.23.27", + "rustls 0.23.29", "rustls-native-certs 0.8.0", "rustls-pemfile 2.1.1", "scopeguard", @@ -5708,7 +5713,7 @@ dependencies = [ "num-bigint", "percent-encoding", "pin-project-lite", - "rustls 0.23.27", + "rustls 0.23.29", "rustls-native-certs 0.8.0", "ryu", "sha1_smol", @@ -5937,9 +5942,9 @@ dependencies = [ [[package]] name = "reqwest-tracing" -version = "0.5.5" +version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73e6153390585f6961341b50e5a1931d6be6dee4292283635903c26ef9d980d2" +checksum = "d70ea85f131b2ee9874f0b160ac5976f8af75f3c9badfe0d955880257d10bd83" dependencies = [ "anyhow", 
"async-trait", @@ -6164,15 +6169,15 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.27" +version = "0.23.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "730944ca083c1c233a75c09f199e973ca499344a2b7ba9e755c457e86fb4a321" +checksum = "2491382039b29b9b11ff08b76ff6c97cf287671dbb74f0be44bda389fffe9bd1" dependencies = [ "log", "once_cell", "ring", "rustls-pki-types", - "rustls-webpki 0.103.3", + "rustls-webpki 0.103.4", "subtle", "zeroize", ] @@ -6236,9 +6241,12 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.11.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" +checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79" +dependencies = [ + "zeroize", +] [[package]] name = "rustls-webpki" @@ -6263,9 +6271,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.3" +version = "0.103.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4a72fe2bcf7a6ac6fd7d0b9e5cb68aeb7d4c0a0271730218b3e92d43b4eb435" +checksum = "0a17884ae0c1b773f1ccd2bd4a8c72f16da897310a98b0e84bf349ad5ead92fc" dependencies = [ "ring", "rustls-pki-types", @@ -6326,7 +6334,7 @@ dependencies = [ "regex", "remote_storage", "reqwest", - "rustls 0.23.27", + "rustls 0.23.29", "safekeeper_api", "safekeeper_client", "scopeguard", @@ -6516,7 +6524,7 @@ checksum = "255914a8e53822abd946e2ce8baa41d4cded6b8e938913b7f7b9da5b7ab44335" dependencies = [ "httpdate", "reqwest", - "rustls 0.23.27", + "rustls 0.23.29", "sentry-backtrace", "sentry-contexts", "sentry-core", @@ -6648,7 +6656,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d2de91cf02bbc07cde38891769ccd5d4f073d22a40683aa4bc7a95781aaa2c4" dependencies = [ "form_urlencoded", - "indexmap 2.9.0", + "indexmap 2.10.0", "itoa", "ryu", "serde", @@ -6729,7 +6737,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.9.0", + "indexmap 2.10.0", "serde", "serde_derive", "serde_json", @@ -6972,10 +6980,10 @@ dependencies = [ "once_cell", "parking_lot 0.12.1", "prost 0.13.5", - "rustls 0.23.27", + "rustls 0.23.29", "tokio", "tokio-rustls 0.26.2", - "tonic 0.13.1", + "tonic", "tonic-build", "tracing", "utils", @@ -7020,7 +7028,7 @@ dependencies = [ "regex", "reqwest", "routerify", - "rustls 0.23.27", + "rustls 0.23.29", "rustls-native-certs 0.8.0", "safekeeper_api", "safekeeper_client", @@ -7074,7 +7082,7 @@ dependencies = [ "postgres_ffi", "remote_storage", "reqwest", - "rustls 0.23.27", + "rustls 0.23.29", "rustls-native-certs 0.8.0", "serde", "serde_json", @@ -7613,7 +7621,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04fb792ccd6bbcd4bba408eb8a292f70fc4a3589e5d793626f45190e6454b6ab" dependencies = [ "ring", - "rustls 0.23.27", + "rustls 0.23.29", "tokio", "tokio-postgres", "tokio-rustls 0.26.2", @@ -7664,7 +7672,7 @@ version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" dependencies = [ - "rustls 0.23.27", + "rustls 0.23.29", "tokio", ] @@ -7763,34 +7771,13 @@ version = "0.22.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f21c7aaf97f1bd9ca9d4f9e73b0a6c74bd5afef56f2bc931943a6e1c37e04e38" dependencies = [ - "indexmap 2.9.0", + "indexmap 2.10.0", "serde", "serde_spanned", "toml_datetime", "winnow", ] -[[package]] -name = "tonic" 
-version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" -dependencies = [ - "async-trait", - "base64 0.22.1", - "bytes", - "http 1.3.1", - "http-body 1.0.0", - "http-body-util", - "percent-encoding", - "pin-project", - "prost 0.13.5", - "tokio-stream", - "tower-layer", - "tower-service", - "tracing", -] - [[package]] name = "tonic" version = "0.13.1" @@ -7848,7 +7835,7 @@ dependencies = [ "prost-types 0.13.5", "tokio", "tokio-stream", - "tonic 0.13.1", + "tonic", ] [[package]] @@ -7874,7 +7861,7 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", - "indexmap 2.9.0", + "indexmap 2.10.0", "pin-project-lite", "slab", "sync_wrapper 1.0.1", @@ -7912,10 +7899,14 @@ checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-otel" -version = "0.2.0" -source = "git+https://github.com/mattiapenati/tower-otel?rev=56a7321053bcb72443888257b622ba0d43a11fcd#56a7321053bcb72443888257b622ba0d43a11fcd" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "345000ea5ae33222624a8ccfdd88892c30db4d413a39c2d4bd714b77e0a4b23c" dependencies = [ + "axum", + "cfg-if", "http 1.3.1", + "http-body 1.0.0", "opentelemetry", "pin-project", "tower-layer", @@ -7997,9 +7988,9 @@ dependencies = [ [[package]] name = "tracing-opentelemetry" -version = "0.28.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97a971f6058498b5c0f1affa23e7ea202057a7301dbff68e968b2d578bcbd053" +checksum = "ddcf5959f39507d0d04d6413119c04f33b623f4f951ebcbdddddfad2d0623a9c" dependencies = [ "js-sys", "once_cell", @@ -8207,7 +8198,7 @@ dependencies = [ "base64 0.22.1", "log", "once_cell", - "rustls 0.23.27", + "rustls 0.23.29", "rustls-pki-types", "url", "webpki-roots", @@ -8879,7 +8870,7 @@ dependencies = [ "hyper 0.14.30", "hyper 1.4.1", "hyper-util", - "indexmap 2.9.0", + "indexmap 2.10.0", "itertools 0.12.1", "lazy_static", "libc", @@ -8902,14 +8893,14 @@ dependencies = [ "proc-macro2", "prost 0.13.5", "quote", - "rand 0.8.5", + "rand 0.9.1", "regex", "regex-automata 0.4.9", "regex-syntax 0.8.5", "reqwest", - "rustls 0.23.27", + "rustls 0.23.29", "rustls-pki-types", - "rustls-webpki 0.103.3", + "rustls-webpki 0.103.4", "scopeguard", "sec1 0.7.3", "serde", @@ -8922,6 +8913,7 @@ dependencies = [ "subtle", "syn 2.0.100", "sync_wrapper 0.1.2", + "thiserror 2.0.11", "tikv-jemalloc-ctl", "tikv-jemalloc-sys", "time", @@ -8931,6 +8923,7 @@ dependencies = [ "tokio-stream", "tokio-util", "toml_edit", + "tonic", "tower 0.5.2", "tracing", "tracing-core", diff --git a/Cargo.toml b/Cargo.toml index 3a57976cd8..00efe79554 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -143,10 +143,10 @@ notify = "6.0.0" num_cpus = "1.15" num-traits = "0.2.19" once_cell = "1.13" -opentelemetry = "0.27" -opentelemetry_sdk = "0.27" -opentelemetry-otlp = { version = "0.27", default-features = false, features = ["http-proto", "trace", "http", "reqwest-client"] } -opentelemetry-semantic-conventions = "0.27" +opentelemetry = "0.30" +opentelemetry_sdk = "0.30" +opentelemetry-otlp = { version = "0.30", default-features = false, features = ["http-proto", "trace", "http", "reqwest-client"] } +opentelemetry-semantic-conventions = "0.30" parking_lot = "0.12" parquet = { version = "53", default-features = false, features = ["zstd"] } parquet_derive = "53" @@ -164,7 +164,7 @@ 
rand_core = "=0.6" redis = { version = "0.29.2", features = ["tokio-rustls-comp", "keep-alive"] } regex = "1.10.2" reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] } -reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_27"] } +reqwest-tracing = { version = "0.5", features = ["opentelemetry_0_30"] } reqwest-middleware = "0.4" reqwest-retry = "0.7" routerify = "3" @@ -214,15 +214,12 @@ tonic = { version = "0.13.1", default-features = false, features = ["channel", " tonic-reflection = { version = "0.13.1", features = ["server"] } tower = { version = "0.5.2", default-features = false } tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] } - -# This revision uses opentelemetry 0.27. There's no tag for it. -tower-otel = { git = "https://github.com/mattiapenati/tower-otel", rev = "56a7321053bcb72443888257b622ba0d43a11fcd" } - +tower-otel = { version = "0.6", features = ["axum"] } tower-service = "0.3.3" tracing = "0.1" tracing-error = "0.2" tracing-log = "0.2" -tracing-opentelemetry = "0.28" +tracing-opentelemetry = "0.31" tracing-serde = "0.2.0" tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] } try-lock = "0.2.5" diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 910bae3bda..496471acc7 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -27,7 +27,10 @@ fail.workspace = true flate2.workspace = true futures.workspace = true http.workspace = true +http-body-util.workspace = true hostname-validator = "1.1" +hyper.workspace = true +hyper-util.workspace = true indexmap.workspace = true itertools.workspace = true jsonwebtoken.workspace = true @@ -44,6 +47,7 @@ postgres.workspace = true regex.workspace = true reqwest = { workspace = true, features = ["json"] } ring = "0.17" +scopeguard.workspace = true serde.workspace = true serde_with.workspace = true serde_json.workspace = true diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 78e2c6308f..3d07a2ece8 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -188,7 +188,7 @@ fn main() -> Result<()> { .build()?; let _rt_guard = runtime.enter(); - runtime.block_on(init(cli.dev))?; + let tracing_provider = init(cli.dev)?; // enable core dumping for all child processes setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?; @@ -227,11 +227,11 @@ fn main() -> Result<()> { scenario.teardown(); - deinit_and_exit(exit_code); + deinit_and_exit(tracing_provider, exit_code); } -async fn init(dev_mode: bool) -> Result<()> { - init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?; +fn init(dev_mode: bool) -> Result> { + let provider = init_tracing_and_logging(DEFAULT_LOG_LEVEL)?; let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?; thread::spawn(move || { @@ -242,7 +242,7 @@ async fn init(dev_mode: bool) -> Result<()> { info!("compute build_tag: {}", &BUILD_TAG.to_string()); - Ok(()) + Ok(provider) } fn get_config(cli: &Cli) -> Result { @@ -267,25 +267,27 @@ fn get_config(cli: &Cli) -> Result { } } -fn deinit_and_exit(exit_code: Option) -> ! { - // Shutdown trace pipeline gracefully, so that it has a chance to send any - // pending traces before we exit. 
Shutting down OTEL tracing provider may - // hang for quite some time, see, for example: - // - https://github.com/open-telemetry/opentelemetry-rust/issues/868 - // - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636 - // - // Yet, we want computes to shut down fast enough, as we may need a new one - // for the same timeline ASAP. So wait no longer than 2s for the shutdown to - // complete, then just error out and exit the main thread. - info!("shutting down tracing"); - let (sender, receiver) = mpsc::channel(); - let _ = thread::spawn(move || { - tracing_utils::shutdown_tracing(); - sender.send(()).ok() - }); - let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000)); - if shutdown_res.is_err() { - error!("timed out while shutting down tracing, exiting anyway"); +fn deinit_and_exit(tracing_provider: Option<tracing_utils::Provider>, exit_code: Option<i32>) -> ! { + if let Some(p) = tracing_provider { + // Shutdown trace pipeline gracefully, so that it has a chance to send any + // pending traces before we exit. Shutting down OTEL tracing provider may + // hang for quite some time, see, for example: + // - https://github.com/open-telemetry/opentelemetry-rust/issues/868 + // - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636 + // + // Yet, we want computes to shut down fast enough, as we may need a new one + // for the same timeline ASAP. So wait no longer than 2s for the shutdown to + // complete, then just error out and exit the main thread. + info!("shutting down tracing"); + let (sender, receiver) = mpsc::channel(); + let _ = thread::spawn(move || { + _ = p.shutdown(); + sender.send(()).ok() + }); + let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000)); + if shutdown_res.is_err() { + error!("timed out while shutting down tracing, exiting anyway"); + } } info!("shutting down"); diff --git a/compute_tools/src/communicator_socket_client.rs b/compute_tools/src/communicator_socket_client.rs new file mode 100644 index 0000000000..806e0a21e3 --- /dev/null +++ b/compute_tools/src/communicator_socket_client.rs @@ -0,0 +1,98 @@ +//! Client for making requests to a running Postgres server's communicator control socket. +//! +//! The storage communicator process that runs inside Postgres exposes an HTTP endpoint in +//! a Unix Domain Socket in the Postgres data directory. This provides access to it. + +use std::path::Path; + +use anyhow::Context; +use hyper::client::conn::http1::SendRequest; +use hyper_util::rt::TokioIo; + +/// Name of the socket within the Postgres data directory. This had better match the one in +/// `pgxn/neon/communicator/src/lib.rs`. +const NEON_COMMUNICATOR_SOCKET_NAME: &str = "neon-communicator.socket"; + +/// Open a connection to the communicator's control socket, prepare to send requests to it +/// with hyper. +pub async fn connect_communicator_socket<B>(pgdata: &Path) -> anyhow::Result<SendRequest<B>> +where + B: hyper::body::Body + 'static + Send, + B::Data: Send, + B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, +{ + let socket_path = pgdata.join(NEON_COMMUNICATOR_SOCKET_NAME); + let socket_path_len = socket_path.display().to_string().len(); + + // There is a limit of around 100 bytes (108 on Linux?) on the length of the path to a + // Unix Domain socket. The limit is on the connect(2) function used to open the + // socket, not on the absolute path itself.
Postgres changes the current directory to + // the data directory and uses a relative path to bind to the socket, and the relative + // path "./neon-communicator.socket" is always short, but when compute_ctl needs to + // open the socket, we need to use a full path, which can be arbitrarily long. + // + // There are a few ways we could work around this: + // + // 1. Change the current directory to the Postgres data directory and use a relative + // path in the connect(2) call. That's problematic because the current directory + // applies to the whole process. We could change the current directory early in + // compute_ctl startup, and that might be a good idea anyway for other reasons too: + // it would be more robust if the data directory is moved around or unlinked for + // some reason, and you would be less likely to accidentally litter other parts of + // the filesystem with e.g. temporary files. However, that's a pretty invasive + // change. + // + // 2. On Linux, you could open() the data directory, and refer to the socket + // inside it as "/proc/self/fd/<fd>/neon-communicator.socket". But that's + // Linux-only. + // + // 3. Create a symbolic link to the socket with a shorter path, and use that. + // + // We use the symbolic link approach here. Hopefully the paths we use in production + // are short enough that we can open the socket directly, so this hack is needed + // only in development. + let connect_result = if socket_path_len < 100 { + // We can open the path directly with no hacks. + tokio::net::UnixStream::connect(socket_path).await + } else { + // The path to the socket is too long. Create a symlink to it with a shorter path. + let short_path = std::env::temp_dir().join(format!( + "compute_ctl.short-socket.{}.{}", + std::process::id(), + tokio::task::id() + )); + std::os::unix::fs::symlink(&socket_path, &short_path)?; + + // Delete the symlink as soon as we have connected to it. There's a small chance + // of leaking if the process dies before we remove it, so try to keep that window + // as small as possible. + scopeguard::defer!
{ + if let Err(err) = std::fs::remove_file(&short_path) { + tracing::warn!("could not remove symlink \"{}\" created for socket: {}", + short_path.display(), err); + } + } + + tracing::info!( + "created symlink \"{}\" for socket \"{}\", opening it now", + short_path.display(), + socket_path.display() + ); + + tokio::net::UnixStream::connect(&short_path).await + }; + + let stream = connect_result.context("connecting to communicator control socket")?; + + let io = TokioIo::new(stream); + let (request_sender, connection) = hyper::client::conn::http1::handshake(io).await?; + + // spawn a task to poll the connection and drive the HTTP state + tokio::spawn(async move { + if let Err(err) = connection.await { + eprintln!("Error in connection: {err}"); + } + }); + + Ok(request_sender) +} diff --git a/compute_tools/src/http/routes/metrics.rs b/compute_tools/src/http/routes/metrics.rs index da8d8b20a5..96b464fd12 100644 --- a/compute_tools/src/http/routes/metrics.rs +++ b/compute_tools/src/http/routes/metrics.rs @@ -1,10 +1,18 @@ +use std::path::Path; +use std::sync::Arc; + +use anyhow::Context; use axum::body::Body; +use axum::extract::State; use axum::response::Response; -use http::StatusCode; use http::header::CONTENT_TYPE; +use http_body_util::BodyExt; +use hyper::{Request, StatusCode}; use metrics::proto::MetricFamily; use metrics::{Encoder, TextEncoder}; +use crate::communicator_socket_client::connect_communicator_socket; +use crate::compute::ComputeNode; use crate::http::JsonResponse; use crate::metrics::collect; @@ -31,3 +39,42 @@ pub(in crate::http) async fn get_metrics() -> Response { .body(Body::from(buffer)) .unwrap() } + +/// Fetch and forward metrics from the Postgres neon extension's metrics +/// exporter that are used by autoscaling-agent. +/// +/// The neon extension exposes these metrics over a Unix domain socket +/// in the data directory. That's not accessible directly from the outside +/// world, so we have this endpoint in compute_ctl to expose it. +pub(in crate::http) async fn get_autoscaling_metrics( + State(compute): State<Arc<ComputeNode>>, +) -> Result<Response, Response> { + let pgdata = Path::new(&compute.params.pgdata); + + // Connect to the communicator process's metrics socket + let mut metrics_client = connect_communicator_socket(pgdata) + .await + .map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?; + + // Make a request for /autoscaling_metrics + let request = Request::builder() + .method("GET") + .uri("/autoscaling_metrics") + .header("Host", "localhost") // hyper requires Host, even though the server won't care + .body(Body::from("")) + .unwrap(); + let resp = metrics_client + .send_request(request) + .await + .context("fetching metrics from Postgres metrics service") + .map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?; + + // Build a response that just forwards the response we got. + let mut response = Response::builder(); + response = response.status(resp.status()); + if let Some(content_type) = resp.headers().get(CONTENT_TYPE) { + response = response.header(CONTENT_TYPE, content_type); + } + let body = tonic::service::AxumBody::from_stream(resp.into_body().into_data_stream()); + Ok(response.body(body).unwrap()) +} diff --git a/compute_tools/src/http/server.rs b/compute_tools/src/http/server.rs index 17939e39d4..f0fbca8263 100644 --- a/compute_tools/src/http/server.rs +++ b/compute_tools/src/http/server.rs @@ -81,8 +81,12 @@ impl From<&Server> for Router<Arc<ComputeNode>> { Server::External { config, compute_id, ..
} => { - let unauthenticated_router = - Router::<Arc<ComputeNode>>::new().route("/metrics", get(metrics::get_metrics)); + let unauthenticated_router = Router::<Arc<ComputeNode>>::new() + .route("/metrics", get(metrics::get_metrics)) + .route( + "/autoscaling_metrics", + get(metrics::get_autoscaling_metrics), + ); let authenticated_router = Router::<Arc<ComputeNode>>::new() .route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm)) diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index 7511089a63..826373a9d2 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -4,6 +4,7 @@ #![deny(clippy::undocumented_unsafe_blocks)] pub mod checker; +pub mod communicator_socket_client; pub mod config; pub mod configurator; pub mod http; diff --git a/compute_tools/src/logger.rs b/compute_tools/src/logger.rs index c36f302f99..cd076472a6 100644 --- a/compute_tools/src/logger.rs +++ b/compute_tools/src/logger.rs @@ -13,7 +13,9 @@ use tracing_subscriber::prelude::*; /// set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`. See /// `tracing-utils` package description. /// -pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result<()> { +pub fn init_tracing_and_logging( + default_log_level: &str, +) -> anyhow::Result<Option<tracing_utils::Provider>> { // Initialize Logging let env_filter = tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_log_level)); @@ -24,8 +26,9 @@ pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result<()> { .with_writer(std::io::stderr); // Initialize OpenTelemetry - let otlp_layer = - tracing_utils::init_tracing("compute_ctl", tracing_utils::ExportConfig::default()).await; + let provider = + tracing_utils::init_tracing("compute_ctl", tracing_utils::ExportConfig::default()); + let otlp_layer = provider.as_ref().map(tracing_utils::layer); // Put it all together tracing_subscriber::registry() @@ -37,7 +40,7 @@ pub async fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result<()> { utils::logging::replace_panic_hook_with_tracing_panic_hook().forget(); - Ok(()) + Ok(provider) } /// Replace all newline characters with a special character to make it diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index b42b93edc2..8f9c6da589 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -407,6 +407,12 @@ struct StorageControllerStartCmdArgs { help = "Base port for the storage controller instance identified by instance-id (defaults to pageserver cplane api)" )] base_port: Option<u16>, + + #[clap( + long, + help = "Whether the storage controller should handle pageserver-reported local disk loss events."
+ )] + handle_ps_local_disk_loss: Option<bool>, } #[derive(clap::Args)] @@ -1823,6 +1829,7 @@ async fn handle_storage_controller( instance_id: args.instance_id, base_port: args.base_port, start_timeout: args.start_timeout, + handle_ps_local_disk_loss: args.handle_ps_local_disk_loss, }; if let Err(e) = svc.start(start_args).await { diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index f996f39967..35a197112e 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -56,6 +56,7 @@ pub struct NeonStorageControllerStartArgs { pub instance_id: u8, pub base_port: Option<u16>, pub start_timeout: humantime::Duration, + pub handle_ps_local_disk_loss: Option<bool>, } impl NeonStorageControllerStartArgs { @@ -64,6 +65,7 @@ impl NeonStorageControllerStartArgs { instance_id: 1, base_port: None, start_timeout, + handle_ps_local_disk_loss: None, } } } @@ -669,6 +671,10 @@ impl StorageController { println!("Starting storage controller at {scheme}://{host}:{listen_port}"); + if start_args.handle_ps_local_disk_loss.unwrap_or_default() { + args.push("--handle-ps-local-disk-loss".to_string()); + } + background_process::start_process( COMMAND, &instance_dir, diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index e5bbfffd27..b009208d15 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -394,7 +394,7 @@ impl From<&OtelExporterConfig> for tracing_utils::ExportConfig { tracing_utils::ExportConfig { endpoint: Some(val.endpoint.clone()), protocol: val.protocol.into(), - timeout: val.timeout, + timeout: Some(val.timeout), } } } diff --git a/libs/pageserver_api/src/upcall_api.rs b/libs/pageserver_api/src/upcall_api.rs index 07cada2eb1..fa2c896edb 100644 --- a/libs/pageserver_api/src/upcall_api.rs +++ b/libs/pageserver_api/src/upcall_api.rs @@ -21,6 +21,14 @@ pub struct ReAttachRequest { /// if the node already has a node_id set. #[serde(skip_serializing_if = "Option::is_none", default)] pub register: Option<NodeRegisterRequest>, + + /// Hadron: Optional flag to indicate whether the node is starting with an empty local disk. + /// Will be set to true if the node couldn't find any local tenant data on startup, which could + /// be due to the node starting for the first time or due to a local SSD failure/disk wipe event. + /// The flag may be used by the storage controller to update its observed state of the world + /// to make sure that it sends explicit location_config calls to the node following the + /// re-attach request. + pub empty_local_disk: Option<bool>, } #[derive(Serialize, Deserialize, Debug)] diff --git a/libs/tracing-utils/src/lib.rs b/libs/tracing-utils/src/lib.rs index 0893aa173b..76782339da 100644 --- a/libs/tracing-utils/src/lib.rs +++ b/libs/tracing-utils/src/lib.rs @@ -1,11 +1,5 @@ //! Helper functions to set up OpenTelemetry tracing. //! -//! This comes in two variants, depending on whether you have a Tokio runtime available. -//! If you do, call `init_tracing()`. It sets up the trace processor and exporter to use -//! the current tokio runtime. If you don't have a runtime available, or you don't want -//! to share the runtime with the tracing tasks, call `init_tracing_without_runtime()` -//! instead. It sets up a dedicated single-threaded Tokio runtime for the tracing tasks. -//! //! Example: //! //! ```rust,no_run //! .with_writer(std::io::stderr); //! //! // Initialize OpenTelemetry. Exports tracing spans as OpenTelemetry traces -//!
let otlp_layer = tracing_utils::init_tracing("my_application", tracing_utils::ExportConfig::default()).await; +//! let provider = tracing_utils::init_tracing("my_application", tracing_utils::ExportConfig::default()); +//! let otlp_layer = provider.as_ref().map(tracing_utils::layer); //! //! // Put it all together //! tracing_subscriber::registry() @@ -36,16 +31,18 @@ pub mod http; pub mod perf_span; -use opentelemetry::KeyValue; use opentelemetry::trace::TracerProvider; use opentelemetry_otlp::WithExportConfig; pub use opentelemetry_otlp::{ExportConfig, Protocol}; +use opentelemetry_sdk::trace::SdkTracerProvider; use tracing::level_filters::LevelFilter; use tracing::{Dispatch, Subscriber}; use tracing_subscriber::Layer; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::registry::LookupSpan; +pub type Provider = SdkTracerProvider; + /// Set up OpenTelemetry exporter, using configuration from environment variables. /// /// `service_name` is set as the OpenTelemetry 'service.name' resource (see /// @@ -70,16 +67,7 @@ use tracing_subscriber::registry::LookupSpan; /// If you need some other setting, please test if it works first. And perhaps /// add a comment in the list above to save the effort of testing for the next /// person. -/// -/// This doesn't block, but is marked as 'async' to hint that this must be called in -/// asynchronous execution context. -pub async fn init_tracing<S>( - service_name: &str, - export_config: ExportConfig, -) -> Option<impl Layer<S>> -where - S: Subscriber + for<'span> LookupSpan<'span>, -{ +pub fn init_tracing(service_name: &str, export_config: ExportConfig) -> Option<Provider> { if std::env::var("OTEL_SDK_DISABLED") == Ok("true".to_string()) { return None; }; @@ -89,52 +77,14 @@ where )) } -/// Like `init_tracing`, but creates a separate tokio Runtime for the tracing -/// tasks. -pub fn init_tracing_without_runtime<S>( - service_name: &str, - export_config: ExportConfig, -) -> Option<impl Layer<S>> +pub fn layer<S>(p: &Provider) -> impl Layer<S> where S: Subscriber + for<'span> LookupSpan<'span>, { - if std::env::var("OTEL_SDK_DISABLED") == Ok("true".to_string()) { - return None; - }; - - // The opentelemetry batch processor and the OTLP exporter needs a Tokio - // runtime. Create a dedicated runtime for them. One thread should be - // enough. - // - // (Alternatively, instead of batching, we could use the "simple - // processor", which doesn't need Tokio, and use "reqwest-blocking" - // feature for the OTLP exporter, which also doesn't need Tokio. However, - // batching is considered best practice, and also I have the feeling that - // the non-Tokio codepaths in the opentelemetry crate are less used and - // might be more buggy, so better to stay on the well-beaten path.) - // - // We leak the runtime so that it keeps running after we exit the - // function. - let runtime = Box::leak(Box::new( - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .thread_name("otlp runtime thread") - .worker_threads(1) - .build() - .unwrap(), - )); - let _guard = runtime.enter(); - - Some(init_tracing_internal( - service_name.to_string(), - export_config, - )) + tracing_opentelemetry::layer().with_tracer(p.tracer("global")) } -fn init_tracing_internal<S>(service_name: String, export_config: ExportConfig) -> impl Layer<S> -where - S: Subscriber + for<'span> LookupSpan<'span>, -{ +fn init_tracing_internal(service_name: String, export_config: ExportConfig) -> Provider { // Sets up exporter from the provided [`ExportConfig`] parameter.
// If the endpoint is not specified, it is loaded from the // OTEL_EXPORTER_OTLP_ENDPOINT environment variable. @@ -153,22 +103,14 @@ where opentelemetry_sdk::propagation::TraceContextPropagator::new(), ); - let tracer = opentelemetry_sdk::trace::TracerProvider::builder() - .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio) - .with_resource(opentelemetry_sdk::Resource::new(vec![KeyValue::new( - opentelemetry_semantic_conventions::resource::SERVICE_NAME, - service_name, - )])) + Provider::builder() + .with_batch_exporter(exporter) + .with_resource( + opentelemetry_sdk::Resource::builder() + .with_service_name(service_name) + .build(), + ) .build() - .tracer("global"); - - tracing_opentelemetry::layer().with_tracer(tracer) -} - -// Shutdown trace pipeline gracefully, so that it has a chance to send any -// pending traces before we exit. -pub fn shutdown_tracing() { - opentelemetry::global::shutdown_tracer_provider(); } pub enum OtelEnablement { @@ -176,17 +118,17 @@ pub enum OtelEnablement { Enabled { service_name: String, export_config: ExportConfig, - runtime: &'static tokio::runtime::Runtime, }, } pub struct OtelGuard { + provider: Provider, pub dispatch: Dispatch, } impl Drop for OtelGuard { fn drop(&mut self) { - shutdown_tracing(); + _ = self.provider.shutdown(); } } @@ -199,22 +141,19 @@ impl Drop for OtelGuard { /// The lifetime of the guard should match that of the application. On drop, it tears down the /// OTEL infra. pub fn init_performance_tracing(otel_enablement: OtelEnablement) -> Option<OtelGuard> { - let otel_subscriber = match otel_enablement { + match otel_enablement { OtelEnablement::Disabled => None, OtelEnablement::Enabled { service_name, export_config, - runtime, } => { - let otel_layer = runtime - .block_on(init_tracing(&service_name, export_config)) - .with_filter(LevelFilter::INFO); + let provider = init_tracing(&service_name, export_config)?; + + let otel_layer = layer(&provider).with_filter(LevelFilter::INFO); let otel_subscriber = tracing_subscriber::registry().with(otel_layer); - let otel_dispatch = Dispatch::new(otel_subscriber); + let dispatch = Dispatch::new(otel_subscriber); - Some(otel_dispatch) + Some(OtelGuard { dispatch, provider }) } - }; - - otel_subscriber.map(|dispatch| OtelGuard { dispatch }) + } } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index dfb8b437c3..855af7009c 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -126,7 +126,6 @@ fn main() -> anyhow::Result<()> { Some(cfg) => tracing_utils::OtelEnablement::Enabled { service_name: "pageserver".to_string(), export_config: (&cfg.export_config).into(), - runtime: *COMPUTE_REQUEST_RUNTIME, }, None => tracing_utils::OtelEnablement::Disabled, }; diff --git a/pageserver/src/controller_upcall_client.rs b/pageserver/src/controller_upcall_client.rs index 8da4cee4b9..96829bd6ea 100644 --- a/pageserver/src/controller_upcall_client.rs +++ b/pageserver/src/controller_upcall_client.rs @@ -42,6 +42,7 @@ pub trait StorageControllerUpcallApi { fn re_attach( &self, conf: &PageServerConf, + empty_local_disk: bool, ) -> impl Future< Output = Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError>, > + Send; @@ -155,6 +156,7 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { async fn re_attach( &self, conf: &PageServerConf, + empty_local_disk: bool, ) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> { let url = self .base_url @@ -226,6 +228,7 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { let request = ReAttachRequest { node_id: self.node_id,
register: register.clone(), + empty_local_disk: Some(empty_local_disk), }; let response: ReAttachResponse = self diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 7854fd9e36..51581ccc2c 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -768,6 +768,7 @@ mod test { async fn re_attach( &self, _conf: &PageServerConf, + _empty_local_disk: bool, ) -> Result<HashMap<TenantShardId, ReAttachResponseTenant>, RetryForeverError> { unimplemented!() } diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 9b196ae393..b47bab16d8 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -352,7 +352,8 @@ async fn init_load_generations( let client = StorageControllerUpcallClient::new(conf, cancel); info!("Calling {} API to re-attach tenants", client.base_url()); // If we are configured to use the control plane API, then it is the source of truth for what tenants to load. - match client.re_attach(conf).await { + let empty_local_disk = tenant_confs.is_empty(); + match client.re_attach(conf, empty_local_disk).await { Ok(tenants) => tenants .into_iter() .flat_map(|(id, rart)| { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 371d335285..bdaed8fabf 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -449,6 +449,7 @@ pub struct Timeline { /// A channel to send async requests to prepare a basebackup for the basebackup cache. basebackup_cache: Arc<BasebackupCache>, + #[expect(dead_code)] feature_resolver: Arc<FeatureResolver>, } diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 206d5471e8..88b31a4a1a 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1326,13 +1326,7 @@ impl Timeline { .max() }; - let (partition_mode, partition_lsn) = if cfg!(test) - || cfg!(feature = "testing") - || self - .feature_resolver - .evaluate_boolean("image-compaction-boundary") - .is_ok() - { + let (partition_mode, partition_lsn) = { let last_repartition_lsn = self.partitioning.read().1; let lsn = match l0_l1_boundary_lsn { Some(boundary) => gc_cutoff @@ -1348,8 +1342,6 @@ impl Timeline { } else { ("l0_l1_boundary", lsn) } - } else { - ("latest_record", self.get_last_record_lsn()) }; // 2. Repartition and create image layers if necessary diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile index bf7aeb4108..34cabaca62 100644 --- a/pgxn/neon/Makefile +++ b/pgxn/neon/Makefile @@ -5,6 +5,7 @@ MODULE_big = neon OBJS = \ $(WIN32RES) \ communicator.o \ + communicator_process.o \ extension_server.o \ file_cache.o \ hll.o \ @@ -29,6 +30,11 @@ PG_CPPFLAGS = -I$(libpq_srcdir) SHLIB_LINK_INTERNAL = $(libpq) SHLIB_LINK = -lcurl +UNAME_S := $(shell uname -s) +ifeq ($(UNAME_S), Darwin) + SHLIB_LINK += -framework Security -framework CoreFoundation -framework SystemConfiguration +endif + EXTENSION = neon DATA = \ neon--1.0.sql \ @@ -57,7 +63,8 @@ WALPROP_OBJS = \ # libcommunicator.a is built by cargo from the Rust sources under communicator/ # subdirectory. `cargo build` also generates communicator_bindings.h.
-neon.o: communicator/communicator_bindings.h +communicator_process.o: communicator/communicator_bindings.h +file_cache.o: communicator/communicator_bindings.h $(NEON_CARGO_ARTIFACT_TARGET_DIR)/libcommunicator.a communicator/communicator_bindings.h &: (cd $(srcdir)/communicator && cargo build $(CARGO_BUILD_FLAGS) $(CARGO_PROFILE)) diff --git a/pgxn/neon/communicator.c b/pgxn/neon/communicator.c index 158b8940a3..5a08b3e331 100644 --- a/pgxn/neon/communicator.c +++ b/pgxn/neon/communicator.c @@ -1820,12 +1820,12 @@ nm_to_string(NeonMessage *msg) } case T_NeonGetPageResponse: { -#if 0 NeonGetPageResponse *msg_resp = (NeonGetPageResponse *) msg; -#endif appendStringInfoString(&s, "{\"type\": \"NeonGetPageResponse\""); - appendStringInfo(&s, ", \"page\": \"XXX\"}"); + appendStringInfo(&s, ", \"rinfo\": %u/%u/%u", RelFileInfoFmt(msg_resp->req.rinfo)); + appendStringInfo(&s, ", \"forknum\": %d", msg_resp->req.forknum); + appendStringInfo(&s, ", \"blkno\": %u", msg_resp->req.blkno); appendStringInfoChar(&s, '}'); break; } diff --git a/pgxn/neon/communicator/Cargo.toml b/pgxn/neon/communicator/Cargo.toml index b5ce389297..71cb5c7ae9 100644 --- a/pgxn/neon/communicator/Cargo.toml +++ b/pgxn/neon/communicator/Cargo.toml @@ -16,7 +16,14 @@ testing = [] rest_broker = [] [dependencies] -neon-shmem.workspace = true +axum.workspace = true +http.workspace = true +tokio = { workspace = true, features = ["macros", "net", "io-util", "rt", "rt-multi-thread"] } +tracing.workspace = true +tracing-subscriber.workspace = true + +measured.workspace = true +utils.workspace = true workspace_hack = { version = "0.1", path = "../../../workspace_hack" } [build-dependencies] diff --git a/pgxn/neon/communicator/README.md b/pgxn/neon/communicator/README.md index 8169ae72b5..7ff4708171 100644 --- a/pgxn/neon/communicator/README.md +++ b/pgxn/neon/communicator/README.md @@ -1,7 +1,22 @@ -This package will evolve into a "compute-pageserver communicator" -process and machinery. For now, it's just a dummy that doesn't do -anything interesting, but it allows us to test the compilation and -linking of Rust code into the Postgres extensions. +# Communicator +
+This package provides the so-called "compute-pageserver communicator", +or just "communicator" for short. The communicator is a separate +background worker process that runs in the PostgreSQL server. It's +part of the neon extension. Currently, it only provides an HTTP +endpoint for metrics, but in the future it will evolve to handle all +communications with the pageservers. + +## Source code view + +pgxn/neon/communicator_process.c + Contains code needed to start up the communicator process, and + the glue that interacts with PostgreSQL code and the Rust + code in the communicator process. + + +pgxn/neon/communicator/src/worker_process/ + Worker process main loop and glue code At compilation time, pgxn/neon/communicator/ produces a static library, libcommunicator.a. It is linked to the neon.so extension diff --git a/pgxn/neon/communicator/src/lib.rs b/pgxn/neon/communicator/src/lib.rs index 24c180d37d..9a3a46c95f 100644 --- a/pgxn/neon/communicator/src/lib.rs +++ b/pgxn/neon/communicator/src/lib.rs @@ -1,6 +1,5 @@ -/// dummy function, just to test linking Rust functions into the C -/// extension -#[unsafe(no_mangle)] -pub extern "C" fn communicator_dummy(arg: u32) -> u32 { - arg + 1 -} +mod worker_process; + +/// Name of the Unix Domain Socket that serves the metrics, and other APIs in the +/// future. This is within the Postgres data directory.
+const NEON_COMMUNICATOR_SOCKET_NAME: &str = "neon-communicator.socket"; diff --git a/pgxn/neon/communicator/src/worker_process/callbacks.rs b/pgxn/neon/communicator/src/worker_process/callbacks.rs new file mode 100644 index 0000000000..70e8e12fea --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/callbacks.rs @@ -0,0 +1,51 @@ +//! C callbacks to PostgreSQL facilities that the neon extension needs to provide. These +//! are implemented in `neon/pgxn/communicator_process.c`. The function signatures had +//! better match! +//! +//! These are called from the communicator threads! Careful what you do, most Postgres +//! functions are not safe to call in that context. + +#[cfg(not(test))] +unsafe extern "C" { + pub fn callback_set_my_latch_unsafe(); + pub fn callback_get_lfc_metrics_unsafe() -> LfcMetrics; +} + +// Compile unit tests with dummy versions of the functions. Unit tests cannot call back +// into the C code. (As of this writing, no unit tests even exist in the communicator +// package, but the code coverage build still builds these and tries to link with the +// external C code.) +#[cfg(test)] +unsafe fn callback_set_my_latch_unsafe() { + panic!("not usable in unit tests"); +} +#[cfg(test)] +unsafe fn callback_get_lfc_metrics_unsafe() -> LfcMetrics { + panic!("not usable in unit tests"); +} + +// safe wrappers + +pub(super) fn callback_set_my_latch() { + unsafe { callback_set_my_latch_unsafe() }; +} + +pub(super) fn callback_get_lfc_metrics() -> LfcMetrics { + unsafe { callback_get_lfc_metrics_unsafe() } +} + +/// Return type of the callback_get_lfc_metrics() function. +#[repr(C)] +pub struct LfcMetrics { + pub lfc_cache_size_limit: i64, + pub lfc_hits: i64, + pub lfc_misses: i64, + pub lfc_used: i64, + pub lfc_writes: i64, + + // working set size looking back 1..60 minutes. + // + // Index 0 is the size of the working set accessed within last 1 minute, + // index 59 is the size of the working set accessed within last 60 minutes. + pub lfc_approximate_working_set_size_windows: [i64; 60], +} diff --git a/pgxn/neon/communicator/src/worker_process/control_socket.rs b/pgxn/neon/communicator/src/worker_process/control_socket.rs new file mode 100644 index 0000000000..ef9d1f1529 --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/control_socket.rs @@ -0,0 +1,102 @@ +//! Communicator control socket. +//! +//! Currently, the control socket is used to provide information about the communicator +//! process, file cache etc. as prometheus metrics. In the future, it can be used to +//! expose more things. +//! +//! The exporter speaks HTTP and listens on a Unix Domain Socket under the Postgres +//! data directory. For debugging, you can access it with curl: +//! +//! ```sh +//! curl --unix-socket neon-communicator.socket http://localhost/metrics +//! ``` +//!
+use axum::Router; +use axum::body::Body; +use axum::extract::State; +use axum::response::Response; +use http::StatusCode; +use http::header::CONTENT_TYPE; + +use measured::MetricGroup; +use measured::text::BufferedTextEncoder; + +use std::io::ErrorKind; + +use tokio::net::UnixListener; + +use crate::NEON_COMMUNICATOR_SOCKET_NAME; +use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct; + +impl CommunicatorWorkerProcessStruct { + /// Launch the listener + pub(crate) async fn launch_control_socket_listener( + &'static self, + ) -> Result<(), std::io::Error> { + use axum::routing::get; + let app = Router::new() + .route("/metrics", get(get_metrics)) + .route("/autoscaling_metrics", get(get_autoscaling_metrics)) + .route("/debug/panic", get(handle_debug_panic)) + .with_state(self); + + // If the server is restarted, there might be an old socket still + // lying around. Remove it first. + match std::fs::remove_file(NEON_COMMUNICATOR_SOCKET_NAME) { + Ok(()) => { + tracing::warn!("removed stale control socket"); + } + Err(e) if e.kind() == ErrorKind::NotFound => {} + Err(e) => { + tracing::error!("could not remove stale control socket: {e:#}"); + // Try to proceed anyway. It will likely fail below though. + } + }; + + // Create the unix domain socket and start listening on it + let listener = UnixListener::bind(NEON_COMMUNICATOR_SOCKET_NAME)?; + + tokio::spawn(async { + tracing::info!("control socket listener spawned"); + axum::serve(listener, app) + .await + .expect("axum::serve never returns") + }); + + Ok(()) + } +} + +/// Expose all Prometheus metrics. +async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct>) -> Response { + tracing::trace!("/metrics requested"); + metrics_to_response(&state).await +} + +/// Expose Prometheus metrics, for use by the autoscaling agent. +/// +/// This is a subset of all the metrics. 
+async fn get_autoscaling_metrics( + State(state): State<&CommunicatorWorkerProcessStruct>, +) -> Response { + tracing::trace!("/autoscaling_metrics requested"); + metrics_to_response(&state.lfc_metrics).await +} + +async fn handle_debug_panic(State(_state): State<&CommunicatorWorkerProcessStruct>) -> Response { + panic!("test HTTP handler task panic"); +} + +/// Helper function to convert prometheus metrics to a text response +async fn metrics_to_response(metrics: &(dyn MetricGroup<BufferedTextEncoder> + Sync)) -> Response { + let mut enc = BufferedTextEncoder::new(); + metrics + .collect_group_into(&mut enc) + .unwrap_or_else(|never| match never {}); + + Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, "application/text") + .body(Body::from(enc.finish())) + .unwrap() +} diff --git a/pgxn/neon/communicator/src/worker_process/lfc_metrics.rs b/pgxn/neon/communicator/src/worker_process/lfc_metrics.rs new file mode 100644 index 0000000000..fcb291c71b --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/lfc_metrics.rs @@ -0,0 +1,83 @@ +use measured::{ + FixedCardinalityLabel, Gauge, GaugeVec, LabelGroup, MetricGroup, + label::{LabelName, LabelValue, StaticLabelSet}, + metric::{MetricEncoding, gauge::GaugeState, group::Encoding}, +}; + +use super::callbacks::callback_get_lfc_metrics; + +pub(crate) struct LfcMetricsCollector; + +#[derive(MetricGroup)] +#[metric(new())] +struct LfcMetricsGroup { + /// LFC cache size limit in bytes + lfc_cache_size_limit: Gauge, + /// LFC cache hits + lfc_hits: Gauge, + /// LFC cache misses + lfc_misses: Gauge, + /// LFC chunks used (chunk = 1MB) + lfc_used: Gauge, + /// LFC cache writes + lfc_writes: Gauge, + /// Approximate working set size in pages of 8192 bytes + #[metric(init = GaugeVec::dense())] + lfc_approximate_working_set_size_windows: GaugeVec<StaticLabelSet<MinuteAsSeconds>>, +} + +impl<T: Encoding> MetricGroup<T> for LfcMetricsCollector +where + GaugeState: MetricEncoding<T>, +{ + fn collect_group_into(&self, enc: &mut T) -> Result<(), <T as Encoding>::Err> { + let g = LfcMetricsGroup::new(); + + let lfc_metrics = callback_get_lfc_metrics(); + + g.lfc_cache_size_limit.set(lfc_metrics.lfc_cache_size_limit); + g.lfc_hits.set(lfc_metrics.lfc_hits); + g.lfc_misses.set(lfc_metrics.lfc_misses); + g.lfc_used.set(lfc_metrics.lfc_used); + g.lfc_writes.set(lfc_metrics.lfc_writes); + + for i in 0..60 { + let val = lfc_metrics.lfc_approximate_working_set_size_windows[i]; + g.lfc_approximate_working_set_size_windows + .set(MinuteAsSeconds(i), val); + } + + g.collect_group_into(enc) + } +} + +/// This stores the values in range 0..60, +/// encodes them as seconds (60, 120, 180, ..., 3600) +#[derive(Clone, Copy)] +struct MinuteAsSeconds(usize); + +impl FixedCardinalityLabel for MinuteAsSeconds { + fn cardinality() -> usize { + 60 + } + + fn encode(&self) -> usize { + self.0 + } + + fn decode(value: usize) -> Self { + Self(value) + } +} + +impl LabelValue for MinuteAsSeconds { + fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output { + v.write_int((self.0 + 1) as i64 * 60) + } +} + +impl LabelGroup for MinuteAsSeconds { + fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) { + v.write_value(LabelName::from_str("duration_seconds"), self); + } +} diff --git a/pgxn/neon/communicator/src/worker_process/logging.rs b/pgxn/neon/communicator/src/worker_process/logging.rs new file mode 100644 index 0000000000..1ae31cd0dd --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/logging.rs @@ -0,0 +1,250 @@ +//! Glue code to hook up Rust logging with the `tracing` crate to the PostgreSQL log +//! +//!
In the Rust threads, the log messages are written to an mpsc channel, and the Postgres +//! process latch is raised. That wakes up the loop in the main thread, see +//! `communicator_new_bgworker_main()`. It reads the message from the channel and +//! ereport()s it. This ensures that only one thread, the main thread, calls the +//! PostgreSQL logging routines at any time. + +use std::ffi::c_char; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::mpsc::sync_channel; +use std::sync::mpsc::{Receiver, SyncSender}; +use std::sync::mpsc::{TryRecvError, TrySendError}; + +use tracing::info; +use tracing::{Event, Level, Metadata, Subscriber}; +use tracing_subscriber::filter::LevelFilter; +use tracing_subscriber::fmt::format::Writer; +use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields, FormattedFields, MakeWriter}; +use tracing_subscriber::registry::LookupSpan; + +use crate::worker_process::callbacks::callback_set_my_latch; + +/// This handle is passed to the C code, and used by [`communicator_worker_poll_logging`] +pub struct LoggingReceiver { + receiver: Receiver<FormattedEventWithMeta>, +} + +/// This is passed to `tracing` +struct LoggingSender { + sender: SyncSender<FormattedEventWithMeta>, +} + +static DROPPED_EVENT_COUNT: AtomicU64 = AtomicU64::new(0); + +/// Called once, at worker process startup. The returned LoggingReceiver is passed back +/// in the subsequent calls to `communicator_worker_poll_logging`. It is opaque to the C code. +#[unsafe(no_mangle)] +pub extern "C" fn communicator_worker_configure_logging() -> Box<LoggingReceiver> { + let (sender, receiver) = sync_channel(1000); + + let receiver = LoggingReceiver { receiver }; + let sender = LoggingSender { sender }; + + use tracing_subscriber::prelude::*; + let r = tracing_subscriber::registry(); + + let r = r.with( + tracing_subscriber::fmt::layer() + .with_ansi(false) + .event_format(SimpleFormatter) + .with_writer(sender) + // TODO: derive this from log_min_messages? Currently the code in + // communicator_process.c forces log_min_messages='INFO'. + .with_filter(LevelFilter::from_level(Level::INFO)), + ); + r.init(); + + info!("communicator process logging started"); + + Box::new(receiver) +} + +/// Read one message from the logging queue. This is essentially a wrapper around the +/// channel's `Receiver`, with a C-friendly signature. +/// +/// The message is copied into *errbuf, which is a caller-supplied buffer of size +/// `errbuf_len`. If the message doesn't fit in the buffer, it is truncated. It is always +/// NULL-terminated. +/// +/// The error level is returned in *elevel_p. It's one of the PostgreSQL error levels, see +/// elog.h. +/// +/// If there was a message, *dropped_event_count_p is also updated with a counter of how +/// many log messages in total have been dropped. By comparing that with the value from +/// the previous call, you can tell how many were dropped since the last call. +/// +/// Returns: +/// +/// 0 if there were no messages +/// 1 if there was a message. The message and its level are returned in +/// *errbuf and *elevel_p. *dropped_event_count_p is also updated.
+/// -1 on error, i.e. the other end of the queue was disconnected +#[unsafe(no_mangle)] +pub extern "C" fn communicator_worker_poll_logging( + state: &mut LoggingReceiver, + errbuf: *mut c_char, + errbuf_len: u32, + elevel_p: &mut i32, + dropped_event_count_p: &mut u64, +) -> i32 { + let msg = match state.receiver.try_recv() { + Err(TryRecvError::Empty) => return 0, + Err(TryRecvError::Disconnected) => return -1, + Ok(msg) => msg, + }; + + let src: &[u8] = &msg.message; + let dst: *mut u8 = errbuf.cast(); + let len = std::cmp::min(src.len(), errbuf_len as usize - 1); + unsafe { + std::ptr::copy_nonoverlapping(src.as_ptr(), dst, len); + *(dst.add(len)) = b'\0'; // NULL terminator + } + + // Map the tracing Level to PostgreSQL elevel. + // + // XXX: These levels are copied from PostgreSQL's elog.h. Introduce another enum to + // hide these? + *elevel_p = match msg.level { + Level::TRACE => 10, // DEBUG5 + Level::DEBUG => 14, // DEBUG1 + Level::INFO => 17, // INFO + Level::WARN => 19, // WARNING + Level::ERROR => 21, // ERROR + }; + + *dropped_event_count_p = DROPPED_EVENT_COUNT.load(Ordering::Relaxed); + + 1 +} + +//---- The following functions can be called from any thread ---- + +#[derive(Clone)] +struct FormattedEventWithMeta { + message: Vec<u8>, + level: tracing::Level, +} + +impl Default for FormattedEventWithMeta { + fn default() -> Self { + FormattedEventWithMeta { + message: Vec::new(), + level: tracing::Level::DEBUG, + } + } +} + +struct EventBuilder<'a> { + event: FormattedEventWithMeta, + + sender: &'a LoggingSender, +} + +impl std::io::Write for EventBuilder<'_> { + fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { + self.event.message.write(buf) + } + fn flush(&mut self) -> std::io::Result<()> { + self.sender.send_event(self.event.clone()); + Ok(()) + } +} + +impl Drop for EventBuilder<'_> { + fn drop(&mut self) { + let sender = self.sender; + let event = std::mem::take(&mut self.event); + + sender.send_event(event); + } +} + +impl<'a> MakeWriter<'a> for LoggingSender { + type Writer = EventBuilder<'a>; + + fn make_writer(&'a self) -> Self::Writer { + panic!("not expected to be called when make_writer_for is implemented"); + } + + fn make_writer_for(&'a self, meta: &Metadata<'_>) -> Self::Writer { + EventBuilder { + event: FormattedEventWithMeta { + message: Vec::new(), + level: *meta.level(), + }, + sender: self, + } + } +} + +impl LoggingSender { + fn send_event(&self, e: FormattedEventWithMeta) { + match self.sender.try_send(e) { + Ok(()) => { + // notify the main thread + callback_set_my_latch(); + } + Err(TrySendError::Disconnected(_)) => {} + Err(TrySendError::Full(_)) => { + // The queue is full, cannot send any more. To avoid blocking the tokio + // thread, simply drop the message. Better to lose some logs than get + // stuck if there's a problem with the logging. + // + // Record the fact that a message was dropped by incrementing the + // counter. + DROPPED_EVENT_COUNT.fetch_add(1, Ordering::Relaxed); + } + } + } +} + +/// Simple formatter implementation for tracing_subscriber, which prints the log spans and +/// message part like the default formatter, but without the timestamp or error level. The +/// error level is captured separately by `FormattedEventWithMeta`, and when the event is +/// printed by the main thread, with PostgreSQL ereport(), it gets a timestamp at that +/// point.
(The timestamp printed will therefore lag behind the timestamp on the event +/// here, if the main thread doesn't process the log message promptly) +struct SimpleFormatter; + +impl<S, N> FormatEvent<S, N> for SimpleFormatter +where + S: Subscriber + for<'a> LookupSpan<'a>, + N: for<'a> FormatFields<'a> + 'static, +{ + fn format_event( + &self, + ctx: &FmtContext<'_, S, N>, + mut writer: Writer<'_>, + event: &Event<'_>, + ) -> std::fmt::Result { + // Format all the spans in the event's span context. + if let Some(scope) = ctx.event_scope() { + for span in scope.from_root() { + write!(writer, "{}", span.name())?; + + // `FormattedFields` is a formatted representation of the span's fields, + // which is stored in its extensions by the `fmt` layer's `new_span` + // method. The fields will have been formatted by the same field formatter + // that's provided to the event formatter in the `FmtContext`. + let ext = span.extensions(); + let fields = &ext + .get::<FormattedFields<N>>() + .expect("will never be `None`"); + + // Skip formatting the fields if the span had no fields. + if !fields.is_empty() { + write!(writer, "{{{fields}}}")?; + } + write!(writer, ": ")?; + } + } + + // Write fields on the event + ctx.field_format().format_fields(writer.by_ref(), event)?; + + Ok(()) + } +} diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs new file mode 100644 index 0000000000..3147a3de63 --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -0,0 +1,66 @@ +use std::str::FromStr as _; + +use crate::worker_process::lfc_metrics::LfcMetricsCollector; + +use measured::MetricGroup; +use measured::metric::MetricEncoding; +use measured::metric::gauge::GaugeState; +use measured::metric::group::Encoding; +use utils::id::{TenantId, TimelineId}; + +pub struct CommunicatorWorkerProcessStruct { + runtime: tokio::runtime::Runtime, + + /*** Metrics ***/ + pub(crate) lfc_metrics: LfcMetricsCollector, +} + +/// Launch the communicator process's Rust subsystems +pub(super) fn init( + tenant_id: Option<&str>, + timeline_id: Option<&str>, +) -> Result<&'static CommunicatorWorkerProcessStruct, String> { + // The caller validated these already + let _tenant_id = tenant_id + .map(TenantId::from_str) + .transpose() + .map_err(|e| format!("invalid tenant ID: {e}"))?; + let _timeline_id = timeline_id + .map(TimelineId::from_str) + .transpose() + .map_err(|e| format!("invalid timeline ID: {e}"))?; + + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_name("communicator thread") + .build() + .unwrap(); + + let worker_struct = CommunicatorWorkerProcessStruct { + // Note: it's important to not drop the runtime, or all the tasks are dropped + // too. Including it in the returned struct is one way to keep it around.
+ runtime, + + // metrics + lfc_metrics: LfcMetricsCollector, + }; + let worker_struct = Box::leak(Box::new(worker_struct)); + + // Start the listener on the control socket + worker_struct + .runtime + .block_on(worker_struct.launch_control_socket_listener()) + .map_err(|e| e.to_string())?; + + Ok(worker_struct) +} + +impl<T> MetricGroup<T> for CommunicatorWorkerProcessStruct +where + T: Encoding, + GaugeState: MetricEncoding<T>, +{ + fn collect_group_into(&self, enc: &mut T) -> Result<(), T::Err> { + self.lfc_metrics.collect_group_into(enc) + } +} diff --git a/pgxn/neon/communicator/src/worker_process/mod.rs b/pgxn/neon/communicator/src/worker_process/mod.rs new file mode 100644 index 0000000000..3602686779 --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/mod.rs @@ -0,0 +1,13 @@ +//! This code runs in the communicator worker process. It provides +//! the glue code to: +//! +//! - launch the main loop, +//! - receive IO requests from backends and process them, +//! - write results back to backends. + +mod callbacks; +mod control_socket; +mod lfc_metrics; +mod logging; +mod main_loop; +mod worker_interface; diff --git a/pgxn/neon/communicator/src/worker_process/worker_interface.rs b/pgxn/neon/communicator/src/worker_process/worker_interface.rs new file mode 100644 index 0000000000..1dfd6820d3 --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/worker_interface.rs @@ -0,0 +1,60 @@ +//! Functions called from the C code in the worker process + +use std::ffi::{CStr, CString, c_char}; + +use crate::worker_process::main_loop; +use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct; + +/// Launch the communicator's tokio tasks, which do most of the work. +/// +/// The caller has initialized the process as a regular PostgreSQL background worker +/// process. +/// +/// Inputs: +/// `tenant_id` and `timeline_id` can be NULL, if we've been launched in "non-Neon" mode, +/// where we use local storage instead of connecting to remote neon storage. That's +/// currently only used in some unit tests. +/// +/// Result: +/// Returns a pointer to CommunicatorWorkerProcessStruct, which is a handle to the running +/// Rust tasks. The C code can use it to interact with the Rust parts. On failure, returns +/// None/NULL, and an error message is returned in *error_p. +/// +/// This is called only once in the process, so the returned struct, and error message in +/// case of failure, are simply leaked. +#[unsafe(no_mangle)] +pub extern "C" fn communicator_worker_launch( + tenant_id: *const c_char, + timeline_id: *const c_char, + error_p: *mut *const c_char, +) -> Option<&'static CommunicatorWorkerProcessStruct> { + // Convert the arguments into more convenient Rust types + let tenant_id = if tenant_id.is_null() { + None + } else { + let cstr = unsafe { CStr::from_ptr(tenant_id) }; + Some(cstr.to_str().expect("assume UTF-8")) + }; + let timeline_id = if timeline_id.is_null() { + None + } else { + let cstr = unsafe { CStr::from_ptr(timeline_id) }; + Some(cstr.to_str().expect("assume UTF-8")) + }; + + // The `init` function does all the work. + let result = main_loop::init(tenant_id, timeline_id); + + // On failure, return the error message to the C caller in *error_p.
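+ // The error CString below is deliberately leaked (via Box::leak) so that the
+ // pointer stays valid after this function returns; launch happens at most once
+ // per process, so the leak is bounded.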
+ match result { + Ok(worker_struct) => Some(worker_struct), + Err(errmsg) => { + let errmsg = CString::new(errmsg).expect("no nuls within error message"); + let errmsg = Box::leak(errmsg.into_boxed_c_str()); + let p: *const c_char = errmsg.as_ptr(); + + unsafe { *error_p = p }; + None + } + } +} diff --git a/pgxn/neon/communicator_process.c b/pgxn/neon/communicator_process.c new file mode 100644 index 0000000000..fc734ce85b --- /dev/null +++ b/pgxn/neon/communicator_process.c @@ -0,0 +1,273 @@ +/*------------------------------------------------------------------------- + * + * communicator_process.c + * Functions for starting up the communicator background worker process. + * + * Currently, the communicator process only functions as a metrics + * exporter. It provides an HTTP endpoint for polling a limited set of + * metrics. TODO: In the future, it will do much more, i.e. handle all + * the communications with the pageservers. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include + +#include "miscadmin.h" +#include "postmaster/bgworker.h" +#include "postmaster/interrupt.h" +#include "postmaster/postmaster.h" +#include "replication/walsender.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/pmsignal.h" +#include "storage/procsignal.h" +#include "tcop/tcopprot.h" +#include "utils/timestamp.h" + +#include "communicator_process.h" +#include "file_cache.h" +#include "neon.h" +#include "neon_perf_counters.h" + +/* the rust bindings, generated by cbindgen */ +#include "communicator/communicator_bindings.h" + +static void pump_logging(struct LoggingReceiver *logging); +PGDLLEXPORT void communicator_new_bgworker_main(Datum main_arg); + +/**** Initialization functions. These run in postmaster ****/ + +void +pg_init_communicator_process(void) +{ + BackgroundWorker bgw; + + /* Initialize the background worker process */ + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS; + bgw.bgw_start_time = BgWorkerStart_PostmasterStart; + snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "communicator_new_bgworker_main"); + snprintf(bgw.bgw_name, BGW_MAXLEN, "Storage communicator process"); + snprintf(bgw.bgw_type, BGW_MAXLEN, "Storage communicator process"); + bgw.bgw_restart_time = 5; + bgw.bgw_notify_pid = 0; + bgw.bgw_main_arg = (Datum) 0; + + RegisterBackgroundWorker(&bgw); +} + +/**** Worker process functions. These run in the communicator worker process ****/ + +/* + * Entry point for the communicator bgworker process + */ +void +communicator_new_bgworker_main(Datum main_arg) +{ + struct LoggingReceiver *logging; + const char *errmsg = NULL; + const struct CommunicatorWorkerProcessStruct *proc_handle; + + /* + * Pretend that this process is a WAL sender. That affects the shutdown + * sequence: WAL senders are shut down last, after the final checkpoint + * has been written. That's what we want for the communicator process too. + */ + am_walsender = true; + MarkPostmasterChildWalSender(); + + /* Establish signal handlers. 
*/ + pqsignal(SIGUSR1, procsignal_sigusr1_handler); + /* + * Postmaster sends us SIGUSR2 when all regular backends and bgworkers + * have exited, and it's time for us to exit too + */ + pqsignal(SIGUSR2, die); + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGTERM, die); + + BackgroundWorkerUnblockSignals(); + + /* + * By default, INFO messages are not printed to the log. We want + * `tracing::info!` messages emitted from the communicator to be printed, + * however, so increase the log level. + * + * XXX: This overrides any user-set value from the config file. That's not + * great, but on the other hand, there should be little reason for the user + * to control the verbosity of the communicator. It's not too verbose by + * default. + */ + SetConfigOption("log_min_messages", "INFO", PGC_SUSET, PGC_S_OVERRIDE); + + logging = communicator_worker_configure_logging(); + + proc_handle = communicator_worker_launch( + neon_tenant[0] == '\0' ? NULL : neon_tenant, + neon_timeline[0] == '\0' ? NULL : neon_timeline, + &errmsg + ); + if (proc_handle == NULL) + { + /* + * Something went wrong. Before exiting, forward any log messages that + * might've been generated during the failed launch. + */ + pump_logging(logging); + + elog(PANIC, "%s", errmsg); + } + + /* + * The Rust tokio runtime has been launched, and it's running in the + * background now. This loop in the main thread handles any interactions + * we need with the rest of PostgreSQL. + * + * NB: This process is now multi-threaded! The Rust threads do not call + * into any Postgres functions, but it's not entirely clear which Postgres + * functions are safe to call from this main thread either. Be very + * careful about adding anything non-trivial here. + * + * Also note that we try to react quickly to any log messages arriving + * from the Rust thread. Be careful to not do anything too expensive here + * that might cause delays. + */ + elog(LOG, "communicator threads started"); + for (;;) + { + TimestampTz before; + long duration; + + ResetLatch(MyLatch); + + /* + * Forward any log messages from the Rust threads into the normal + * Postgres logging facility. + */ + pump_logging(logging); + + /* + * Check interrupts like system shutdown or config reload + * + * We mustn't block for too long within this loop, or we risk the log + * queue filling up and messages being lost. Also, even if we can keep + * up, if there's a long delay between sending a message and printing + * it to the log, the timestamps on the messages get skewed, which is + * confusing. + * + * We expect processing interrupts to happen fast enough that it's OK, + * but measure it just in case, and print a warning if it takes longer + * than 100 ms. + */ +#define LOG_SKEW_WARNING_MS 100 + before = GetCurrentTimestamp(); + + CHECK_FOR_INTERRUPTS(); + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + + duration = TimestampDifferenceMilliseconds(before, GetCurrentTimestamp()); + if (duration > LOG_SKEW_WARNING_MS) + elog(WARNING, "handling interrupts took %ld ms, communicator log timestamps might be skewed", duration); + + /* + * Wait until we are woken up. The rust threads will set the latch + * when there's a log message to forward.
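+ * + * (No WL_TIMEOUT flag is passed, so the timeout argument of 0 is ignored + * and the wait is indefinite, until the latch is set or the postmaster + * dies.)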
+ */ + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, + 0, + PG_WAIT_EXTENSION); + } +} + +static void +pump_logging(struct LoggingReceiver *logging) +{ + char errbuf[1000]; + int elevel; + int32 rc; + static uint64_t last_dropped_event_count = 0; + uint64_t dropped_event_count; + uint64_t dropped_now; + + for (;;) + { + rc = communicator_worker_poll_logging(logging, + errbuf, + sizeof(errbuf), + &elevel, + &dropped_event_count); + if (rc == 0) + { + /* nothing to do */ + break; + } + else if (rc == 1) + { + /* Because we don't want to exit on error */ + + if (message_level_is_interesting(elevel)) + { + /* + * Prevent interrupts while cleaning up. + * + * (Not sure if this is required, but all the error handlers + * in Postgres that are installed as sigsetjmp() targets do + * this, so let's follow the example) + */ + HOLD_INTERRUPTS(); + + errstart(elevel, TEXTDOMAIN); + errmsg_internal("[COMMUNICATOR] %s", errbuf); + EmitErrorReport(); + FlushErrorState(); + + /* Now we can allow interrupts again */ + RESUME_INTERRUPTS(); + } + } + else if (rc == -1) + { + elog(ERROR, "logging channel was closed unexpectedly"); + } + } + + /* + * If the queue was full at any time since the last time we reported it, + * report how many messages were lost. We do this outside the loop, so + * that if the logging system is clogged, we don't exacerbate it by + * printing lots of warnings about dropped messages. + */ + dropped_now = dropped_event_count - last_dropped_event_count; + if (dropped_now != 0) + { + elog(WARNING, "%lu communicator log messages were dropped because the log buffer was full", + (unsigned long) dropped_now); + last_dropped_event_count = dropped_event_count; + } +} + +/**** + * Callbacks from the rust code, in the communicator process. + * + * NOTE: These must be thread-safe! It's very limited which PostgreSQL + * functions you can use!!! + * + * The signatures of these need to match those in the Rust code. + */ + +void +callback_set_my_latch_unsafe(void) +{ + SetLatch(MyLatch); +} diff --git a/pgxn/neon/communicator_process.h b/pgxn/neon/communicator_process.h new file mode 100644 index 0000000000..95afc70153 --- /dev/null +++ b/pgxn/neon/communicator_process.h @@ -0,0 +1,17 @@ +/*------------------------------------------------------------------------- + * + * communicator_process.h + * Communicator process + * + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + *------------------------------------------------------------------------- + */ +#ifndef COMMUNICATOR_PROCESS_H +#define COMMUNICATOR_PROCESS_H + +extern void pg_init_communicator_process(void); + +#endif /* COMMUNICATOR_PROCESS_H */ diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 7cfa769959..4da6c176cd 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -52,6 +52,8 @@ #include "pagestore_client.h" #include "communicator.h" +#include "communicator/communicator_bindings.h" + #define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0) /* @@ -2156,6 +2158,38 @@ lfc_approximate_working_set_size_seconds(time_t duration, bool reset) return dc; } +/* + * Get metrics, for the built-in metrics exporter that's part of the communicator + * process. + * + * NB: This is called from a Rust tokio task inside the communicator process. 
+ * Acquiring lwlocks, elog(), allocating memory or anything else non-trivial + * is strictly prohibited here! + */ +struct LfcMetrics +callback_get_lfc_metrics_unsafe(void) +{ + struct LfcMetrics result = { + .lfc_cache_size_limit = (int64) lfc_size_limit * 1024 * 1024, + .lfc_hits = lfc_ctl ? lfc_ctl->hits : 0, + .lfc_misses = lfc_ctl ? lfc_ctl->misses : 0, + .lfc_used = lfc_ctl ? lfc_ctl->used : 0, + .lfc_writes = lfc_ctl ? lfc_ctl->writes : 0, + }; + + if (lfc_ctl) + { + for (int minutes = 1; minutes <= 60; minutes++) + { + result.lfc_approximate_working_set_size_windows[minutes - 1] = + lfc_approximate_working_set_size_seconds(minutes * 60, false); + } + } + + return result; +} + + PG_FUNCTION_INFO_V1(get_local_cache_state); Datum diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 4e4320e498..5b9c7d600c 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -31,6 +31,7 @@ #include "utils/guc_tables.h" #include "communicator.h" +#include "communicator_process.h" #include "extension_server.h" #include "file_cache.h" #include "neon.h" @@ -44,9 +45,6 @@ #include "storage/ipc.h" #endif -/* the rust bindings, generated by cbindgen */ -#include "communicator/communicator_bindings.h" - PG_MODULE_MAGIC; void _PG_init(void); @@ -457,9 +455,6 @@ _PG_init(void) load_file("$libdir/neon_rmgr", false); #endif - /* dummy call to a Rust function in the communicator library, to check that it works */ - (void) communicator_dummy(123); - /* * Initializing a pre-loaded Postgres extension happens in three stages: * @@ -497,6 +492,8 @@ _PG_init(void) pg_init_walproposer(); init_lwlsncache(); + pg_init_communicator_process(); + pg_init_communicator(); Custom_XLogReaderRoutines = NeonOnDemandXLogReaderRoutines; diff --git a/poetry.lock b/poetry.lock index b2072bf1bc..a920833fbf 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. 
[[package]] name = "aiohappyeyeballs" @@ -3068,6 +3068,21 @@ urllib3 = ">=1.21.1,<3" socks = ["PySocks (>=1.5.6,!=1.5.7)"] use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"] +[[package]] +name = "requests-unixsocket" +version = "0.4.1" +description = "Use requests to talk HTTP via a UNIX domain socket" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "requests_unixsocket-0.4.1-py3-none-any.whl", hash = "sha256:60c4942e9dbecc2f64d611039fb1dfc25da382083c6434ac0316dca3ff908f4d"}, + {file = "requests_unixsocket-0.4.1.tar.gz", hash = "sha256:b2596158c356ecee68d27ba469a52211230ac6fb0cde8b66afb19f0ed47a1995"}, +] + +[package.dependencies] +requests = ">=1.1" + [[package]] name = "responses" version = "0.25.3" @@ -3844,4 +3859,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.1" python-versions = "^3.11" -content-hash = "6a1e8ba06b8194bf28d87fd5e184e2ddc2b4a19dffcbe3953b26da3d55c9212f" +content-hash = "b08aba407631b0341d2ef8bf9acffd733bfc7d32b12d344717ab4c7fef697625" diff --git a/proxy/src/binary/pg_sni_router.rs b/proxy/src/binary/pg_sni_router.rs index 4ac8b6a995..f3782312dc 100644 --- a/proxy/src/binary/pg_sni_router.rs +++ b/proxy/src/binary/pg_sni_router.rs @@ -76,7 +76,7 @@ fn cli() -> clap::Command { } pub async fn run() -> anyhow::Result<()> { - let _logging_guard = crate::logging::init().await?; + let _logging_guard = crate::logging::init()?; let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook(); let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]); diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs index 255f6fbbee..4148f4bc62 100644 --- a/proxy/src/binary/proxy.rs +++ b/proxy/src/binary/proxy.rs @@ -334,7 +334,7 @@ struct PgSniRouterArgs { } pub async fn run() -> anyhow::Result<()> { - let _logging_guard = crate::logging::init().await?; + let _logging_guard = crate::logging::init()?; let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook(); let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]); diff --git a/proxy/src/logging.rs b/proxy/src/logging.rs index d4fd826c13..0abb500608 100644 --- a/proxy/src/logging.rs +++ b/proxy/src/logging.rs @@ -26,7 +26,7 @@ use crate::metrics::Metrics; /// configuration from environment variables. For example, to change the /// destination, set `OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318`. /// See -pub async fn init() -> anyhow::Result<LoggingGuard> { +pub fn init() -> anyhow::Result<LoggingGuard> { let logfmt = LogFormat::from_env()?; let env_filter = EnvFilter::builder() @@ -43,8 +43,8 @@ pub async fn init() -> anyhow::Result<LoggingGuard> { .expect("this should be a valid filter directive"), ); - let otlp_layer = - tracing_utils::init_tracing("proxy", tracing_utils::ExportConfig::default()).await; + let provider = tracing_utils::init_tracing("proxy", tracing_utils::ExportConfig::default()); + let otlp_layer = provider.as_ref().map(tracing_utils::layer); let json_log_layer = if logfmt == LogFormat::Json { Some(JsonLoggingLayer::new( @@ -76,7 +76,7 @@ pub async fn init() -> anyhow::Result<LoggingGuard> { .with(text_log_layer) .try_init()?; - Ok(LoggingGuard) + Ok(LoggingGuard(provider)) } /// Initialize logging for local_proxy with log prefix and no opentelemetry.
@@ -97,7 +97,7 @@ pub fn init_local_proxy() -> anyhow::Result<LoggingGuard> { .with(fmt_layer) .try_init()?; - Ok(LoggingGuard) + Ok(LoggingGuard(None)) } pub struct LocalProxyFormatter(Format); @@ -118,14 +118,16 @@ where } } -pub struct LoggingGuard; +pub struct LoggingGuard(Option); impl Drop for LoggingGuard { fn drop(&mut self) { - // Shutdown trace pipeline gracefully, so that it has a chance to send any - // pending traces before we exit. - tracing::info!("shutting down the tracing machinery"); - tracing_utils::shutdown_tracing(); + if let Some(p) = &self.0 { + // Shutdown trace pipeline gracefully, so that it has a chance to send any + // pending traces before we exit. + tracing::info!("shutting down the tracing machinery"); + drop(p.shutdown()); + } } } diff --git a/pyproject.toml b/pyproject.toml index e992e81fe7..7631a05942 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,7 @@ types-pyyaml = "^6.0.12.20240917" testcontainers = "^4.9.0" # Install a release candidate of `jsonnet`, as it supports Python 3.13 jsonnet = "^0.21.0-rc2" +requests-unixsocket = "^0.4.1" [tool.poetry.group.dev.dependencies] mypy = "==1.13.0" diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index a1a0aab9fd..b8774b30ea 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -427,6 +427,9 @@ impl From<TimelineError> for ApiError { TimelineError::NotFound(ttid) => { ApiError::NotFound(anyhow!("timeline {} not found", ttid).into()) } + TimelineError::Deleted(ttid) => { + ApiError::NotFound(anyhow!("timeline {} deleted", ttid).into()) + } _ => ApiError::InternalServerError(anyhow!("{}", te)), } } diff --git a/storage_controller/migrations/2025-07-08-114340_sk_set_notified_generation/down.sql b/storage_controller/migrations/2025-07-08-114340_sk_set_notified_generation/down.sql new file mode 100644 index 0000000000..27d6048cd3 --- /dev/null +++ b/storage_controller/migrations/2025-07-08-114340_sk_set_notified_generation/down.sql @@ -0,0 +1 @@ +ALTER TABLE timelines DROP sk_set_notified_generation; diff --git a/storage_controller/migrations/2025-07-08-114340_sk_set_notified_generation/up.sql b/storage_controller/migrations/2025-07-08-114340_sk_set_notified_generation/up.sql new file mode 100644 index 0000000000..50178ab6a3 --- /dev/null +++ b/storage_controller/migrations/2025-07-08-114340_sk_set_notified_generation/up.sql @@ -0,0 +1 @@ +ALTER TABLE timelines ADD sk_set_notified_generation INTEGER NOT NULL DEFAULT 1; diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index 5d21feeb10..34d4ac6fba 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -225,6 +225,10 @@ struct Cli { #[arg(long)] shard_split_request_timeout: Option<humantime::Duration>, + + /// **Feature Flag** Whether the storage controller should act to rectify pageserver-reported local disk loss.
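+ /// When enabled, a reattach request reporting an empty local disk causes the
+ /// controller to clear the node's observed tenant locations, so the reconciler
+ /// reconfigures the missing tenants (see the reattach handling in service.rs).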
+ #[arg(long, default_value = "false")] + handle_ps_local_disk_loss: bool, } enum StrictMode { @@ -477,6 +481,7 @@ async fn async_main() -> anyhow::Result<()> { .shard_split_request_timeout .map(humantime::Duration::into) .unwrap_or(Duration::MAX), + handle_ps_local_disk_loss: args.handle_ps_local_disk_loss, }; // Validate that we can connect to the database diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 2e3f8c6908..619b5f69b8 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -131,6 +131,8 @@ pub(crate) enum DatabaseOperation { InsertTimeline, UpdateTimeline, UpdateTimelineMembership, + UpdateCplaneNotifiedGeneration, + UpdateSkSetNotifiedGeneration, GetTimeline, InsertTimelineReconcile, RemoveTimelineReconcile, @@ -1497,6 +1499,8 @@ impl Persistence { /// Update timeline membership configuration in the database. /// Perform a compare-and-swap (CAS) operation on the timeline's generation. /// The `new_generation` must be the next (+1) generation after the one in the database. + /// Also inserts reconcile_requests into the safekeeper_timeline_pending_ops table in the + /// same transaction. pub(crate) async fn update_timeline_membership( &self, tenant_id: TenantId, @@ -1504,8 +1508,11 @@ new_generation: SafekeeperGeneration, sk_set: &[NodeId], new_sk_set: Option<&[NodeId]>, + reconcile_requests: &[TimelinePendingOpPersistence], ) -> DatabaseResult<()> { - use crate::schema::timelines::dsl; + use crate::schema::safekeeper_timeline_pending_ops as stpo; + use crate::schema::timelines; + use diesel::query_dsl::methods::FilterDsl; let prev_generation = new_generation.previous().unwrap(); @@ -1513,14 +1520,15 @@ let timeline_id = &timeline_id; self.with_measured_conn(DatabaseOperation::UpdateTimelineMembership, move |conn| { Box::pin(async move { - let updated = diesel::update(dsl::timelines) - .filter(dsl::tenant_id.eq(&tenant_id.to_string())) - .filter(dsl::timeline_id.eq(&timeline_id.to_string())) - .filter(dsl::generation.eq(prev_generation.into_inner() as i32)) + let updated = diesel::update(timelines::table) + .filter(timelines::tenant_id.eq(&tenant_id.to_string())) + .filter(timelines::timeline_id.eq(&timeline_id.to_string())) + .filter(timelines::generation.eq(prev_generation.into_inner() as i32)) .set(( - dsl::generation.eq(new_generation.into_inner() as i32), - dsl::sk_set.eq(sk_set.iter().map(|id| id.0 as i64).collect::<Vec<i64>>()), - dsl::new_sk_set.eq(new_sk_set + timelines::generation.eq(new_generation.into_inner() as i32), + timelines::sk_set + .eq(sk_set.iter().map(|id| id.0 as i64).collect::<Vec<i64>>()), + timelines::new_sk_set.eq(new_sk_set .map(|set| set.iter().map(|id| id.0 as i64).collect::<Vec<i64>>())), )) .execute(conn) @@ -1530,20 +1538,123 @@ 0 => { // TODO(diko): It makes sense to select the current generation // and include it in the error message for better debuggability.
- Err(DatabaseError::Cas( + return Err(DatabaseError::Cas( "Failed to update membership configuration".to_string(), - )) + )); + } + 1 => {} + _ => { + return Err(DatabaseError::Logical(format!( + "unexpected number of rows ({updated})" + ))); + } + }; + + for req in reconcile_requests { + let inserted_updated = diesel::insert_into(stpo::table) + .values(req) + .on_conflict((stpo::tenant_id, stpo::timeline_id, stpo::sk_id)) + .do_update() + .set(req) + .filter(stpo::generation.lt(req.generation)) + .execute(conn) + .await?; + + if inserted_updated > 1 { + return Err(DatabaseError::Logical(format!( + "unexpected number of rows ({inserted_updated})" + ))); } - 1 => Ok(()), - _ => Err(DatabaseError::Logical(format!( - "unexpected number of rows ({updated})" - ))), } + + Ok(()) }) }) .await } + /// Update the cplane notified generation for a timeline. + /// Perform a compare-and-swap (CAS) operation on the timeline's cplane notified generation. + /// The update will fail if the specified generation is less than the cplane notified generation + /// in the database. + pub(crate) async fn update_cplane_notified_generation( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + generation: SafekeeperGeneration, + ) -> DatabaseResult<()> { + use crate::schema::timelines::dsl; + + let tenant_id = &tenant_id; + let timeline_id = &timeline_id; + self.with_measured_conn( + DatabaseOperation::UpdateCplaneNotifiedGeneration, + move |conn| { + Box::pin(async move { + let updated = diesel::update(dsl::timelines) + .filter(dsl::tenant_id.eq(&tenant_id.to_string())) + .filter(dsl::timeline_id.eq(&timeline_id.to_string())) + .filter(dsl::cplane_notified_generation.le(generation.into_inner() as i32)) + .set(dsl::cplane_notified_generation.eq(generation.into_inner() as i32)) + .execute(conn) + .await?; + + match updated { + 0 => Err(DatabaseError::Cas( + "Failed to update cplane notified generation".to_string(), + )), + 1 => Ok(()), + _ => Err(DatabaseError::Logical(format!( + "unexpected number of rows ({updated})" + ))), + } + }) + }, + ) + .await + } + + /// Update the sk set notified generation for a timeline. + /// Perform a compare-and-swap (CAS) operation on the timeline's sk set notified generation. + /// The update will fail if the specified generation is less than the sk set notified generation + /// in the database. + pub(crate) async fn update_sk_set_notified_generation( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + generation: SafekeeperGeneration, + ) -> DatabaseResult<()> { + use crate::schema::timelines::dsl; + + let tenant_id = &tenant_id; + let timeline_id = &timeline_id; + self.with_measured_conn( + DatabaseOperation::UpdateSkSetNotifiedGeneration, + move |conn| { + Box::pin(async move { + let updated = diesel::update(dsl::timelines) + .filter(dsl::tenant_id.eq(&tenant_id.to_string())) + .filter(dsl::timeline_id.eq(&timeline_id.to_string())) + .filter(dsl::sk_set_notified_generation.le(generation.into_inner() as i32)) + .set(dsl::sk_set_notified_generation.eq(generation.into_inner() as i32)) + .execute(conn) + .await?; + + match updated { + 0 => Err(DatabaseError::Cas( + "Failed to update sk set notified generation".to_string(), + )), + 1 => Ok(()), + _ => Err(DatabaseError::Logical(format!( + "unexpected number of rows ({updated})" + ))), + } + }) + }, + ) + .await + } + /// Load timeline from db. Returns `None` if not present. 
pub(crate) async fn get_timeline( &self, @@ -2493,6 +2604,7 @@ pub(crate) struct TimelinePersistence { pub(crate) new_sk_set: Option<Vec<i64>>, pub(crate) cplane_notified_generation: i32, pub(crate) deleted_at: Option<chrono::DateTime<chrono::Utc>>, + pub(crate) sk_set_notified_generation: i32, } /// This is separate from [TimelinePersistence] only because postgres allows NULLs @@ -2511,6 +2623,7 @@ pub(crate) struct TimelineFromDb { pub(crate) new_sk_set: Option<Vec<Option<i64>>>, pub(crate) cplane_notified_generation: i32, pub(crate) deleted_at: Option<chrono::DateTime<chrono::Utc>>, + pub(crate) sk_set_notified_generation: i32, } impl TimelineFromDb { @@ -2530,6 +2643,7 @@ new_sk_set, cplane_notified_generation: self.cplane_notified_generation, deleted_at: self.deleted_at, + sk_set_notified_generation: self.sk_set_notified_generation, } } } diff --git a/storage_controller/src/schema.rs b/storage_controller/src/schema.rs index f3dcdaf798..def519c168 100644 --- a/storage_controller/src/schema.rs +++ b/storage_controller/src/schema.rs @@ -118,6 +118,7 @@ diesel::table! { new_sk_set -> Nullable<Array<Nullable<Int8>>>, cplane_notified_generation -> Int4, deleted_at -> Nullable<Timestamptz>, + sk_set_notified_generation -> Int4, } } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 71186076ec..8f5efe8ac4 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -487,6 +487,9 @@ pub struct Config { /// Timeout used for HTTP client of split requests. [`Duration::MAX`] if None. pub shard_split_request_timeout: Duration, + + // Feature flag: Whether the storage controller should act to rectify pageserver-reported local disk loss. + pub handle_ps_local_disk_loss: bool, } impl From for ApiError { @@ -2388,6 +2391,33 @@ tenants: Vec::new(), }; + // [Hadron] If the pageserver reports in the reattach message that it has an empty disk, it's possible that it just + // recovered from a local disk failure. The response of the reattach request will contain a list of tenants but it + // will not be honored by the pageserver in this case (disk failure). We should make sure we clear any observed + // locations of tenants attached to the node so that the reconciler will discover the discrepancy and reconfigure the + // missing tenants on the node properly. + if self.config.handle_ps_local_disk_loss && reattach_req.empty_local_disk.unwrap_or(false) { + tracing::info!( + "Pageserver {node_id} reports empty local disk, clearing observed locations referencing the pageserver for all tenants", + node_id = reattach_req.node_id + ); + let mut num_tenant_shards_affected = 0; + for (tenant_shard_id, shard) in tenants.iter_mut() { + if shard + .observed + .locations + .remove(&reattach_req.node_id) + .is_some() + { + tracing::info!("Cleared observed location for tenant shard {tenant_shard_id}"); + num_tenant_shards_affected += 1; + } + } + tracing::info!( + "Cleared observed locations for {num_tenant_shards_affected} tenant shards" + ); + } + // TODO: cancel/restart any running reconciliation for this tenant; it might be trying // to call location_conf API with an old generation. Wait for cancellation to complete // before responding to this request.
Requires well-implemented CancellationToken logic diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs index 28c70e203a..bc77a1a6b8 100644 --- a/storage_controller/src/service/safekeeper_service.rs +++ b/storage_controller/src/service/safekeeper_service.rs @@ -312,6 +312,7 @@ impl Service { new_sk_set: None, cplane_notified_generation: 0, deleted_at: None, + sk_set_notified_generation: 0, }; let inserted = self .persistence @@ -461,6 +462,7 @@ new_sk_set: None, cplane_notified_generation: 1, deleted_at: None, + sk_set_notified_generation: 1, }; let inserted = self .persistence @@ -894,17 +896,21 @@ impl Service { /// If min_position is not None, validates that a majority of safekeepers /// reached at least min_position. /// + /// If update_notified_generation is set, also updates sk_set_notified_generation + /// in the timelines table. + /// /// Return responses from safekeepers in the input order. async fn tenant_timeline_set_membership_quorum( self: &Arc<Self>, tenant_id: TenantId, timeline_id: TimelineId, safekeepers: &[Safekeeper], - config: &membership::Configuration, + mconf: &membership::Configuration, min_position: Option<(Term, Lsn)>, + update_notified_generation: bool, ) -> Result<Vec<mgmt_api::Result<TimelineMembershipSwitchResponse>>, ApiError> { let req = TimelineMembershipSwitchRequest { - mconf: config.clone(), + mconf: mconf.clone(), }; const SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30); @@ -945,28 +951,34 @@ .await?; for res in results.iter().flatten() { - if res.current_conf.generation > config.generation { + if res.current_conf.generation > mconf.generation { // Another switch_membership raced us. return Err(ApiError::Conflict(format!( "received configuration with generation {} from safekeeper, but expected {}", - res.current_conf.generation, config.generation + res.current_conf.generation, mconf.generation ))); - } else if res.current_conf.generation < config.generation { + } else if res.current_conf.generation < mconf.generation { // Note: should never happen. // If we get a response, it should be at least the sent generation. tracing::error!( "received configuration with generation {} from safekeeper, but expected {}", res.current_conf.generation, - config.generation + mconf.generation ); return Err(ApiError::InternalServerError(anyhow::anyhow!( "received configuration with generation {} from safekeeper, but expected {}", res.current_conf.generation, - config.generation + mconf.generation ))); } } + if update_notified_generation { + self.persistence + .update_sk_set_notified_generation(tenant_id, timeline_id, mconf.generation) + .await?; + } + Ok(results) } @@ -1035,17 +1047,22 @@ } /// Exclude a timeline from safekeepers in parallel with retries. - /// If an exclude request is unsuccessful, it will be added to - /// the reconciler, and after that the function will succeed. - async fn tenant_timeline_safekeeper_exclude( + /// + /// Assumes that the exclude requests are already persisted in the database. + /// + /// The function is best-effort: if an exclude request is unsuccessful, + /// it will be added to the in-memory reconciler, and the function will succeed anyway. + /// + /// Might fail if there is an error accessing the database.
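+ /// + /// For each safekeeper, a successful exclude removes the corresponding pending op + /// from the database; a failed one is handed to the in-memory reconciler instead.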
+ async fn tenant_timeline_safekeeper_exclude_reconcile( self: &Arc<Self>, tenant_id: TenantId, timeline_id: TimelineId, safekeepers: &[Safekeeper], - config: &membership::Configuration, + mconf: &membership::Configuration, ) -> Result<(), ApiError> { let req = TimelineMembershipSwitchRequest { - mconf: config.clone(), + mconf: mconf.clone(), }; const SK_EXCLUDE_TIMELINE_TIMEOUT: Duration = Duration::from_secs(30); @@ -1063,25 +1080,32 @@ let mut reconcile_requests = Vec::new(); - for (idx, res) in results.iter().enumerate() { - if res.is_err() { - let sk_id = safekeepers[idx].skp.id; - let pending_op = TimelinePendingOpPersistence { - tenant_id: tenant_id.to_string(), - timeline_id: timeline_id.to_string(), - generation: config.generation.into_inner() as i32, - op_kind: SafekeeperTimelineOpKind::Exclude, - sk_id, - }; - tracing::info!("writing pending exclude op for sk id {sk_id}"); - self.persistence.insert_pending_op(pending_op).await?; + fail::fail_point!("sk-migration-step-9-mid-exclude", |_| { + Err(ApiError::BadRequest(anyhow::anyhow!( + "failpoint sk-migration-step-9-mid-exclude" + ))) + }); + for (idx, res) in results.iter().enumerate() { + let sk_id = safekeepers[idx].skp.id; + let generation = mconf.generation.into_inner(); + + if res.is_ok() { + self.persistence + .remove_pending_op( + tenant_id, + Some(timeline_id), + NodeId(sk_id as u64), + generation, + ) + .await?; + } else { let req = ScheduleRequest { safekeeper: Box::new(safekeepers[idx].clone()), host_list: Vec::new(), tenant_id, timeline_id: Some(timeline_id), - generation: config.generation.into_inner(), + generation, kind: SafekeeperTimelineOpKind::Exclude, }; reconcile_requests.push(req); @@ -1208,6 +1232,22 @@ } // If it is the same new_sk_set, we can continue the migration (retry). } else { + let prev_finished = timeline.cplane_notified_generation == timeline.generation + && timeline.sk_set_notified_generation == timeline.generation; + + if !prev_finished { + // The previous migration is committed, but the finish step failed. + // Safekeepers/cplane might not know about the last membership configuration. + // Retry the finish step to ensure smooth migration. + self.finish_safekeeper_migration_retry(tenant_id, timeline_id, &timeline) + .await?; + } + + if cur_sk_set == new_sk_set { + tracing::info!("timeline is already at the desired safekeeper set"); + return Ok(()); + } + // 3. No active migration yet. // Increment current generation and put desired_set to new_sk_set. generation = generation.next(); @@ -1219,8 +1259,15 @@ generation, &cur_sk_set, Some(&new_sk_set), + &[], ) .await?; + + fail::fail_point!("sk-migration-after-step-3", |_| { + Err(ApiError::BadRequest(anyhow::anyhow!( + "failpoint sk-migration-after-step-3" + ))) + }); } let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?; @@ -1249,6 +1296,7 @@ &cur_safekeepers, &joint_config, None, // no min position + true, // update notified generation ) .await?; @@ -1266,6 +1314,12 @@ "safekeepers set membership updated", ); + fail::fail_point!("sk-migration-after-step-4", |_| { + Err(ApiError::BadRequest(anyhow::anyhow!( + "failpoint sk-migration-after-step-4" + ))) + }); + // 5. Initialize timeline on safekeeper(s) from new_sk_set where it doesn't exist yet // by doing pull_timeline from the majority of the current set.
@@ -1285,6 +1339,12 @@ ) .await?; + fail::fail_point!("sk-migration-after-step-5", |_| { + Err(ApiError::BadRequest(anyhow::anyhow!( + "failpoint sk-migration-after-step-5" + ))) + }); + // 6. Call POST bump_term(sync_term) on safekeepers from the new set. Success on majority is enough. // TODO(diko): do we need to bump timeline term? @@ -1300,9 +1360,16 @@ &new_safekeepers, &joint_config, Some(sync_position), + false, // we're just waiting for sync position, don't update notified generation ) .await?; + fail::fail_point!("sk-migration-after-step-7", |_| { + Err(ApiError::BadRequest(anyhow::anyhow!( + "failpoint sk-migration-after-step-7" + ))) + }); + // 8. Create new_conf: Configuration incrementing joint_conf generation and // having new safekeeper set as sk_set and None new_sk_set. @@ -1314,45 +1381,55 @@ new_members: None, }; - self.persistence - .update_timeline_membership(tenant_id, timeline_id, generation, &new_sk_set, None) - .await?; - - // TODO(diko): at this point we have already updated the timeline in the database, - // but we still need to notify safekeepers and cplane about the new configuration, - // and put delition of the timeline from the old safekeepers into the reconciler. - // Ideally it should be done atomically, but now it's not. - // Worst case: the timeline is not deleted from old safekeepers, - // the compute may require both quorums till the migration is retried and completed. - - self.tenant_timeline_set_membership_quorum( - tenant_id, - timeline_id, - &new_safekeepers, - &new_conf, - None, // no min position - ) - .await?; - let new_ids: HashSet<NodeId> = new_safekeepers.iter().map(|sk| sk.get_id()).collect(); let exclude_safekeepers = cur_safekeepers .into_iter() .filter(|sk| !new_ids.contains(&sk.get_id())) .collect::<Vec<_>>(); - self.tenant_timeline_safekeeper_exclude( + let exclude_requests = exclude_safekeepers + .iter() + .map(|sk| TimelinePendingOpPersistence { + sk_id: sk.skp.id, + tenant_id: tenant_id.to_string(), + timeline_id: timeline_id.to_string(), + generation: generation.into_inner() as i32, + op_kind: SafekeeperTimelineOpKind::Exclude, + }) + .collect::<Vec<_>>(); + + self.persistence + .update_timeline_membership( + tenant_id, + timeline_id, + generation, + &new_sk_set, + None, + &exclude_requests, + ) + .await?; + + fail::fail_point!("sk-migration-after-step-8", |_| { + Err(ApiError::BadRequest(anyhow::anyhow!( + "failpoint sk-migration-after-step-8" + ))) + }); + + // At this point we have already updated the timeline in the database, so the final + // membership configuration is committed and the migration is not abortable anymore. + // But safekeepers and cplane/compute still need to be notified about the new configuration. + // [`Self::finish_safekeeper_migration`] does exactly that: notifies everyone about + // the new configuration and reconciles excluded safekeepers. + // If it fails, the safekeeper migration call should be retried. + + self.finish_safekeeper_migration( tenant_id, timeline_id, - &exclude_safekeepers, + &new_safekeepers, &new_conf, + &exclude_safekeepers, ) .await?; - // Notify cplane/compute about the membership change AFTER changing the membership on safekeepers. - // This way the compute will stop talking to excluded safekeepers only after we stop requiring to - // collect a quorum from them.
- self.cplane_notify_safekeepers(tenant_id, timeline_id, &new_conf) - .await?; - Ok(()) } @@ -1396,6 +1473,130 @@ ApiError::InternalServerError(anyhow::anyhow!( "failed to notify cplane about safekeeper membership change: {err}" )) - }) + })?; + + self.persistence + .update_cplane_notified_generation(tenant_id, timeline_id, mconf.generation) + .await?; + + Ok(()) + } + + /// Finish safekeeper migration. + /// + /// It is the last step of the safekeeper migration. + /// + /// Notifies safekeepers and cplane about the final membership configuration, + /// reconciles excluded safekeepers and updates *_notified_generation in the database. + async fn finish_safekeeper_migration( + self: &Arc<Self>, + tenant_id: TenantId, + timeline_id: TimelineId, + new_safekeepers: &[Safekeeper], + new_conf: &membership::Configuration, + exclude_safekeepers: &[Safekeeper], + ) -> Result<(), ApiError> { + // 9. Call PUT configuration on safekeepers from the new set, delivering them new_conf. + // Also try to exclude safekeepers and notify cplane about the membership change. + + self.tenant_timeline_set_membership_quorum( + tenant_id, + timeline_id, + new_safekeepers, + new_conf, + None, // no min position + true, // update notified generation + ) + .await?; + + fail::fail_point!("sk-migration-step-9-after-set-membership", |_| { + Err(ApiError::BadRequest(anyhow::anyhow!( + "failpoint sk-migration-step-9-after-set-membership" + ))) + }); + + self.tenant_timeline_safekeeper_exclude_reconcile( + tenant_id, + timeline_id, + exclude_safekeepers, + new_conf, + ) + .await?; + + fail::fail_point!("sk-migration-step-9-after-exclude", |_| { + Err(ApiError::BadRequest(anyhow::anyhow!( + "failpoint sk-migration-step-9-after-exclude" + ))) + }); + + // Notify cplane/compute about the membership change AFTER changing the membership on safekeepers. + // This way the compute will stop talking to excluded safekeepers only after we stop requiring to + // collect a quorum from them. + self.cplane_notify_safekeepers(tenant_id, timeline_id, new_conf) + .await?; + + fail::fail_point!("sk-migration-after-step-9", |_| { + Err(ApiError::BadRequest(anyhow::anyhow!( + "failpoint sk-migration-after-step-9" + ))) + }); + + Ok(()) + } + + /// Same as [`Self::finish_safekeeper_migration`], but restores the migration state from the database. + /// It's used when the migration failed during the finish step and we need to retry it. + async fn finish_safekeeper_migration_retry( + self: &Arc<Self>, + tenant_id: TenantId, + timeline_id: TimelineId, + timeline: &TimelinePersistence, + ) -> Result<(), ApiError> { + if timeline.new_sk_set.is_some() { + // Logical error, should never happen. + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "can't finish timeline migration for {tenant_id}/{timeline_id}: new_sk_set is not None" + ))); + } + + let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?; + let cur_sk_member_set = + Self::make_member_set(&cur_safekeepers).map_err(ApiError::InternalServerError)?; + + let mconf = membership::Configuration { + generation: SafekeeperGeneration::new(timeline.generation as u32), + members: cur_sk_member_set, + new_members: None, + }; + + // We might have failed between committing reconciliation requests and adding them to the in-memory reconciler. + // Reload them from the database.
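+ // Only pending Exclude ops whose generation matches the committed timeline
+ // generation are relevant here; stale ops from older generations are skipped
+ // by the filter below.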
+ let pending_ops = self + .persistence + .list_pending_ops_for_timeline(tenant_id, timeline_id) + .await?; + + let mut exclude_sk_ids = Vec::new(); + + for op in pending_ops { + if op.op_kind == SafekeeperTimelineOpKind::Exclude + && op.generation == timeline.generation + { + exclude_sk_ids.push(op.sk_id); + } + } + + let exclude_safekeepers = self.get_safekeepers(&exclude_sk_ids)?; + + self.finish_safekeeper_migration( + tenant_id, + timeline_id, + &cur_safekeepers, + &mconf, + &exclude_safekeepers, + ) + .await?; + + Ok(()) } } diff --git a/test_runner/fixtures/endpoint/http.py b/test_runner/fixtures/endpoint/http.py index 1d278095ce..c43445e89d 100644 --- a/test_runner/fixtures/endpoint/http.py +++ b/test_runner/fixtures/endpoint/http.py @@ -66,6 +66,12 @@ class EndpointHttpClient(requests.Session): res.raise_for_status() return res.json() + def autoscaling_metrics(self): + res = self.get(f"http://localhost:{self.external_port}/autoscaling_metrics") + res.raise_for_status() + log.debug("raw compute metrics: %s", res.text) + return res.text + def prewarm_lfc_status(self) -> dict[str, str]: res = self.get(self.prewarm_url) res.raise_for_status() diff --git a/test_runner/fixtures/neon_api.py b/test_runner/fixtures/neon_api.py index bb618325e0..b26bcb286c 100644 --- a/test_runner/fixtures/neon_api.py +++ b/test_runner/fixtures/neon_api.py @@ -24,6 +24,7 @@ def connection_parameters_to_env(params: dict[str, str]) -> dict[str, str]: # Some API calls not yet implemented. # You may want to copy not-yet-implemented methods from the PR https://github.com/neondatabase/neon/pull/11305 +@final class NeonAPI: def __init__(self, neon_api_key: str, neon_api_base_url: str): self.__neon_api_key = neon_api_key @@ -170,7 +171,7 @@ class NeonAPI: protected: bool | None = None, archived: bool | None = None, init_source: str | None = None, - add_endpoint=True, + add_endpoint: bool = True, ) -> dict[str, Any]: data: dict[str, Any] = {} if add_endpoint: diff --git a/test_runner/fixtures/neon_cli.py b/test_runner/fixtures/neon_cli.py index 18c9cf89e7..ac2ceffcc1 100644 --- a/test_runner/fixtures/neon_cli.py +++ b/test_runner/fixtures/neon_cli.py @@ -400,6 +400,7 @@ class NeonLocalCli(AbstractNeonCli): timeout_in_seconds: int | None = None, instance_id: int | None = None, base_port: int | None = None, + handle_ps_local_disk_loss: bool | None = None, ): cmd = ["storage_controller", "start"] if timeout_in_seconds is not None: @@ -408,6 +409,10 @@ class NeonLocalCli(AbstractNeonCli): cmd.append(f"--instance-id={instance_id}") if base_port is not None: cmd.append(f"--base-port={base_port}") + if handle_ps_local_disk_loss is not None: + cmd.append( + f"--handle-ps-local-disk-loss={'true' if handle_ps_local_disk_loss else 'false'}" + ) return self.raw_cli(cmd) def storage_controller_stop(self, immediate: bool, instance_id: int | None = None): diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index e402af1129..69d09d48be 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1940,9 +1940,12 @@ class NeonStorageController(MetricsGetter, LogUtils): timeout_in_seconds: int | None = None, instance_id: int | None = None, base_port: int | None = None, + handle_ps_local_disk_loss: bool | None = None, ) -> Self: assert not self.running - self.env.neon_cli.storage_controller_start(timeout_in_seconds, instance_id, base_port) + self.env.neon_cli.storage_controller_start( + timeout_in_seconds, instance_id, base_port, handle_ps_local_disk_loss + ) 
self.running = True return self @@ -2840,10 +2843,13 @@ class NeonProxiedStorageController(NeonStorageController): timeout_in_seconds: int | None = None, instance_id: int | None = None, base_port: int | None = None, + handle_ps_local_disk_loss: bool | None = None, ) -> Self: assert instance_id is not None and base_port is not None - self.env.neon_cli.storage_controller_start(timeout_in_seconds, instance_id, base_port) + self.env.neon_cli.storage_controller_start( + timeout_in_seconds, instance_id, base_port, handle_ps_local_disk_loss + ) self.instances[instance_id] = {"running": True} self.running = True @@ -5799,6 +5805,7 @@ SKIP_FILES = frozenset( "postmaster.pid", "pg_control", "pg_dynshmem", + "neon-communicator.socket", ) ) diff --git a/test_runner/regress/test_communicator_metrics_exporter.py b/test_runner/regress/test_communicator_metrics_exporter.py new file mode 100644 index 0000000000..0e3e76910a --- /dev/null +++ b/test_runner/regress/test_communicator_metrics_exporter.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +import os +from typing import TYPE_CHECKING + +import pytest +import requests +import requests_unixsocket # type: ignore [import-untyped] +from fixtures.metrics import parse_metrics + +if TYPE_CHECKING: + from fixtures.neon_fixtures import NeonEnv + +NEON_COMMUNICATOR_SOCKET_NAME = "neon-communicator.socket" + + +def test_communicator_metrics(neon_simple_env: NeonEnv): + """ + Test the communicator's built-in HTTP prometheus exporter + """ + env = neon_simple_env + + endpoint = env.endpoints.create("main") + endpoint.start() + + # Change current directory to the data directory, so that we can use + # a short relative path to refer to the socket. (There's a 100 char + # limitation on the path.) + os.chdir(str(endpoint.pgdata_dir)) + session = requests_unixsocket.Session() + r = session.get(f"http+unix://{NEON_COMMUNICATOR_SOCKET_NAME}/metrics") + assert r.status_code == 200, f"got response {r.status_code}: {r.text}" + + # quick test that the endpoint returned something expected. (We don't validate + # that the metrics returned are sensible.) + m = parse_metrics(r.text) + m.query_one("lfc_hits") + m.query_one("lfc_misses") + + # Test panic handling. The /debug/panic endpoint raises a Rust panic. It's + # expected to unwind and drop the HTTP connection without response, but not + # kill the process or the server. + with pytest.raises( + requests.ConnectionError, match="Remote end closed connection without response" + ): + r = session.get(f"http+unix://{NEON_COMMUNICATOR_SOCKET_NAME}/debug/panic") + assert r.status_code == 500 + + # Test that subsequent requests after the panic still work. 
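+ # (i.e. the panic unwound only that one handler task; the server itself
+ # survived and keeps accepting connections)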
+ r = session.get(f"http+unix://{NEON_COMMUNICATOR_SOCKET_NAME}/metrics") + assert r.status_code == 200, f"got response {r.status_code}: {r.text}" + m = parse_metrics(r.text) + m.query_one("lfc_hits") + m.query_one("lfc_misses") diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py index a3a20cdc62..734887c5b3 100644 --- a/test_runner/regress/test_compatibility.py +++ b/test_runner/regress/test_compatibility.py @@ -197,7 +197,7 @@ def test_create_snapshot( shutil.copytree( test_output_dir, new_compatibility_snapshot_dir, - ignore=shutil.ignore_patterns("pg_dynshmem"), + ignore=shutil.ignore_patterns("pg_dynshmem", "neon-communicator.socket"), ) log.info(f"Copied new compatibility snapshot dir to: {new_compatibility_snapshot_dir}") diff --git a/test_runner/regress/test_hcc_handling_ps_data_loss.py b/test_runner/regress/test_hcc_handling_ps_data_loss.py new file mode 100644 index 0000000000..35d3b72923 --- /dev/null +++ b/test_runner/regress/test_hcc_handling_ps_data_loss.py @@ -0,0 +1,47 @@ +import shutil + +from fixtures.neon_fixtures import NeonEnvBuilder +from fixtures.utils import query_scalar + + +def test_hcc_handling_ps_data_loss( + neon_env_builder: NeonEnvBuilder, +): + """ + Test that following a pageserver local data loss event, the system can recover automatically (i.e. + rehydrating the restarted pageserver from remote storage) without manual intervention. The + pageserver indicates to the storage controller that it has restarted without any local tenant + data in its "reattach" request and the storage controller uses this information to detect the + data loss condition and reconfigure the pageserver as necessary. + """ + env = neon_env_builder.init_configs() + env.broker.start() + env.storage_controller.start(handle_ps_local_disk_loss=True) + env.pageserver.start() + for sk in env.safekeepers: + sk.start() + + # create new nenant + tenant_id, _ = env.create_tenant(shard_count=4) + + endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) + with endpoint.cursor() as cur: + cur.execute("SELECT pg_logical_emit_message(false, 'neon-test', 'between inserts')") + cur.execute("CREATE DATABASE testdb") + + with endpoint.cursor(dbname="testdb") as cur: + cur.execute("CREATE TABLE tbl_one_hundred_rows AS SELECT generate_series(1,100)") + endpoint.stop() + + # Kill the pageserver, remove the `tenants/` directory, and restart. This simulates a pageserver + # that restarted with the same ID but has lost all its local disk data. + env.pageserver.stop(immediate=True) + shutil.rmtree(env.pageserver.tenant_dir()) + env.pageserver.start() + + # Test that the endpoint can start and query the database after the pageserver restarts. This + # indirectly tests that the pageserver was able to rehydrate the tenant data it lost from remote + # storage automatically. 
+ endpoint.start() + with endpoint.cursor(dbname="testdb") as cur: + assert query_scalar(cur, "SELECT count(*) FROM tbl_one_hundred_rows") == 100 diff --git a/test_runner/regress/test_lfc_working_set_approximation.py b/test_runner/regress/test_lfc_working_set_approximation.py index a28bc3d047..2ee15b60fd 100644 --- a/test_runner/regress/test_lfc_working_set_approximation.py +++ b/test_runner/regress/test_lfc_working_set_approximation.py @@ -6,6 +6,7 @@ from typing import TYPE_CHECKING import pytest from fixtures.log_helper import log +from fixtures.metrics import parse_metrics from fixtures.utils import USE_LFC, query_scalar if TYPE_CHECKING: @@ -75,10 +76,24 @@ WITH (fillfactor='100'); cur.execute("SELECT abalance FROM pgbench_accounts WHERE aid = 104242") cur.execute("SELECT abalance FROM pgbench_accounts WHERE aid = 204242") # verify working set size after some index access of a few select pages only - blocks = query_scalar(cur, "select approximate_working_set_size(true)") + blocks = query_scalar(cur, "select approximate_working_set_size(false)") log.info(f"working set size after some index access of a few select pages only {blocks}") assert blocks < 20 + # Also test the metrics from the /autoscaling_metrics endpoint + autoscaling_metrics = endpoint.http_client().autoscaling_metrics() + log.debug(f"Raw metrics: {autoscaling_metrics}") + m = parse_metrics(autoscaling_metrics) + + http_estimate = m.query_one( + "lfc_approximate_working_set_size_windows", + { + "duration_seconds": "60", + }, + ).value + log.info(f"http estimate: {http_estimate}, blocks: {blocks}") + assert http_estimate > 0 and http_estimate < 20 + @pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") def test_sliding_working_set_approximation(neon_simple_env: NeonEnv): diff --git a/test_runner/regress/test_safekeeper_migration.py b/test_runner/regress/test_safekeeper_migration.py index 170c1a3650..371bec0c62 100644 --- a/test_runner/regress/test_safekeeper_migration.py +++ b/test_runner/regress/test_safekeeper_migration.py @@ -3,11 +3,22 @@ from __future__ import annotations from typing import TYPE_CHECKING import pytest +import requests +from fixtures.log_helper import log from fixtures.neon_fixtures import StorageControllerApiException if TYPE_CHECKING: from fixtures.neon_fixtures import NeonEnvBuilder +# TODO(diko): pageserver spams with various errors during safekeeper migration. +# Fix the code so it handles the migration better. +ALLOWED_PAGESERVER_ERRORS = [ + ".*Timeline .* was cancelled and cannot be used anymore.*", + ".*Timeline .* has been deleted.*", + ".*Timeline .* was not found in global map.*", + ".*wal receiver task finished with an error.*", +] + def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder): """ @@ -24,16 +35,7 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder): "timeline_safekeeper_count": 1, } env = neon_env_builder.init_start() - # TODO(diko): pageserver spams with various errors during safekeeper migration. - # Fix the code so it handles the migration better. 
-    env.pageserver.allowed_errors.extend(
-        [
-            ".*Timeline .* was cancelled and cannot be used anymore.*",
-            ".*Timeline .* has been deleted.*",
-            ".*Timeline .* was not found in global map.*",
-            ".*wal receiver task finished with an error.*",
-        ]
-    )
+    env.pageserver.allowed_errors.extend(ALLOWED_PAGESERVER_ERRORS)

     ep = env.endpoints.create("main", tenant_id=env.initial_tenant)

@@ -42,15 +44,23 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder):
     assert len(mconf["sk_set"]) == 1
     assert mconf["generation"] == 1

+    current_sk = mconf["sk_set"][0]
+
     ep.start(safekeeper_generation=1, safekeepers=mconf["sk_set"])
     ep.safe_psql("CREATE EXTENSION neon_test_utils;")
     ep.safe_psql("CREATE TABLE t(a int)")

+    expected_gen = 1
+
     for active_sk in range(1, 4):
         env.storage_controller.migrate_safekeepers(
             env.initial_tenant, env.initial_timeline, [active_sk]
         )

+        if active_sk != current_sk:
+            expected_gen += 2
+            current_sk = active_sk
+
         other_sks = [sk for sk in range(1, 4) if sk != active_sk]

         for sk in other_sks:
@@ -65,9 +75,6 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder):

     assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(1, 4)]

-    # 1 initial generation + 2 migrations on each loop iteration.
-    expected_gen = 1 + 2 * 3
-
     mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
     assert mconf["generation"] == expected_gen

@@ -113,3 +120,79 @@ def test_new_sk_set_validation(neon_env_builder: NeonEnvBuilder):
             env.storage_controller.safekeeper_scheduling_policy(decom_sk, "Decomissioned")

     expect_fail([sk_set[0], decom_sk], "decomissioned")
+
+
+def test_safekeeper_migration_common_set_failpoints(neon_env_builder: NeonEnvBuilder):
+    """
+    Test that safekeeper migration handles failures well.
+
+    Two main conditions are checked:
+    1. The safekeeper migration handler can be retried after different failures.
+    2. Writes do not get stuck if sk_set and new_sk_set have a quorum in common.
+ """ + neon_env_builder.num_safekeepers = 4 + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": True, + "timeline_safekeeper_count": 3, + } + env = neon_env_builder.init_start() + env.pageserver.allowed_errors.extend(ALLOWED_PAGESERVER_ERRORS) + + mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline) + assert len(mconf["sk_set"]) == 3 + assert mconf["generation"] == 1 + + ep = env.endpoints.create("main", tenant_id=env.initial_tenant) + ep.start(safekeeper_generation=1, safekeepers=mconf["sk_set"]) + ep.safe_psql("CREATE EXTENSION neon_test_utils;") + ep.safe_psql("CREATE TABLE t(a int)") + + excluded_sk = mconf["sk_set"][-1] + added_sk = [sk.id for sk in env.safekeepers if sk.id not in mconf["sk_set"]][0] + new_sk_set = mconf["sk_set"][:-1] + [added_sk] + log.info(f"migrating sk set from {mconf['sk_set']} to {new_sk_set}") + + failpoints = [ + "sk-migration-after-step-3", + "sk-migration-after-step-4", + "sk-migration-after-step-5", + "sk-migration-after-step-7", + "sk-migration-after-step-8", + "sk-migration-step-9-after-set-membership", + "sk-migration-step-9-mid-exclude", + "sk-migration-step-9-after-exclude", + "sk-migration-after-step-9", + ] + + for i, fp in enumerate(failpoints): + env.storage_controller.configure_failpoints((fp, "return(1)")) + + with pytest.raises(StorageControllerApiException, match=f"failpoint {fp}"): + env.storage_controller.migrate_safekeepers( + env.initial_tenant, env.initial_timeline, new_sk_set + ) + ep.safe_psql(f"INSERT INTO t VALUES ({i})") + + env.storage_controller.configure_failpoints((fp, "off")) + + # No failpoints, migration should succeed. + env.storage_controller.migrate_safekeepers(env.initial_tenant, env.initial_timeline, new_sk_set) + + mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline) + assert mconf["new_sk_set"] is None + assert mconf["sk_set"] == new_sk_set + assert mconf["generation"] == 3 + + ep.clear_buffers() + assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(len(failpoints))] + assert ep.safe_psql("SHOW neon.safekeepers")[0][0].startswith("g#3:") + + # Check that we didn't forget to remove the timeline on the excluded safekeeper. 
+ with pytest.raises(requests.exceptions.HTTPError) as exc: + env.safekeepers[excluded_sk - 1].http_client().timeline_status( + env.initial_tenant, env.initial_timeline + ) + assert exc.value.response.status_code == 404 + assert ( + f"timeline {env.initial_tenant}/{env.initial_timeline} deleted" in exc.value.response.text + ) diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index d6d64a2045..f5984d3ac3 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -74,7 +74,7 @@ once_cell = { version = "1" } p256 = { version = "0.13", features = ["jwk"] } parquet = { version = "53", default-features = false, features = ["zstd"] } prost = { version = "0.13", features = ["no-recursion-limit", "prost-derive"] } -rand = { version = "0.8", features = ["small_rng"] } +rand = { version = "0.9" } regex = { version = "1" } regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] } regex-syntax = { version = "0.8" } @@ -93,6 +93,7 @@ spki = { version = "0.7", default-features = false, features = ["pem", "std"] } stable_deref_trait = { version = "1" } subtle = { version = "2" } sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] } +thiserror = { version = "2" } tikv-jemalloc-ctl = { version = "0.6", features = ["stats", "use_std"] } tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } time = { version = "0.3", features = ["macros", "serde-well-known"] } @@ -101,6 +102,7 @@ tokio-rustls = { version = "0.26", default-features = false, features = ["loggin tokio-stream = { version = "0.1", features = ["net", "sync"] } tokio-util = { version = "0.7", features = ["codec", "compat", "io-util", "rt"] } toml_edit = { version = "0.22", features = ["serde"] } +tonic = { version = "0.13", default-features = false, features = ["codegen", "gzip", "prost", "router", "server", "tls-native-roots", "tls-ring", "zstd"] } tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "log"] } tracing = { version = "0.1", features = ["log"] } tracing-core = { version = "0.1" }
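Note on the Unix-socket scraping pattern used by test_communicator_metrics above: requests_unixsocket expects the socket path percent-encoded in the host position of the URL, which is slightly unusual, so here is a minimal standalone sketch (not part of the patch). The socket filename, the /metrics route, and the lfc_* metric names come from the test; the helper name and the absolute-path form are illustrative assumptions.

# Illustrative sketch only, not part of the patch.
from urllib.parse import quote

import requests_unixsocket  # type: ignore [import-untyped]
from fixtures.metrics import parse_metrics


def scrape_lfc_counters(pgdata_dir: str) -> dict[str, float]:
    # An absolute socket path goes in the URL's host component with "/"
    # percent-encoded as "%2F". The kernel still limits the *decoded* path to
    # roughly 100 characters, which is why the regression test chdir()s into
    # pgdata and uses a short relative path instead.
    host = quote(f"{pgdata_dir}/neon-communicator.socket", safe="")
    session = requests_unixsocket.Session()
    resp = session.get(f"http+unix://{host}/metrics")
    resp.raise_for_status()
    metrics = parse_metrics(resp.text)
    # query_one() raises if the metric is missing, mirroring the test's checks.
    return {
        "lfc_hits": metrics.query_one("lfc_hits").value,
        "lfc_misses": metrics.query_one("lfc_misses").value,
    }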
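For the generation bookkeeping that test_safekeeper_migration_simple now asserts, a back-of-envelope model may help: the patch adds +2 to the expected generation only when a migration actually changes the safekeeper set, and a migration to the current set is a no-op. The two-step bump (presumably a joint configuration followed by committing the new set) is an assumption on my part; the +2 increment itself comes straight from the test.

# Hypothetical model of the expected-generation arithmetic in the test.
def expected_generation(initial_sk: int, migrations: list[int]) -> int:
    gen, current = 1, initial_sk  # timelines start at generation 1
    for target in migrations:
        if target != current:
            gen += 2  # assumed: joint config + committed new config
            current = target
    return gen

# E.g. starting on sk 1 and migrating through [1, 2, 3] yields 1 + 0 + 2 + 2 = 5,
# whereas the old hard-coded value assumed every iteration migrated (1 + 2*3 = 7).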