Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-16 09:52:54 +00:00

Merge pull request #6242 from neondatabase/releases/2024-01-02

Release 2024-01-02

Changed files include Cargo.lock (generated; 4 changed lines).
@@ -1161,6 +1161,7 @@ dependencies = [
 "flate2",
 "futures",
 "hyper",
 "nix 0.26.2",
 "notify",
 "num_cpus",
 "opentelemetry",
@@ -1171,6 +1172,7 @@ dependencies = [
 "rust-ini",
 "serde",
 "serde_json",
 "signal-hook",
 "tar",
 "tokio",
 "tokio-postgres",
@@ -4447,6 +4449,7 @@ dependencies = [
 "clap",
 "const_format",
 "crc32c",
 "fail",
 "fs2",
 "futures",
 "git-version",
@@ -5878,6 +5881,7 @@ dependencies = [
 "chrono",
 "const_format",
 "criterion",
 "fail",
 "futures",
 "heapless",
 "hex",

@@ -13,6 +13,7 @@ clap.workspace = true
flate2.workspace = true
futures.workspace = true
hyper = { workspace = true, features = ["full"] }
nix.workspace = true
notify.workspace = true
num_cpus.workspace = true
opentelemetry.workspace = true
@@ -20,6 +21,7 @@ postgres.workspace = true
regex.workspace = true
serde.workspace = true
serde_json.workspace = true
signal-hook.workspace = true
tar.workspace = true
reqwest = { workspace = true, features = ["json"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }

@@ -40,18 +40,22 @@ use std::collections::HashMap;
use std::fs::File;
use std::path::Path;
use std::process::exit;
use std::sync::atomic::Ordering;
use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock};
use std::{thread, time::Duration};

use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use nix::sys::signal::{kill, Signal};
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
use tracing::{error, info};
use url::Url;

use compute_api::responses::ComputeStatus;

use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec, PG_PID, SYNC_SAFEKEEPERS_PID};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::get_pg_version;
use compute_tools::http::api::launch_http_server;
@@ -67,6 +71,13 @@ const BUILD_TAG_DEFAULT: &str = "latest";
fn main() -> Result<()> {
    init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;

    let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
    thread::spawn(move || {
        for sig in signals.forever() {
            handle_exit_signal(sig);
        }
    });

    let build_tag = option_env!("BUILD_TAG")
        .unwrap_or(BUILD_TAG_DEFAULT)
        .to_string();
@@ -346,6 +357,7 @@ fn main() -> Result<()> {
    let ecode = pg
        .wait()
        .expect("failed to start waiting on Postgres process");
    PG_PID.store(0, Ordering::SeqCst);
    info!("Postgres exited with code {}, shutting down", ecode);
    exit_code = ecode.code()
}
@@ -519,6 +531,24 @@ fn cli() -> clap::Command {
    )
}

/// When compute_ctl is killed, send also termination signal to sync-safekeepers
/// to prevent leakage. TODO: it is better to convert compute_ctl to async and
/// wait for termination which would be easy then.
fn handle_exit_signal(sig: i32) {
    info!("received {sig} termination signal");
    let ss_pid = SYNC_SAFEKEEPERS_PID.load(Ordering::SeqCst);
    if ss_pid != 0 {
        let ss_pid = nix::unistd::Pid::from_raw(ss_pid as i32);
        kill(ss_pid, Signal::SIGTERM).ok();
    }
    let pg_pid = PG_PID.load(Ordering::SeqCst);
    if pg_pid != 0 {
        let pg_pid = nix::unistd::Pid::from_raw(pg_pid as i32);
        kill(pg_pid, Signal::SIGTERM).ok();
    }
    exit(1);
}

#[test]
fn verify_cli() {
    cli().debug_assert()

@@ -6,6 +6,8 @@ use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::sync::{Condvar, Mutex, RwLock};
use std::thread;
use std::time::Instant;
@@ -34,6 +36,9 @@ use crate::spec::*;
use crate::sync_sk::{check_if_synced, ping_safekeeper};
use crate::{config, extension_server};

pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
pub static PG_PID: AtomicU32 = AtomicU32::new(0);

/// Compute node info shared across several `compute_ctl` threads.
pub struct ComputeNode {
    // Url type maintains proper escaping
@@ -501,6 +506,7 @@ impl ComputeNode {
        .stdout(Stdio::piped())
        .spawn()
        .expect("postgres --sync-safekeepers failed to start");
    SYNC_SAFEKEEPERS_PID.store(sync_handle.id(), Ordering::SeqCst);

    // `postgres --sync-safekeepers` will print all log output to stderr and
    // final LSN to stdout. So we pipe only stdout, while stderr will be automatically
@@ -508,6 +514,7 @@ impl ComputeNode {
    let sync_output = sync_handle
        .wait_with_output()
        .expect("postgres --sync-safekeepers failed");
    SYNC_SAFEKEEPERS_PID.store(0, Ordering::SeqCst);

    if !sync_output.status.success() {
        anyhow::bail!(
@@ -662,6 +669,7 @@ impl ComputeNode {
        })
        .spawn()
        .expect("cannot start postgres process");
    PG_PID.store(pg.id(), Ordering::SeqCst);

    wait_for_postgres(&mut pg, pgdata_path)?;

@@ -46,6 +46,8 @@ use std::time::Duration;

use anyhow::{anyhow, bail, Context, Result};
use compute_api::spec::RemoteExtSpec;
use nix::sys::signal::kill;
use nix::sys::signal::Signal;
use serde::{Deserialize, Serialize};
use utils::id::{NodeId, TenantId, TimelineId};

@@ -439,11 +441,14 @@ impl Endpoint {
        Ok(())
    }

    fn wait_for_compute_ctl_to_exit(&self) -> Result<()> {
    fn wait_for_compute_ctl_to_exit(&self, send_sigterm: bool) -> Result<()> {
        // TODO use background_process::stop_process instead
        let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
        let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
        let pid = nix::unistd::Pid::from_raw(pid as i32);
        if send_sigterm {
            kill(pid, Signal::SIGTERM).ok();
        }
        crate::background_process::wait_until_stopped("compute_ctl", pid)?;
        Ok(())
    }
@@ -733,10 +738,15 @@ impl Endpoint {
            &None,
        )?;

        // Also wait for the compute_ctl process to die. It might have some cleanup
        // work to do after postgres stops, like syncing safekeepers, etc.
        // Also wait for the compute_ctl process to die. It might have some
        // cleanup work to do after postgres stops, like syncing safekeepers,
        // etc.
        //
        self.wait_for_compute_ctl_to_exit()?;
        // If destroying, send it SIGTERM before waiting. Sometimes we do *not*
        // want this cleanup: tests intentionally do stop when majority of
        // safekeepers is down, so sync-safekeepers would hang otherwise. This
        // could be a separate flag though.
        self.wait_for_compute_ctl_to_exit(destroy)?;
        if destroy {
            println!(
                "Destroying postgres data directory '{}'",

@@ -557,19 +557,6 @@ pub enum DownloadRemoteLayersTaskState {
    ShutDown,
}

pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;

/// Information for configuring a single fail point
#[derive(Debug, Serialize, Deserialize)]
pub struct FailpointConfig {
    /// Name of the fail point
    pub name: String,
    /// List of actions to take, using the format described in `fail::cfg`
    ///
    /// We also support `actions = "exit"` to cause the fail point to immediately exit.
    pub actions: String,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct TimelineGcRequest {
    pub gc_horizon: Option<u64>,

@@ -4,6 +4,12 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true

[features]
default = []
# Enables test-only APIs, including failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]

[dependencies]
arc-swap.workspace = true
sentry.workspace = true
@@ -16,6 +22,7 @@ chrono.workspace = true
heapless.workspace = true
hex = { workspace = true, features = ["serde"] }
hyper = { workspace = true, features = ["full"] }
fail.workspace = true
futures = { workspace = true}
jsonwebtoken.workspace = true
nix.workspace = true

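For readers unfamiliar with the `fail` crate that this feature flag turns on, here is a minimal, hypothetical sketch (not part of this diff) of what `fail_point!` and `fail::cfg` do. The failpoint name and values are invented for illustration, and the snippet assumes the crate's `failpoints` feature (what `testing` enables here) is compiled in:

// Hypothetical illustration of the `fail` crate API enabled by the `testing`
// feature above; the failpoint name and numbers are made up.
fn fetch_value() -> u64 {
    // With failpoints compiled in, this can be overridden at runtime; the
    // closure turns the argument of a "return(...)" action into the result.
    fail::fail_point!("fetch-value", |arg| {
        arg.and_then(|s| s.parse::<u64>().ok()).unwrap_or(0)
    });
    42
}

fn main() {
    // No configuration: the failpoint is a no-op and the normal path runs.
    assert_eq!(fetch_value(), 42);
    // Configure the failpoint; subsequent calls return the injected value.
    fail::cfg("fetch-value", "return(2000)").unwrap();
    assert_eq!(fetch_value(), 2000);
    // Turn it off again.
    fail::cfg("fetch-value", "off").unwrap();
    assert_eq!(fetch_value(), 42);
}
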
@@ -1,3 +1,14 @@
//! Failpoint support code shared between pageserver and safekeepers.

use crate::http::{
    error::ApiError,
    json::{json_request, json_response},
};
use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::*;

/// use with fail::cfg("$name", "return(2000)")
///
/// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the
@@ -25,7 +36,7 @@ pub use __failpoint_sleep_millis_async as sleep_millis_async;
// Helper function used by the macro. (A function has nicer scoping so we
// don't need to decorate everything with "::")
#[doc(hidden)]
pub(crate) async fn failpoint_sleep_helper(name: &'static str, duration_str: String) {
pub async fn failpoint_sleep_helper(name: &'static str, duration_str: String) {
    let millis = duration_str.parse::<u64>().unwrap();
    let d = std::time::Duration::from_millis(millis);

@@ -71,7 +82,7 @@ pub fn init() -> fail::FailScenario<'static> {
    scenario
}

pub(crate) fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> {
pub fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> {
    if actions == "exit" {
        fail::cfg_callback(name, exit_failpoint)
    } else {
@@ -84,3 +95,45 @@ fn exit_failpoint() {
    tracing::info!("Exit requested by failpoint");
    std::process::exit(1);
}

pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;

/// Information for configuring a single fail point
#[derive(Debug, Serialize, Deserialize)]
pub struct FailpointConfig {
    /// Name of the fail point
    pub name: String,
    /// List of actions to take, using the format described in `fail::cfg`
    ///
    /// We also support `actions = "exit"` to cause the fail point to immediately exit.
    pub actions: String,
}

/// Configure failpoints through http.
pub async fn failpoints_handler(
    mut request: Request<Body>,
    _cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    if !fail::has_failpoints() {
        return Err(ApiError::BadRequest(anyhow::anyhow!(
            "Cannot manage failpoints because storage was compiled without failpoints support"
        )));
    }

    let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
    for fp in failpoints {
        info!("cfg failpoint: {} {}", fp.name, fp.actions);

        // We recognize one extra "action" that's not natively recognized
        // by the failpoints crate: exit, to immediately kill the process
        let cfg_result = apply_failpoint(&fp.name, &fp.actions);

        if let Err(err_msg) = cfg_result {
            return Err(ApiError::BadRequest(anyhow::anyhow!(
                "Failed to configure failpoints: {err_msg}"
            )));
        }
    }

    json_response(StatusCode::OK, ())
}

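As a quick illustration of the wire format this handler expects from clients of the new PUT /v1/failpoints endpoint, here is a small hedged sketch (not part of the diff) that serializes a ConfigureFailpointsRequest with serde_json; the failpoint name and action are taken from the test changes further below:

use serde::{Deserialize, Serialize};

// Mirror of the request types above.
#[derive(Debug, Serialize, Deserialize)]
pub struct FailpointConfig {
    pub name: String,
    pub actions: String,
}
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;

fn main() -> serde_json::Result<()> {
    // JSON body a client would PUT to /v1/failpoints.
    let req: ConfigureFailpointsRequest = vec![FailpointConfig {
        name: "sk-send-wal-replica-sleep".to_string(),
        actions: "return(100)".to_string(),
    }];
    let body = serde_json::to_string(&req)?;
    assert_eq!(
        body,
        r#"[{"name":"sk-send-wal-replica-sleep","actions":"return(100)"}]"#
    );
    Ok(())
}
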
@@ -83,6 +83,8 @@ pub mod timeout;

pub mod sync;

pub mod failpoint_support;

/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:

@@ -31,6 +31,7 @@ use pageserver::{
    virtual_file,
};
use postgres_backend::AuthType;
use utils::failpoint_support;
use utils::logging::TracingErrorLayerEnablement;
use utils::signals::ShutdownSignals;
use utils::{
@@ -126,7 +127,7 @@ fn main() -> anyhow::Result<()> {
    }

    // Initialize up failpoints support
    let scenario = pageserver::failpoint_support::init();
    let scenario = failpoint_support::init();

    // Basic initialization of things that don't change after startup
    virtual_file::init(conf.max_file_descriptors);

@@ -25,6 +25,7 @@ use tenant_size_model::{SizeResult, StorageModel};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::auth::JwtAuth;
use utils::failpoint_support::failpoints_handler;
use utils::http::endpoint::request_span;
use utils::http::json::json_request_or_empty_body;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
@@ -66,9 +67,6 @@ use utils::{
    lsn::Lsn,
};

// Imports only used for testing APIs
use pageserver_api::models::ConfigureFailpointsRequest;

// For APIs that require an Active tenant, how long should we block waiting for that state?
// This is not functionally necessary (clients will retry), but avoids generating a lot of
// failed API calls while tenants are activating.
@@ -1293,34 +1291,6 @@ async fn handle_tenant_break(
    json_response(StatusCode::OK, ())
}

async fn failpoints_handler(
    mut request: Request<Body>,
    _cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    if !fail::has_failpoints() {
        return Err(ApiError::BadRequest(anyhow!(
            "Cannot manage failpoints because pageserver was compiled without failpoints support"
        )));
    }

    let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
    for fp in failpoints {
        info!("cfg failpoint: {} {}", fp.name, fp.actions);

        // We recognize one extra "action" that's not natively recognized
        // by the failpoints crate: exit, to immediately kill the process
        let cfg_result = crate::failpoint_support::apply_failpoint(&fp.name, &fp.actions);

        if let Err(err_msg) = cfg_result {
            return Err(ApiError::BadRequest(anyhow!(
                "Failed to configure failpoints: {err_msg}"
            )));
        }
    }

    json_response(StatusCode::OK, ())
}

// Run GC immediately on given timeline.
async fn timeline_gc_handler(
    mut request: Request<Body>,

@@ -25,8 +25,6 @@ pub mod walingest;
pub mod walrecord;
pub mod walredo;

pub mod failpoint_support;

use crate::task_mgr::TaskKind;
use camino::Utf8Path;
use deletion_queue::DeletionQueue;

@@ -33,6 +33,7 @@ use tracing::*;
use utils::backoff;
use utils::completion;
use utils::crashsafe::path_with_suffix_extension;
use utils::failpoint_support;
use utils::fs_ext;
use utils::sync::gate::Gate;
use utils::sync::gate::GateGuard;
@@ -890,7 +891,7 @@ impl Tenant {
    ) -> anyhow::Result<()> {
        span::debug_assert_current_span_has_tenant_id();

        crate::failpoint_support::sleep_millis_async!("before-attaching-tenant");
        failpoint_support::sleep_millis_async!("before-attaching-tenant");

        let preload = match preload {
            Some(p) => p,
@@ -1002,7 +1003,7 @@ impl Tenant {
        // IndexPart is the source of truth.
        self.clean_up_timelines(&existent_timelines)?;

        crate::failpoint_support::sleep_millis_async!("attach-before-activate");
        failpoint_support::sleep_millis_async!("attach-before-activate");

        info!("Done");

@@ -2839,9 +2840,7 @@ impl Tenant {
            }
        };

        crate::failpoint_support::sleep_millis_async!(
            "gc_iteration_internal_after_getting_gc_timelines"
        );
        failpoint_support::sleep_millis_async!("gc_iteration_internal_after_getting_gc_timelines");

        // If there is nothing to GC, we don't want any messages in the INFO log.
        if !gc_timelines.is_empty() {

@@ -29,6 +29,7 @@ use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
use anyhow::{bail, Context, Result};
use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
use utils::failpoint_support;

use crate::context::RequestContext;
use crate::metrics::WAL_INGEST;
@@ -344,9 +345,7 @@ impl<'a> WalIngest<'a> {
    // particular point in the WAL. For more fine-grained control,
    // we could peek into the message and only pause if it contains
    // a particular string, for example, but this is enough for now.
    crate::failpoint_support::sleep_millis_async!(
        "wal-ingest-logical-message-sleep"
    );
    failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
} else if let Some(path) = prefix.strip_prefix("neon-file:") {
    modification.put_file(path, message, ctx).await?;
}

@@ -4,6 +4,12 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true

[features]
default = []
# Enables test-only APIs, including failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]

[dependencies]
async-stream.workspace = true
anyhow.workspace = true
@@ -16,6 +22,7 @@ chrono.workspace = true
clap = { workspace = true, features = ["derive"] }
const_format.workspace = true
crc32c.workspace = true
fail.workspace = true
fs2.workspace = true
git-version.workspace = true
hex.workspace = true

@@ -54,6 +54,19 @@ const ID_FILE_NAME: &str = "safekeeper.id";
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);

const FEATURES: &[&str] = &[
    #[cfg(feature = "testing")]
    "testing",
];

fn version() -> String {
    format!(
        "{GIT_VERSION} failpoints: {}, features: {:?}",
        fail::has_failpoints(),
        FEATURES,
    )
}

const ABOUT: &str = r#"
A fleet of safekeepers is responsible for reliably storing WAL received from
compute, passing it through consensus (mitigating potential computes brain
@@ -167,7 +180,9 @@ async fn main() -> anyhow::Result<()> {
    // getting 'argument cannot be used multiple times' error. This seems to be
    // impossible with pure Derive API, so convert struct to Command, modify it,
    // parse arguments, and then fill the struct back.
    let cmd = <Args as clap::CommandFactory>::command().args_override_self(true);
    let cmd = <Args as clap::CommandFactory>::command()
        .args_override_self(true)
        .version(version());
    let mut matches = cmd.get_matches();
    let mut args = <Args as clap::FromArgMatches>::from_arg_matches_mut(&mut matches)?;

@@ -12,6 +12,8 @@ use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio_util::sync::CancellationToken;
use utils::failpoint_support::failpoints_handler;

use std::io::Write as _;
use tokio::sync::mpsc;
@@ -444,6 +446,12 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
        .data(Arc::new(conf))
        .data(auth)
        .get("/v1/status", |r| request_span(r, status_handler))
        .put("/v1/failpoints", |r| {
            request_span(r, move |r| async {
                let cancel = CancellationToken::new();
                failpoints_handler(r, cancel).await
            })
        })
        // Will be used in the future instead of implicit timeline creation
        .post("/v1/tenant/timeline", |r| {
            request_span(r, timeline_create_handler)

@@ -17,6 +17,7 @@ use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
use utils::failpoint_support;
use utils::id::TenantTimelineId;
use utils::lsn::AtomicLsn;
use utils::pageserver_feedback::PageserverFeedback;
@@ -391,15 +392,8 @@ impl SafekeeperPostgresHandler {
        // application_name: give only committed WAL (used by pageserver) or all
        // existing WAL (up to flush_lsn, used by walproposer or peer recovery).
        // The second case is always driven by a consensus leader which term
        // must generally be also supplied. However we're sloppy to do this in
        // walproposer recovery which will be removed soon. So TODO is to make
        // it not Option'al then.
        //
        // Fetching WAL without term in recovery creates a small risk of this
        // WAL getting concurrently garbaged if another compute rises which
        // collects majority and starts fixing log on this safekeeper itself.
        // That's ok as (old) proposer will never be able to commit such WAL.
        let end_watch = if self.is_walproposer_recovery() {
        // must be supplied.
        let end_watch = if term.is_some() {
            EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
        } else {
            EndWatch::Commit(tli.get_commit_lsn_watch_rx())
@@ -535,12 +529,19 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
        );

        // try to send as much as available, capped by MAX_SEND_SIZE
        let mut send_size = self
            .end_pos
            .checked_sub(self.start_pos)
            .context("reading wal without waiting for it first")?
            .0 as usize;
        send_size = min(send_size, self.send_buf.len());
        let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
        // if we went behind available WAL, back off
        if chunk_end_pos >= self.end_pos {
            chunk_end_pos = self.end_pos;
        } else {
            // If sending not up to end pos, round down to page boundary to
            // avoid breaking WAL record not at page boundary, as protocol
            // demands. See walsender.c (XLogSendPhysical).
            chunk_end_pos = chunk_end_pos
                .checked_sub(chunk_end_pos.block_offset())
                .unwrap();
        }
        let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
        let send_buf = &mut self.send_buf[..send_size];
        let send_size: usize;
        {
@@ -551,7 +552,8 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
            } else {
                None
            };
            // read wal into buffer
            // Read WAL into buffer. send_size can be additionally capped to
            // segment boundary here.
            send_size = self.wal_reader.read(send_buf).await?
        };
        let send_buf = &send_buf[..send_size];
@@ -566,6 +568,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
            }))
            .await?;

        if let Some(appname) = &self.appname {
            if appname == "replica" {
                failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep");
            }
        }
        trace!(
            "sent {} bytes of WAL {}-{}",
            send_size,

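To make the chunking arithmetic above easier to follow, here is a small standalone sketch of the same idea with plain integers (not the Lsn API used in the diff). The 8KiB block size matches the usual Postgres XLOG block size, and the MAX_SEND_SIZE value here is illustrative, not the real constant:

// Assumed WAL block (page) size; Postgres uses 8KiB XLOG blocks.
const BLOCK_SIZE: u64 = 8192;
// Illustrative cap on one send, standing in for the real MAX_SEND_SIZE.
const MAX_SEND_SIZE: u64 = 128 * 1024;

/// Pick the end of the next chunk: advance by at most MAX_SEND_SIZE, never past
/// `end_pos`, and round down to a block boundary when not sending all the way
/// to `end_pos`, so a WAL record is never cut off away from a page boundary.
fn chunk_end(start_pos: u64, end_pos: u64) -> u64 {
    let mut chunk_end_pos = start_pos + MAX_SEND_SIZE;
    if chunk_end_pos >= end_pos {
        // Went past available WAL: back off to what is actually there.
        chunk_end_pos = end_pos;
    } else {
        // Drop the within-block offset to land on a block boundary.
        chunk_end_pos -= chunk_end_pos % BLOCK_SIZE;
    }
    chunk_end_pos
}

fn main() {
    // Plenty of WAL available: stop on a block boundary below start + MAX_SEND_SIZE.
    assert_eq!(chunk_end(100, 1_000_000), 131_072);
    // Little WAL available: send exactly up to end_pos, even mid-block.
    assert_eq!(chunk_end(100, 5_000), 5_000);
}
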
@@ -565,6 +565,9 @@ impl WalReader {
        })
    }

    /// Read WAL at current position into provided buf, returns number of bytes
    /// read. It can be smaller than buf size only if segment boundary is
    /// reached.
    pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        // If this timeline is new, we may not have a full segment yet, so
        // we pad the first bytes of the timeline's first WAL segment with 0s

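Since read() may stop early at a segment boundary, a caller that needs an exact number of bytes has to loop. A hedged sketch of such a loop against the signature above; WalReader being in scope and the anyhow-style error handling are assumptions:

// Sketch: fill `buf` completely by calling WalReader::read repeatedly, since a
// single call may return fewer bytes at a WAL segment boundary.
async fn read_exact_wal(reader: &mut WalReader, buf: &mut [u8]) -> anyhow::Result<()> {
    let mut filled = 0;
    while filled < buf.len() {
        let n = reader.read(&mut buf[filled..]).await?;
        // Guard against a zero-length read, which would loop forever.
        anyhow::ensure!(n > 0, "WAL reader returned 0 bytes");
        filled += n;
    }
    Ok(())
}
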
@@ -347,7 +347,9 @@ class PgProtocol:
        """
        return self.safe_psql_many([query], **kwargs)[0]

    def safe_psql_many(self, queries: List[str], **kwargs: Any) -> List[List[Tuple[Any, ...]]]:
    def safe_psql_many(
        self, queries: List[str], log_query=True, **kwargs: Any
    ) -> List[List[Tuple[Any, ...]]]:
        """
        Execute queries against the node and return all rows.
        This method passes all extra params to connstr.
@@ -356,7 +358,8 @@ class PgProtocol:
        with closing(self.connect(**kwargs)) as conn:
            with conn.cursor() as cur:
                for query in queries:
                    log.info(f"Executing query: {query}")
                    if log_query:
                        log.info(f"Executing query: {query}")
                    cur.execute(query)

                    if cur.description is None:
@@ -365,11 +368,11 @@ class PgProtocol:
                        result.append(cur.fetchall())
        return result

    def safe_psql_scalar(self, query) -> Any:
    def safe_psql_scalar(self, query, log_query=True) -> Any:
        """
        Execute query returning single row with single column.
        """
        return self.safe_psql(query)[0][0]
        return self.safe_psql(query, log_query=log_query)[0][0]


@dataclass
@@ -890,8 +893,8 @@ class NeonEnv:
        """Get list of safekeeper endpoints suitable for safekeepers GUC"""
        return ",".join(f"localhost:{wa.port.pg}" for wa in self.safekeepers)

    def get_pageserver_version(self) -> str:
        bin_pageserver = str(self.neon_binpath / "pageserver")
    def get_binary_version(self, binary_name: str) -> str:
        bin_pageserver = str(self.neon_binpath / binary_name)
        res = subprocess.run(
            [bin_pageserver, "--version"],
            check=True,
@@ -1656,7 +1659,7 @@ class NeonPageserver(PgProtocol):
        self.running = False
        self.service_port = port
        self.config_override = config_override
        self.version = env.get_pageserver_version()
        self.version = env.get_binary_version("pageserver")

        # After a test finishes, we will scrape the log to see if there are any
        # unexpected error messages. If your test expects an error, add it to
@@ -2924,7 +2927,10 @@ class Safekeeper:
        return res

    def http_client(self, auth_token: Optional[str] = None) -> SafekeeperHttpClient:
        return SafekeeperHttpClient(port=self.port.http, auth_token=auth_token)
        is_testing_enabled = '"testing"' in self.env.get_binary_version("safekeeper")
        return SafekeeperHttpClient(
            port=self.port.http, auth_token=auth_token, is_testing_enabled=is_testing_enabled
        )

    def data_dir(self) -> str:
        return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}")
@@ -2975,10 +2981,11 @@ class SafekeeperMetrics:
class SafekeeperHttpClient(requests.Session):
    HTTPError = requests.HTTPError

    def __init__(self, port: int, auth_token: Optional[str] = None):
    def __init__(self, port: int, auth_token: Optional[str] = None, is_testing_enabled=False):
        super().__init__()
        self.port = port
        self.auth_token = auth_token
        self.is_testing_enabled = is_testing_enabled

        if auth_token is not None:
            self.headers["Authorization"] = f"Bearer {auth_token}"
@@ -2986,6 +2993,30 @@ class SafekeeperHttpClient(requests.Session):
    def check_status(self):
        self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()

    def is_testing_enabled_or_skip(self):
        if not self.is_testing_enabled:
            pytest.skip("safekeeper was built without 'testing' feature")

    def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
        self.is_testing_enabled_or_skip()

        if isinstance(config_strings, tuple):
            pairs = [config_strings]
        else:
            pairs = config_strings

        log.info(f"Requesting config failpoints: {repr(pairs)}")

        res = self.put(
            f"http://localhost:{self.port}/v1/failpoints",
            json=[{"name": name, "actions": actions} for name, actions in pairs],
        )
        log.info(f"Got failpoints request response code {res.status_code}")
        res.raise_for_status()
        res_json = res.json()
        assert res_json is None
        return res_json

    def debug_dump(self, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        params = params or {}
        res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params)

@@ -1,19 +1,59 @@
import os
import re
import time

from fixtures.neon_fixtures import NeonEnv
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv


def wait_caughtup(primary: Endpoint, secondary: Endpoint):
    primary_lsn = primary.safe_psql_scalar(
        "SELECT pg_current_wal_insert_lsn()::text", log_query=False
    )
    while True:
        secondary_lsn = secondary.safe_psql_scalar(
            "SELECT pg_last_wal_replay_lsn()", log_query=False
        )
        caught_up = secondary_lsn >= primary_lsn
        log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}")
        if caught_up:
            return
        time.sleep(1)


# Check for corrupted WAL messages which might otherwise go unnoticed if
# reconnection fixes this.
def scan_standby_log_for_errors(secondary):
    log_path = secondary.endpoint_path() / "compute.log"
    with log_path.open("r") as f:
        markers = re.compile(
            r"incorrect resource manager data|record with incorrect|invalid magic number|unexpected pageaddr"
        )
        for line in f:
            if markers.search(line):
                log.info(f"bad error in standby log: {line}")
                raise AssertionError()


def test_hot_standby(neon_simple_env: NeonEnv):
    env = neon_simple_env

    # We've had a bug caused by WAL records split across multiple XLogData
    # messages resulting in corrupted WAL complains on standby. It reproduced
    # only when sending from safekeeper is slow enough to grab full
    # MAX_SEND_SIZE messages. So insert sleep through failpoints, but only in
    # one conf to decrease test time.
    slow_down_send = "[debug-pg16]" in os.environ.get("PYTEST_CURRENT_TEST", "")
    if slow_down_send:
        sk_http = env.safekeepers[0].http_client()
        sk_http.configure_failpoints([("sk-send-wal-replica-sleep", "return(100)")])

    with env.endpoints.create_start(
        branch_name="main",
        endpoint_id="primary",
    ) as primary:
        time.sleep(1)
        with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
            primary_lsn = None
            caught_up = False
            queries = [
                "SHOW neon.timeline_id",
                "SHOW neon.tenant_id",
@@ -26,23 +66,6 @@ def test_hot_standby(neon_simple_env: NeonEnv):
                with p_con.cursor() as p_cur:
                    p_cur.execute("CREATE TABLE test AS SELECT generate_series(1, 100) AS i")

                # Explicit commit to make sure other connections (and replicas) can
                # see the changes of this commit.
                p_con.commit()

                with p_con.cursor() as p_cur:
                    p_cur.execute("SELECT pg_current_wal_insert_lsn()::text")
                    res = p_cur.fetchone()
                    assert res is not None
                    (lsn,) = res
                    primary_lsn = lsn

                # Explicit commit to make sure other connections (and replicas) can
                # see the changes of this commit.
                # Note that this may generate more WAL if the transaction has changed
                # things, but we don't care about that.
                p_con.commit()

                for query in queries:
                    with p_con.cursor() as p_cur:
                        p_cur.execute(query)
@@ -51,30 +74,28 @@ def test_hot_standby(neon_simple_env: NeonEnv):
                        response = res
                        responses[query] = response

            # insert more data to make safekeeper send MAX_SEND_SIZE messages
            if slow_down_send:
                primary.safe_psql("create table t(key int, value text)")
                primary.safe_psql("insert into t select generate_series(1, 100000), 'payload'")

            wait_caughtup(primary, secondary)

            with secondary.connect() as s_con:
                with s_con.cursor() as s_cur:
                    s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()")
                    res = s_cur.fetchone()
                    assert res is not None

                while not caught_up:
                    with s_con.cursor() as secondary_cursor:
                        secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()")
                        res = secondary_cursor.fetchone()
                        assert res is not None
                        (secondary_lsn,) = res
                        # There may be more changes on the primary after we got our LSN
                        # due to e.g. autovacuum, but that shouldn't impact the content
                        # of the tables, so we check whether we've replayed up to at
                        # least after the commit of the `test` table.
                        caught_up = secondary_lsn >= primary_lsn

                # Explicit commit to flush any transient transaction-level state.
                s_con.commit()

                for query in queries:
                    with s_con.cursor() as secondary_cursor:
                        secondary_cursor.execute(query)
                        response = secondary_cursor.fetchone()
                        assert response is not None
                        assert response == responses[query]

            scan_standby_log_for_errors(secondary)

    # clean up
    if slow_down_send:
        sk_http.configure_failpoints(("sk-send-wal-replica-sleep", "off"))

@@ -475,6 +475,46 @@ def test_unavailability(neon_env_builder: NeonEnvBuilder):
    asyncio.run(run_unavailability(env, endpoint))


async def run_recovery_uncommitted(env: NeonEnv):
    (sk1, sk2, _) = env.safekeepers

    env.neon_cli.create_branch("test_recovery_uncommitted")
    ep = env.endpoints.create_start("test_recovery_uncommitted")
    ep.safe_psql("create table t(key int, value text)")
    ep.safe_psql("insert into t select generate_series(1, 100), 'payload'")

    # insert with only one safekeeper up to create tail of flushed but not committed WAL
    sk1.stop()
    sk2.stop()
    conn = await ep.connect_async()
    # query should hang, so execute in separate task
    bg_query = asyncio.create_task(
        conn.execute("insert into t select generate_series(1, 2000), 'payload'")
    )
    sleep_sec = 2
    await asyncio.sleep(sleep_sec)
    # it must still be not finished
    assert not bg_query.done()
    # note: destroy will kill compute_ctl, preventing it waiting for hanging sync-safekeepers.
    ep.stop_and_destroy()

    # Start one of sks to make quorum online plus compute and ensure they can
    # sync.
    sk2.start()
    ep = env.endpoints.create_start(
        "test_recovery_uncommitted",
    )
    ep.safe_psql("insert into t select generate_series(1, 2000), 'payload'")


# Test pulling uncommitted WAL (up to flush_lsn) during recovery.
def test_recovery_uncommitted(neon_env_builder: NeonEnvBuilder):
    neon_env_builder.num_safekeepers = 3
    env = neon_env_builder.init_start()

    asyncio.run(run_recovery_uncommitted(env))


@dataclass
class RaceConditionTest:
    iteration: int
