From bd4dae8f4a5d4ea3fe2279dbd30a30a0943ffbab Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Mon, 1 Jan 2024 14:38:08 +0300 Subject: [PATCH 1/6] compute_ctl: kill postgres and sync-safekeepers on exit. Otherwise they are left orphaned when compute_ctl is terminated with a signal. It was invisible most of the time because normally neon_local or k8s kills postgres directly and then compute_ctl finishes gracefully. However, in some tests compute_ctl gets stuck waiting for sync-safekeepers, which intentionally never finishes because the safekeepers are offline, and we want to stop compute_ctl without leaving orphans behind. This is a rather rough approach that doesn't wait for the children to terminate. A better way would be to convert compute_ctl to async, which would make waiting easy. --- Cargo.lock | 2 ++ compute_tools/Cargo.toml | 2 ++ compute_tools/src/bin/compute_ctl.rs | 32 +++++++++++++++++++++++++++- compute_tools/src/compute.rs | 8 +++++++ control_plane/src/endpoint.rs | 18 ++++++++++++---- 5 files changed, 57 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index abd87dc0da..8e0ad7c8ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1161,6 +1161,7 @@ dependencies = [ "flate2", "futures", "hyper", + "nix 0.26.2", "notify", "num_cpus", "opentelemetry", @@ -1171,6 +1172,7 @@ dependencies = [ "rust-ini", "serde", "serde_json", + "signal-hook", "tar", "tokio", "tokio-postgres", diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 142fa08495..759a117ee9 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -13,6 +13,7 @@ clap.workspace = true flate2.workspace = true futures.workspace = true hyper = { workspace = true, features = ["full"] } +nix.workspace = true notify.workspace = true num_cpus.workspace = true opentelemetry.workspace = true @@ -20,6 +21,7 @@ postgres.workspace = true regex.workspace = true serde.workspace = true serde_json.workspace = true +signal-hook.workspace = true tar.workspace = true reqwest = { workspace = true, features = ["json"] } tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 436db59088..eb1d746f04 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -40,18 +40,22 @@ use std::collections::HashMap; use std::fs::File; use std::path::Path; use std::process::exit; +use std::sync::atomic::Ordering; use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock}; use std::{thread, time::Duration}; use anyhow::{Context, Result}; use chrono::Utc; use clap::Arg; +use nix::sys::signal::{kill, Signal}; +use signal_hook::consts::{SIGQUIT, SIGTERM}; +use signal_hook::{consts::SIGINT, iterator::Signals}; use tracing::{error, info}; use url::Url; use compute_api::responses::ComputeStatus; -use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec}; +use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec, PG_PID, SYNC_SAFEKEEPERS_PID}; use compute_tools::configurator::launch_configurator; use compute_tools::extension_server::get_pg_version; use compute_tools::http::api::launch_http_server; @@ -67,6 +71,13 @@ const BUILD_TAG_DEFAULT: &str = "latest"; fn main() -> Result<()> { init_tracing_and_logging(DEFAULT_LOG_LEVEL)?; + let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?; + thread::spawn(move || { + for sig in signals.forever() { + handle_exit_signal(sig); + } + }); + let build_tag = option_env!("BUILD_TAG") .unwrap_or(BUILD_TAG_DEFAULT) .to_string(); @@ -346,6
+357,7 @@ fn main() -> Result<()> { let ecode = pg .wait() .expect("failed to start waiting on Postgres process"); + PG_PID.store(0, Ordering::SeqCst); info!("Postgres exited with code {}, shutting down", ecode); exit_code = ecode.code() } @@ -519,6 +531,24 @@ fn cli() -> clap::Command { ) } +/// When compute_ctl is killed, send also termination signal to sync-safekeepers +/// to prevent leakage. TODO: it is better to convert compute_ctl to async and +/// wait for termination which would be easy then. +fn handle_exit_signal(sig: i32) { + info!("received {sig} termination signal"); + let ss_pid = SYNC_SAFEKEEPERS_PID.load(Ordering::SeqCst); + if ss_pid != 0 { + let ss_pid = nix::unistd::Pid::from_raw(ss_pid as i32); + kill(ss_pid, Signal::SIGTERM).ok(); + } + let pg_pid = PG_PID.load(Ordering::SeqCst); + if pg_pid != 0 { + let pg_pid = nix::unistd::Pid::from_raw(pg_pid as i32); + kill(pg_pid, Signal::SIGTERM).ok(); + } + exit(1); +} + #[test] fn verify_cli() { cli().debug_assert() diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index cd7be0520e..13701b7378 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -6,6 +6,8 @@ use std::os::unix::fs::PermissionsExt; use std::path::Path; use std::process::{Command, Stdio}; use std::str::FromStr; +use std::sync::atomic::AtomicU32; +use std::sync::atomic::Ordering; use std::sync::{Condvar, Mutex, RwLock}; use std::thread; use std::time::Instant; @@ -34,6 +36,9 @@ use crate::spec::*; use crate::sync_sk::{check_if_synced, ping_safekeeper}; use crate::{config, extension_server}; +pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0); +pub static PG_PID: AtomicU32 = AtomicU32::new(0); + /// Compute node info shared across several `compute_ctl` threads. pub struct ComputeNode { // Url type maintains proper escaping @@ -501,6 +506,7 @@ impl ComputeNode { .stdout(Stdio::piped()) .spawn() .expect("postgres --sync-safekeepers failed to start"); + SYNC_SAFEKEEPERS_PID.store(sync_handle.id(), Ordering::SeqCst); // `postgres --sync-safekeepers` will print all log output to stderr and // final LSN to stdout. 
So we pipe only stdout, while stderr will be automatically @@ -508,6 +514,7 @@ impl ComputeNode { let sync_output = sync_handle .wait_with_output() .expect("postgres --sync-safekeepers failed"); + SYNC_SAFEKEEPERS_PID.store(0, Ordering::SeqCst); if !sync_output.status.success() { anyhow::bail!( @@ -662,6 +669,7 @@ impl ComputeNode { }) .spawn() .expect("cannot start postgres process"); + PG_PID.store(pg.id(), Ordering::SeqCst); wait_for_postgres(&mut pg, pgdata_path)?; diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 55b66742ca..3d5dfd6311 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -46,6 +46,8 @@ use std::time::Duration; use anyhow::{anyhow, bail, Context, Result}; use compute_api::spec::RemoteExtSpec; +use nix::sys::signal::kill; +use nix::sys::signal::Signal; use serde::{Deserialize, Serialize}; use utils::id::{NodeId, TenantId, TimelineId}; @@ -439,11 +441,14 @@ impl Endpoint { Ok(()) } - fn wait_for_compute_ctl_to_exit(&self) -> Result<()> { + fn wait_for_compute_ctl_to_exit(&self, send_sigterm: bool) -> Result<()> { // TODO use background_process::stop_process instead let pidfile_path = self.endpoint_path().join("compute_ctl.pid"); let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?; let pid = nix::unistd::Pid::from_raw(pid as i32); + if send_sigterm { + kill(pid, Signal::SIGTERM).ok(); + } crate::background_process::wait_until_stopped("compute_ctl", pid)?; Ok(()) } @@ -733,10 +738,15 @@ impl Endpoint { &None, )?; - // Also wait for the compute_ctl process to die. It might have some cleanup - // work to do after postgres stops, like syncing safekeepers, etc. + // Also wait for the compute_ctl process to die. It might have some + // cleanup work to do after postgres stops, like syncing safekeepers, + // etc. // - self.wait_for_compute_ctl_to_exit()?; + // If destroying, send it SIGTERM before waiting. Sometimes we do *not* + // want this cleanup: tests intentionally stop the endpoint when a + // majority of safekeepers is down, so sync-safekeepers would otherwise + // hang. This could be a separate flag, though. + self.wait_for_compute_ctl_to_exit(destroy)?; if destroy { println!( "Destroying postgres data directory '{}'", From 392843ad2ac550d63a8847f74f83e955addfb701 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Mon, 1 Jan 2024 14:43:44 +0300 Subject: [PATCH 2/6] Fix safekeeper START_REPLICATION (term=n). It was giving WAL only up to commit_lsn instead of flush_lsn, so recovery of uncommitted WAL since cdb08f03 hung. Add a test for this. --- safekeeper/src/send_wal.rs | 11 +---- .../regress/test_wal_acceptor_async.py | 40 +++++++++++++++++++ 2 files changed, 42 insertions(+), 9 deletions(-) diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 44f14f8c7e..70590a0f95 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -391,15 +391,8 @@ impl SafekeeperPostgresHandler { // application_name: give only committed WAL (used by pageserver) or all // existing WAL (up to flush_lsn, used by walproposer or peer recovery). // The second case is always driven by a consensus leader which term - // must generally be also supplied. However we're sloppy to do this in - // walproposer recovery which will be removed soon. So TODO is to make - // it not Option'al then. - // - // Fetching WAL without term in recovery creates a small risk of this - // WAL getting concurrently garbaged if another compute rises which - // collects majority and starts fixing log on this safekeeper itself.
- That's ok as (old) proposer will never be able to commit such WAL. - let end_watch = if self.is_walproposer_recovery() { + // must be supplied. + let end_watch = if term.is_some() { EndWatch::Flush(tli.get_term_flush_lsn_watch_rx()) } else { EndWatch::Commit(tli.get_commit_lsn_watch_rx()) diff --git a/test_runner/regress/test_wal_acceptor_async.py b/test_runner/regress/test_wal_acceptor_async.py index feab7e605b..77d67cd63a 100644 --- a/test_runner/regress/test_wal_acceptor_async.py +++ b/test_runner/regress/test_wal_acceptor_async.py @@ -475,6 +475,46 @@ def test_unavailability(neon_env_builder: NeonEnvBuilder): asyncio.run(run_unavailability(env, endpoint)) +async def run_recovery_uncommitted(env: NeonEnv): + (sk1, sk2, _) = env.safekeepers + + env.neon_cli.create_branch("test_recovery_uncommitted") + ep = env.endpoints.create_start("test_recovery_uncommitted") + ep.safe_psql("create table t(key int, value text)") + ep.safe_psql("insert into t select generate_series(1, 100), 'payload'") + + # insert with only one safekeeper up to create a tail of flushed but not committed WAL + sk1.stop() + sk2.stop() + conn = await ep.connect_async() + # the query should hang, so execute it in a separate task + bg_query = asyncio.create_task( conn.execute("insert into t select generate_series(1, 2000), 'payload'") ) + sleep_sec = 2 + await asyncio.sleep(sleep_sec) + # it must not have finished yet + assert not bg_query.done() + # note: destroy will kill compute_ctl, preventing it from waiting for the hanging sync-safekeepers. + ep.stop_and_destroy() + + # Start one of the safekeepers to bring a quorum back online, plus the + # compute, and ensure they can sync. + sk2.start() + ep = env.endpoints.create_start( "test_recovery_uncommitted", ) + ep.safe_psql("insert into t select generate_series(1, 2000), 'payload'") + + +# Test pulling uncommitted WAL (up to flush_lsn) during recovery. +def test_recovery_uncommitted(neon_env_builder: NeonEnvBuilder): + neon_env_builder.num_safekeepers = 3 + env = neon_env_builder.init_start() + + asyncio.run(run_recovery_uncommitted(env)) + + @dataclass class RaceConditionTest: iteration: int From d6cfcb0d93529bfae5a20a751e3f280fbe8424e9 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Mon, 1 Jan 2024 22:33:27 +0300 Subject: [PATCH 3/6] Move failpoint support code to utils. To enable failpoints in the safekeeper as well.
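For context, the moved module is a thin wrapper around the `fail` crate: code declares named failpoints, and tests turn them on at runtime with action strings such as "return(2000)", "sleep(1000)" or "off". The following is a minimal standalone sketch of that pattern, not part of the patch; the failpoint name and functions are invented for illustration, and triggering requires building with the `failpoints` feature of the `fail` crate.

```rust
// Illustrative sketch of the fail crate API that utils::failpoint_support wraps.
fn flush_segment() -> anyhow::Result<()> {
    // When configured with a "return" action this becomes an early error
    // return; when unconfigured (or built without the feature) it is a no-op.
    fail::fail_point!("flush-segment", |_| {
        Err(anyhow::anyhow!("failpoint flush-segment triggered"))
    });
    Ok(())
}

fn main() -> anyhow::Result<()> {
    let scenario = fail::FailScenario::setup();
    // The same action-string format that the HTTP failpoints handler feeds to fail::cfg.
    fail::cfg("flush-segment", "return").map_err(anyhow::Error::msg)?;
    assert!(flush_segment().is_err());
    scenario.teardown();
    Ok(())
}
```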
--- Cargo.lock | 1 + libs/pageserver_api/src/models.rs | 13 ----- libs/utils/Cargo.toml | 7 +++ .../utils}/src/failpoint_support.rs | 57 ++++++++++++++++++- libs/utils/src/lib.rs | 2 + pageserver/src/bin/pageserver.rs | 3 +- pageserver/src/http/routes.rs | 32 +---------- pageserver/src/lib.rs | 2 - pageserver/src/tenant.rs | 9 ++- pageserver/src/walingest.rs | 5 +- 10 files changed, 74 insertions(+), 57 deletions(-) rename {pageserver => libs/utils}/src/failpoint_support.rs (61%) diff --git a/Cargo.lock b/Cargo.lock index 8e0ad7c8ee..73cb83d3a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5880,6 +5880,7 @@ dependencies = [ "chrono", "const_format", "criterion", + "fail", "futures", "heapless", "hex", diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index be41b610b8..dea925b468 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -557,19 +557,6 @@ pub enum DownloadRemoteLayersTaskState { ShutDown, } -pub type ConfigureFailpointsRequest = Vec; - -/// Information for configuring a single fail point -#[derive(Debug, Serialize, Deserialize)] -pub struct FailpointConfig { - /// Name of the fail point - pub name: String, - /// List of actions to take, using the format described in `fail::cfg` - /// - /// We also support `actions = "exit"` to cause the fail point to immediately exit. - pub actions: String, -} - #[derive(Debug, Serialize, Deserialize)] pub struct TimelineGcRequest { pub gc_horizon: Option, diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index af0414daa2..706b7a3187 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -4,6 +4,12 @@ version = "0.1.0" edition.workspace = true license.workspace = true +[features] +default = [] +# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro, +# which adds some runtime cost to run tests on outage conditions +testing = ["fail/failpoints"] + [dependencies] arc-swap.workspace = true sentry.workspace = true @@ -16,6 +22,7 @@ chrono.workspace = true heapless.workspace = true hex = { workspace = true, features = ["serde"] } hyper = { workspace = true, features = ["full"] } +fail.workspace = true futures = { workspace = true} jsonwebtoken.workspace = true nix.workspace = true diff --git a/pageserver/src/failpoint_support.rs b/libs/utils/src/failpoint_support.rs similarity index 61% rename from pageserver/src/failpoint_support.rs rename to libs/utils/src/failpoint_support.rs index 2190eba18a..5ec532e2a6 100644 --- a/pageserver/src/failpoint_support.rs +++ b/libs/utils/src/failpoint_support.rs @@ -1,3 +1,14 @@ +//! Failpoint support code shared between pageserver and safekeepers. + +use crate::http::{ + error::ApiError, + json::{json_request, json_response}, +}; +use hyper::{Body, Request, Response, StatusCode}; +use serde::{Deserialize, Serialize}; +use tokio_util::sync::CancellationToken; +use tracing::*; + /// use with fail::cfg("$name", "return(2000)") /// /// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the @@ -25,7 +36,7 @@ pub use __failpoint_sleep_millis_async as sleep_millis_async; // Helper function used by the macro. 
(A function has nicer scoping so we // don't need to decorate everything with "::") #[doc(hidden)] -pub(crate) async fn failpoint_sleep_helper(name: &'static str, duration_str: String) { +pub async fn failpoint_sleep_helper(name: &'static str, duration_str: String) { let millis = duration_str.parse::().unwrap(); let d = std::time::Duration::from_millis(millis); @@ -71,7 +82,7 @@ pub fn init() -> fail::FailScenario<'static> { scenario } -pub(crate) fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> { +pub fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> { if actions == "exit" { fail::cfg_callback(name, exit_failpoint) } else { @@ -84,3 +95,45 @@ fn exit_failpoint() { tracing::info!("Exit requested by failpoint"); std::process::exit(1); } + +pub type ConfigureFailpointsRequest = Vec; + +/// Information for configuring a single fail point +#[derive(Debug, Serialize, Deserialize)] +pub struct FailpointConfig { + /// Name of the fail point + pub name: String, + /// List of actions to take, using the format described in `fail::cfg` + /// + /// We also support `actions = "exit"` to cause the fail point to immediately exit. + pub actions: String, +} + +/// Configure failpoints through http. +pub async fn failpoints_handler( + mut request: Request, + _cancel: CancellationToken, +) -> Result, ApiError> { + if !fail::has_failpoints() { + return Err(ApiError::BadRequest(anyhow::anyhow!( + "Cannot manage failpoints because storage was compiled without failpoints support" + ))); + } + + let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?; + for fp in failpoints { + info!("cfg failpoint: {} {}", fp.name, fp.actions); + + // We recognize one extra "action" that's not natively recognized + // by the failpoints crate: exit, to immediately kill the process + let cfg_result = apply_failpoint(&fp.name, &fp.actions); + + if let Err(err_msg) = cfg_result { + return Err(ApiError::BadRequest(anyhow::anyhow!( + "Failed to configure failpoints: {err_msg}" + ))); + } + } + + json_response(StatusCode::OK, ()) +} diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index bb6c848bf4..9e9b0adfe5 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -83,6 +83,8 @@ pub mod timeout; pub mod sync; +pub mod failpoint_support; + /// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages /// /// we have several cases: diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index f65c4f4580..621ad050f4 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -31,6 +31,7 @@ use pageserver::{ virtual_file, }; use postgres_backend::AuthType; +use utils::failpoint_support; use utils::logging::TracingErrorLayerEnablement; use utils::signals::ShutdownSignals; use utils::{ @@ -126,7 +127,7 @@ fn main() -> anyhow::Result<()> { } // Initialize up failpoints support - let scenario = pageserver::failpoint_support::init(); + let scenario = failpoint_support::init(); // Basic initialization of things that don't change after startup virtual_file::init(conf.max_file_descriptors); diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 11a3a2c872..157e6b4e3e 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -25,6 +25,7 @@ use tenant_size_model::{SizeResult, StorageModel}; use tokio_util::sync::CancellationToken; use tracing::*; use utils::auth::JwtAuth; +use utils::failpoint_support::failpoints_handler; 
use utils::http::endpoint::request_span; use utils::http::json::json_request_or_empty_body; use utils::http::request::{get_request_param, must_get_query_param, parse_query_param}; @@ -66,9 +67,6 @@ use utils::{ lsn::Lsn, }; -// Imports only used for testing APIs -use pageserver_api::models::ConfigureFailpointsRequest; - // For APIs that require an Active tenant, how long should we block waiting for that state? // This is not functionally necessary (clients will retry), but avoids generating a lot of // failed API calls while tenants are activating. @@ -1293,34 +1291,6 @@ async fn handle_tenant_break( json_response(StatusCode::OK, ()) } -async fn failpoints_handler( - mut request: Request, - _cancel: CancellationToken, -) -> Result, ApiError> { - if !fail::has_failpoints() { - return Err(ApiError::BadRequest(anyhow!( - "Cannot manage failpoints because pageserver was compiled without failpoints support" - ))); - } - - let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?; - for fp in failpoints { - info!("cfg failpoint: {} {}", fp.name, fp.actions); - - // We recognize one extra "action" that's not natively recognized - // by the failpoints crate: exit, to immediately kill the process - let cfg_result = crate::failpoint_support::apply_failpoint(&fp.name, &fp.actions); - - if let Err(err_msg) = cfg_result { - return Err(ApiError::BadRequest(anyhow!( - "Failed to configure failpoints: {err_msg}" - ))); - } - } - - json_response(StatusCode::OK, ()) -} - // Run GC immediately on given timeline. async fn timeline_gc_handler( mut request: Request, diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 58adf6e8c4..c1ce0af47b 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -25,8 +25,6 @@ pub mod walingest; pub mod walrecord; pub mod walredo; -pub mod failpoint_support; - use crate::task_mgr::TaskKind; use camino::Utf8Path; use deletion_queue::DeletionQueue; diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 2f2169d194..e50987c84b 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -33,6 +33,7 @@ use tracing::*; use utils::backoff; use utils::completion; use utils::crashsafe::path_with_suffix_extension; +use utils::failpoint_support; use utils::fs_ext; use utils::sync::gate::Gate; use utils::sync::gate::GateGuard; @@ -890,7 +891,7 @@ impl Tenant { ) -> anyhow::Result<()> { span::debug_assert_current_span_has_tenant_id(); - crate::failpoint_support::sleep_millis_async!("before-attaching-tenant"); + failpoint_support::sleep_millis_async!("before-attaching-tenant"); let preload = match preload { Some(p) => p, @@ -1002,7 +1003,7 @@ impl Tenant { // IndexPart is the source of truth. self.clean_up_timelines(&existent_timelines)?; - crate::failpoint_support::sleep_millis_async!("attach-before-activate"); + failpoint_support::sleep_millis_async!("attach-before-activate"); info!("Done"); @@ -2839,9 +2840,7 @@ impl Tenant { } }; - crate::failpoint_support::sleep_millis_async!( - "gc_iteration_internal_after_getting_gc_timelines" - ); + failpoint_support::sleep_millis_async!("gc_iteration_internal_after_getting_gc_timelines"); // If there is nothing to GC, we don't want any messages in the INFO log. 
if !gc_timelines.is_empty() { diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 1d14214030..a6a8972970 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -29,6 +29,7 @@ use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn}; use anyhow::{bail, Context, Result}; use bytes::{Buf, Bytes, BytesMut}; use tracing::*; +use utils::failpoint_support; use crate::context::RequestContext; use crate::metrics::WAL_INGEST; @@ -344,9 +345,7 @@ impl<'a> WalIngest<'a> { // particular point in the WAL. For more fine-grained control, // we could peek into the message and only pause if it contains // a particular string, for example, but this is enough for now. - crate::failpoint_support::sleep_millis_async!( - "wal-ingest-logical-message-sleep" - ); + failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep"); } else if let Some(path) = prefix.strip_prefix("neon-file:") { modification.put_file(path, message, ctx).await?; } From 2f83f852913887a630517b1b124424946761e7f4 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Mon, 1 Jan 2024 23:32:24 +0300 Subject: [PATCH 4/6] Add failpoint support to safekeeper. Just a copy paste from pageserver. --- Cargo.lock | 1 + safekeeper/Cargo.toml | 7 ++++++ safekeeper/src/bin/safekeeper.rs | 17 ++++++++++++- safekeeper/src/http/routes.rs | 8 ++++++ test_runner/fixtures/neon_fixtures.py | 36 +++++++++++++++++++++++---- 5 files changed, 63 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 73cb83d3a7..55e868a6d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4449,6 +4449,7 @@ dependencies = [ "clap", "const_format", "crc32c", + "fail", "fs2", "futures", "git-version", diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index cccb4ebd79..4015c27933 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -4,6 +4,12 @@ version = "0.1.0" edition.workspace = true license.workspace = true +[features] +default = [] +# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro, +# which adds some runtime cost to run tests on outage conditions +testing = ["fail/failpoints"] + [dependencies] async-stream.workspace = true anyhow.workspace = true @@ -16,6 +22,7 @@ chrono.workspace = true clap = { workspace = true, features = ["derive"] } const_format.workspace = true crc32c.workspace = true +fail.workspace = true fs2.workspace = true git-version.workspace = true hex.workspace = true diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index e59deb9fda..33047051df 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -54,6 +54,19 @@ const ID_FILE_NAME: &str = "safekeeper.id"; project_git_version!(GIT_VERSION); project_build_tag!(BUILD_TAG); +const FEATURES: &[&str] = &[ + #[cfg(feature = "testing")] + "testing", +]; + +fn version() -> String { + format!( + "{GIT_VERSION} failpoints: {}, features: {:?}", + fail::has_failpoints(), + FEATURES, + ) +} + const ABOUT: &str = r#" A fleet of safekeepers is responsible for reliably storing WAL received from compute, passing it through consensus (mitigating potential computes brain @@ -167,7 +180,9 @@ async fn main() -> anyhow::Result<()> { // getting 'argument cannot be used multiple times' error. This seems to be // impossible with pure Derive API, so convert struct to Command, modify it, // parse arguments, and then fill the struct back. 
- let cmd = ::command().args_override_self(true); + let cmd = ::command() + .args_override_self(true) + .version(version()); let mut matches = cmd.get_matches(); let mut args = ::from_arg_matches_mut(&mut matches)?; diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index c48b5330b3..25a3334e63 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -12,6 +12,8 @@ use storage_broker::proto::SafekeeperTimelineInfo; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; use tokio::fs::File; use tokio::io::AsyncReadExt; +use tokio_util::sync::CancellationToken; +use utils::failpoint_support::failpoints_handler; use std::io::Write as _; use tokio::sync::mpsc; @@ -444,6 +446,12 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder .data(Arc::new(conf)) .data(auth) .get("/v1/status", |r| request_span(r, status_handler)) + .put("/v1/failpoints", |r| { + request_span(r, move |r| async { + let cancel = CancellationToken::new(); + failpoints_handler(r, cancel).await + }) + }) // Will be used in the future instead of implicit timeline creation .post("/v1/tenant/timeline", |r| { request_span(r, timeline_create_handler) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 597e311e02..9aa82d8854 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -890,8 +890,8 @@ class NeonEnv: """Get list of safekeeper endpoints suitable for safekeepers GUC""" return ",".join(f"localhost:{wa.port.pg}" for wa in self.safekeepers) - def get_pageserver_version(self) -> str: - bin_pageserver = str(self.neon_binpath / "pageserver") + def get_binary_version(self, binary_name: str) -> str: + bin_pageserver = str(self.neon_binpath / binary_name) res = subprocess.run( [bin_pageserver, "--version"], check=True, @@ -1656,7 +1656,7 @@ class NeonPageserver(PgProtocol): self.running = False self.service_port = port self.config_override = config_override - self.version = env.get_pageserver_version() + self.version = env.get_binary_version("pageserver") # After a test finishes, we will scrape the log to see if there are any # unexpected error messages. 
If your test expects an error, add it to @@ -2924,7 +2924,8 @@ class Safekeeper: return res def http_client(self, auth_token: Optional[str] = None) -> SafekeeperHttpClient: - return SafekeeperHttpClient(port=self.port.http, auth_token=auth_token) + is_testing_enabled = '"testing"' in self.env.get_binary_version("safekeeper") + return SafekeeperHttpClient(port=self.port.http, auth_token=auth_token, is_testing_enabled=is_testing_enabled) def data_dir(self) -> str: return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}") @@ -2975,10 +2976,11 @@ class SafekeeperMetrics: class SafekeeperHttpClient(requests.Session): HTTPError = requests.HTTPError - def __init__(self, port: int, auth_token: Optional[str] = None): + def __init__(self, port: int, auth_token: Optional[str] = None, is_testing_enabled = False): super().__init__() self.port = port self.auth_token = auth_token + self.is_testing_enabled = is_testing_enabled if auth_token is not None: self.headers["Authorization"] = f"Bearer {auth_token}" @@ -2986,6 +2988,30 @@ class SafekeeperHttpClient(requests.Session): def check_status(self): self.get(f"http://localhost:{self.port}/v1/status").raise_for_status() + def is_testing_enabled_or_skip(self): + if not self.is_testing_enabled: + pytest.skip("safekeeper was built without 'testing' feature") + + def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]): + self.is_testing_enabled_or_skip() + + if isinstance(config_strings, tuple): + pairs = [config_strings] + else: + pairs = config_strings + + log.info(f"Requesting config failpoints: {repr(pairs)}") + + res = self.put( + f"http://localhost:{self.port}/v1/failpoints", + json=[{"name": name, "actions": actions} for name, actions in pairs], + ) + log.info(f"Got failpoints request response code {res.status_code}") + res.raise_for_status() + res_json = res.json() + assert res_json is None + return res_json + def debug_dump(self, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]: params = params or {} res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params) From 4a227484bff969ddade5b42ee846c6056870ad1a Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Fri, 29 Dec 2023 23:09:36 +0300 Subject: [PATCH 5/6] Add large insertion and slow WAL sending to test_hot_standby. To exercise MAX_SEND_SIZE sending from safekeeper; we've had a bug with WAL records torn across several XLogData messages. Add failpoint to safekeeper to slow down sending. Also check for corrupted WAL complains in standby log. Make the test a bit simpler in passing, e.g. we don't need explicit commits as autocommit is enabled by default. 
https://neondb.slack.com/archives/C05L7D1JAUS/p1703774799114719 https://github.com/neondatabase/cloud/issues/9057 --- safekeeper/src/send_wal.rs | 6 ++ test_runner/fixtures/neon_fixtures.py | 17 +++-- test_runner/regress/test_hot_standby.py | 91 +++++++++++++++---------- 3 files changed, 73 insertions(+), 41 deletions(-) diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 70590a0f95..bd1d306968 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -17,6 +17,7 @@ use postgres_ffi::{TimestampTz, MAX_SEND_SIZE}; use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody}; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; +use utils::failpoint_support; use utils::id::TenantTimelineId; use utils::lsn::AtomicLsn; use utils::pageserver_feedback::PageserverFeedback; @@ -559,6 +560,11 @@ impl WalSender<'_, IO> { })) .await?; + if let Some(appname) = &self.appname { + if appname == "replica" { + failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep"); + } + } trace!( "sent {} bytes of WAL {}-{}", send_size, diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 9aa82d8854..5b1a8ba27d 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -347,7 +347,9 @@ class PgProtocol: """ return self.safe_psql_many([query], **kwargs)[0] - def safe_psql_many(self, queries: List[str], **kwargs: Any) -> List[List[Tuple[Any, ...]]]: + def safe_psql_many( + self, queries: List[str], log_query=True, **kwargs: Any + ) -> List[List[Tuple[Any, ...]]]: """ Execute queries against the node and return all rows. This method passes all extra params to connstr. @@ -356,7 +358,8 @@ class PgProtocol: with closing(self.connect(**kwargs)) as conn: with conn.cursor() as cur: for query in queries: - log.info(f"Executing query: {query}") + if log_query: + log.info(f"Executing query: {query}") cur.execute(query) if cur.description is None: @@ -365,11 +368,11 @@ class PgProtocol: result.append(cur.fetchall()) return result - def safe_psql_scalar(self, query) -> Any: + def safe_psql_scalar(self, query, log_query=True) -> Any: """ Execute query returning single row with single column. 
""" - return self.safe_psql(query)[0][0] + return self.safe_psql(query, log_query=log_query)[0][0] @dataclass @@ -2925,7 +2928,9 @@ class Safekeeper: def http_client(self, auth_token: Optional[str] = None) -> SafekeeperHttpClient: is_testing_enabled = '"testing"' in self.env.get_binary_version("safekeeper") - return SafekeeperHttpClient(port=self.port.http, auth_token=auth_token, is_testing_enabled=is_testing_enabled) + return SafekeeperHttpClient( + port=self.port.http, auth_token=auth_token, is_testing_enabled=is_testing_enabled + ) def data_dir(self) -> str: return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}") @@ -2976,7 +2981,7 @@ class SafekeeperMetrics: class SafekeeperHttpClient(requests.Session): HTTPError = requests.HTTPError - def __init__(self, port: int, auth_token: Optional[str] = None, is_testing_enabled = False): + def __init__(self, port: int, auth_token: Optional[str] = None, is_testing_enabled=False): super().__init__() self.port = port self.auth_token = auth_token diff --git a/test_runner/regress/test_hot_standby.py b/test_runner/regress/test_hot_standby.py index 031fd2857d..7822e29ed9 100644 --- a/test_runner/regress/test_hot_standby.py +++ b/test_runner/regress/test_hot_standby.py @@ -1,19 +1,59 @@ +import os +import re import time -from fixtures.neon_fixtures import NeonEnv +from fixtures.log_helper import log +from fixtures.neon_fixtures import Endpoint, NeonEnv + + +def wait_caughtup(primary: Endpoint, secondary: Endpoint): + primary_lsn = primary.safe_psql_scalar( + "SELECT pg_current_wal_insert_lsn()::text", log_query=False + ) + while True: + secondary_lsn = secondary.safe_psql_scalar( + "SELECT pg_last_wal_replay_lsn()", log_query=False + ) + caught_up = secondary_lsn >= primary_lsn + log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}") + if caught_up: + return + time.sleep(1) + + +# Check for corrupted WAL messages which might otherwise go unnoticed if +# reconnection fixes this. +def scan_standby_log_for_errors(secondary): + log_path = secondary.endpoint_path() / "compute.log" + with log_path.open("r") as f: + markers = re.compile( + r"incorrect resource manager data|record with incorrect|invalid magic number|unexpected pageaddr" + ) + for line in f: + if markers.search(line): + log.info(f"bad error in standby log: {line}") + raise AssertionError() def test_hot_standby(neon_simple_env: NeonEnv): env = neon_simple_env + # We've had a bug caused by WAL records split across multiple XLogData + # messages resulting in corrupted WAL complains on standby. It reproduced + # only when sending from safekeeper is slow enough to grab full + # MAX_SEND_SIZE messages. So insert sleep through failpoints, but only in + # one conf to decrease test time. 
+ slow_down_send = "[debug-pg16]" in os.environ.get("PYTEST_CURRENT_TEST", "") + if slow_down_send: + sk_http = env.safekeepers[0].http_client() + sk_http.configure_failpoints([("sk-send-wal-replica-sleep", "return(100)")]) + with env.endpoints.create_start( branch_name="main", endpoint_id="primary", ) as primary: time.sleep(1) with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary: - primary_lsn = None - caught_up = False queries = [ "SHOW neon.timeline_id", "SHOW neon.tenant_id", @@ -26,23 +66,6 @@ def test_hot_standby(neon_simple_env: NeonEnv): with p_con.cursor() as p_cur: p_cur.execute("CREATE TABLE test AS SELECT generate_series(1, 100) AS i") - # Explicit commit to make sure other connections (and replicas) can - # see the changes of this commit. - p_con.commit() - - with p_con.cursor() as p_cur: - p_cur.execute("SELECT pg_current_wal_insert_lsn()::text") - res = p_cur.fetchone() - assert res is not None - (lsn,) = res - primary_lsn = lsn - - # Explicit commit to make sure other connections (and replicas) can - # see the changes of this commit. - # Note that this may generate more WAL if the transaction has changed - # things, but we don't care about that. - p_con.commit() - for query in queries: with p_con.cursor() as p_cur: p_cur.execute(query) @@ -51,30 +74,28 @@ def test_hot_standby(neon_simple_env: NeonEnv): response = res responses[query] = response + # insert more data to make safekeeper send MAX_SEND_SIZE messages + if slow_down_send: + primary.safe_psql("create table t(key int, value text)") + primary.safe_psql("insert into t select generate_series(1, 100000), 'payload'") + + wait_caughtup(primary, secondary) + with secondary.connect() as s_con: with s_con.cursor() as s_cur: s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()") res = s_cur.fetchone() assert res is not None - while not caught_up: - with s_con.cursor() as secondary_cursor: - secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()") - res = secondary_cursor.fetchone() - assert res is not None - (secondary_lsn,) = res - # There may be more changes on the primary after we got our LSN - # due to e.g. autovacuum, but that shouldn't impact the content - # of the tables, so we check whether we've replayed up to at - # least after the commit of the `test` table. - caught_up = secondary_lsn >= primary_lsn - - # Explicit commit to flush any transient transaction-level state. - s_con.commit() - for query in queries: with s_con.cursor() as secondary_cursor: secondary_cursor.execute(query) response = secondary_cursor.fetchone() assert response is not None assert response == responses[query] + + scan_standby_log_for_errors(secondary) + + # clean up + if slow_down_send: + sk_http.configure_failpoints(("sk-send-wal-replica-sleep", "off")) From 54aa31980597742f28b022bd0a729b97d910f0b7 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Sat, 30 Dec 2023 00:31:19 +0300 Subject: [PATCH 6/6] Don't split WAL record across two XLogData's when sending from safekeepers. As protocol demands. Not following this makes standby complain about corrupted WAL in various ways. 
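The change below caps every XLogData chunk at MAX_SEND_SIZE and, whenever more WAL than that is available, rounds the chunk end down to a WAL page boundary so that no record header is torn across two messages (mirroring XLogSendPhysical in walsender.c). A simplified, self-contained sketch of that arithmetic follows; the constants are the stock PostgreSQL values and the function name is illustrative, while the real code works with Lsn and block_offset() from the utils crate.

```rust
// XLOG_BLCKSZ is the WAL page size; walsender uses MAX_SEND_SIZE = 16 pages.
const XLOG_BLCKSZ: u64 = 8192;
const MAX_SEND_SIZE: u64 = 16 * XLOG_BLCKSZ;

/// End position of the next XLogData chunk starting at `start_pos`,
/// given that WAL is flushed up to `end_pos`.
fn chunk_end(start_pos: u64, end_pos: u64) -> u64 {
    let chunk_end = start_pos + MAX_SEND_SIZE;
    if chunk_end >= end_pos {
        // Close to the flushed end: send exactly what is available.
        end_pos
    } else {
        // Otherwise round down to a page boundary so no record is split.
        chunk_end - chunk_end % XLOG_BLCKSZ
    }
}

fn main() {
    // Mid-stream: the 128 KiB cap is rounded down to a multiple of 8192.
    assert_eq!(chunk_end(100, 1_000_000), 131_072);
    // Near the flushed end: capped at end_pos itself.
    assert_eq!(chunk_end(100, 4_000), 4_000);
}
```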
https://neondb.slack.com/archives/C05L7D1JAUS/p1703774799114719 closes https://github.com/neondatabase/cloud/issues/9057 --- safekeeper/src/send_wal.rs | 22 +++++++++++++++------- safekeeper/src/wal_storage.rs | 3 +++ 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index bd1d306968..9a5657a40d 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -529,12 +529,19 @@ impl WalSender<'_, IO> { ); // try to send as much as available, capped by MAX_SEND_SIZE - let mut send_size = self - .end_pos - .checked_sub(self.start_pos) - .context("reading wal without waiting for it first")? - .0 as usize; - send_size = min(send_size, self.send_buf.len()); + let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64; + // if we went behind available WAL, back off + if chunk_end_pos >= self.end_pos { + chunk_end_pos = self.end_pos; + } else { + // If sending not up to end pos, round down to page boundary to + // avoid breaking WAL record not at page boundary, as protocol + // demands. See walsender.c (XLogSendPhysical). + chunk_end_pos = chunk_end_pos + .checked_sub(chunk_end_pos.block_offset()) + .unwrap(); + } + let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize; let send_buf = &mut self.send_buf[..send_size]; let send_size: usize; { @@ -545,7 +552,8 @@ impl WalSender<'_, IO> { } else { None }; - // read wal into buffer + // Read WAL into buffer. send_size can be additionally capped to + // segment boundary here. send_size = self.wal_reader.read(send_buf).await? }; let send_buf = &send_buf[..send_size]; diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index fa44b24258..e7538f805c 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -565,6 +565,9 @@ impl WalReader { }) } + /// Read WAL at current position into provided buf, returns number of bytes + /// read. It can be smaller than buf size only if segment boundary is + /// reached. pub async fn read(&mut self, buf: &mut [u8]) -> Result { // If this timeline is new, we may not have a full segment yet, so // we pad the first bytes of the timeline's first WAL segment with 0s
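The new doc comment on WalReader::read above says a read may come back short only when it hits a WAL segment boundary, which is why the sender treats the requested size as an upper bound rather than an exact length. A standalone sketch of that capping, assuming the default 16 MiB segment size (names and constants are illustrative, not taken from the patch):

```rust
// Illustrative only: mirrors the "short read at segment boundary" contract.
const WAL_SEGMENT_SIZE: u64 = 16 * 1024 * 1024;

/// Maximum number of bytes a single read starting at `pos` can return
/// when reads never cross a segment boundary.
fn max_read_len(pos: u64, want: u64) -> u64 {
    let until_segment_end = WAL_SEGMENT_SIZE - pos % WAL_SEGMENT_SIZE;
    want.min(until_segment_end)
}

fn main() {
    // In the middle of a segment the full request can be served.
    assert_eq!(max_read_len(1024, 8192), 8192);
    // 100 bytes before a segment boundary only 100 bytes come back; the
    // caller tolerates the short read and issues another one.
    assert_eq!(max_read_len(WAL_SEGMENT_SIZE - 100, 8192), 100);
}
```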