Merge pull request #6242 from neondatabase/releases/2024-01-02

Release 2024-01-02
Arseny Sher committed on 2024-01-02 12:04:43 +04:00 (via GitHub)
22 changed files with 325 additions and 123 deletions

Cargo.lock (generated)

@@ -1161,6 +1161,7 @@ dependencies = [
"flate2",
"futures",
"hyper",
"nix 0.26.2",
"notify",
"num_cpus",
"opentelemetry",
@@ -1171,6 +1172,7 @@ dependencies = [
"rust-ini",
"serde",
"serde_json",
"signal-hook",
"tar",
"tokio",
"tokio-postgres",
@@ -4447,6 +4449,7 @@ dependencies = [
"clap",
"const_format",
"crc32c",
"fail",
"fs2",
"futures",
"git-version",
@@ -5878,6 +5881,7 @@ dependencies = [
"chrono",
"const_format",
"criterion",
"fail",
"futures",
"heapless",
"hex",


@@ -13,6 +13,7 @@ clap.workspace = true
flate2.workspace = true
futures.workspace = true
hyper = { workspace = true, features = ["full"] }
nix.workspace = true
notify.workspace = true
num_cpus.workspace = true
opentelemetry.workspace = true
@@ -20,6 +21,7 @@ postgres.workspace = true
regex.workspace = true
serde.workspace = true
serde_json.workspace = true
signal-hook.workspace = true
tar.workspace = true
reqwest = { workspace = true, features = ["json"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }


@@ -40,18 +40,22 @@ use std::collections::HashMap;
use std::fs::File;
use std::path::Path;
use std::process::exit;
use std::sync::atomic::Ordering;
use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock};
use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use nix::sys::signal::{kill, Signal};
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
use tracing::{error, info};
use url::Url;
use compute_api::responses::ComputeStatus;
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec, PG_PID, SYNC_SAFEKEEPERS_PID};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::get_pg_version;
use compute_tools::http::api::launch_http_server;
@@ -67,6 +71,13 @@ const BUILD_TAG_DEFAULT: &str = "latest";
fn main() -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
thread::spawn(move || {
for sig in signals.forever() {
handle_exit_signal(sig);
}
});
let build_tag = option_env!("BUILD_TAG")
.unwrap_or(BUILD_TAG_DEFAULT)
.to_string();
@@ -346,6 +357,7 @@ fn main() -> Result<()> {
let ecode = pg
.wait()
.expect("failed to start waiting on Postgres process");
PG_PID.store(0, Ordering::SeqCst);
info!("Postgres exited with code {}, shutting down", ecode);
exit_code = ecode.code()
}
@@ -519,6 +531,24 @@ fn cli() -> clap::Command {
)
}
/// When compute_ctl is killed, also send a termination signal to sync-safekeepers
/// to prevent process leakage. TODO: it would be better to convert compute_ctl to
/// async and simply await the children's termination.
fn handle_exit_signal(sig: i32) {
info!("received {sig} termination signal");
let ss_pid = SYNC_SAFEKEEPERS_PID.load(Ordering::SeqCst);
if ss_pid != 0 {
let ss_pid = nix::unistd::Pid::from_raw(ss_pid as i32);
kill(ss_pid, Signal::SIGTERM).ok();
}
let pg_pid = PG_PID.load(Ordering::SeqCst);
if pg_pid != 0 {
let pg_pid = nix::unistd::Pid::from_raw(pg_pid as i32);
kill(pg_pid, Signal::SIGTERM).ok();
}
exit(1);
}
#[test]
fn verify_cli() {
cli().debug_assert()
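A minimal self-contained sketch of the shutdown path added above, assuming the same `signal-hook` and `nix` crates; the static PID and exit code mirror the diff, everything else is illustrative:

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::thread;

use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
use signal_hook::iterator::Signals;

// PID of a child process to terminate on shutdown; 0 means "none running".
static CHILD_PID: AtomicU32 = AtomicU32::new(0);

fn handle_exit_signal(sig: i32) -> ! {
    eprintln!("received termination signal {sig}");
    let pid = CHILD_PID.load(Ordering::SeqCst);
    if pid != 0 {
        // Forward SIGTERM to the child; ignore errors (it may have exited).
        kill(Pid::from_raw(pid as i32), Signal::SIGTERM).ok();
    }
    std::process::exit(1);
}

fn main() -> anyhow::Result<()> {
    let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
    // Signal iteration blocks, so it runs on its own thread.
    thread::spawn(move || {
        for sig in signals.forever() {
            handle_exit_signal(sig);
        }
    });
    // ... spawn children here, publishing their PIDs via CHILD_PID ...
    thread::sleep(std::time::Duration::from_secs(60));
    Ok(())
}
```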


@@ -6,6 +6,8 @@ use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::sync::{Condvar, Mutex, RwLock};
use std::thread;
use std::time::Instant;
@@ -34,6 +36,9 @@ use crate::spec::*;
use crate::sync_sk::{check_if_synced, ping_safekeeper};
use crate::{config, extension_server};
pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
pub static PG_PID: AtomicU32 = AtomicU32::new(0);
/// Compute node info shared across several `compute_ctl` threads.
pub struct ComputeNode {
// Url type maintains proper escaping
@@ -501,6 +506,7 @@ impl ComputeNode {
.stdout(Stdio::piped())
.spawn()
.expect("postgres --sync-safekeepers failed to start");
SYNC_SAFEKEEPERS_PID.store(sync_handle.id(), Ordering::SeqCst);
// `postgres --sync-safekeepers` will print all log output to stderr and
// final LSN to stdout. So we pipe only stdout, while stderr will be automatically
@@ -508,6 +514,7 @@ impl ComputeNode {
let sync_output = sync_handle
.wait_with_output()
.expect("postgres --sync-safekeepers failed");
SYNC_SAFEKEEPERS_PID.store(0, Ordering::SeqCst);
if !sync_output.status.success() {
anyhow::bail!(
@@ -662,6 +669,7 @@ impl ComputeNode {
})
.spawn()
.expect("cannot start postgres process");
PG_PID.store(pg.id(), Ordering::SeqCst);
wait_for_postgres(&mut pg, pgdata_path)?;
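A minimal sketch of the matching PID bookkeeping from `ComputeNode` above (`sleep` stands in for the postgres child; the store/clear pattern is the point):

```rust
use std::process::Command;
use std::sync::atomic::{AtomicU32, Ordering};

// Mirrors PG_PID / SYNC_SAFEKEEPERS_PID above: 0 means "no child running".
static PG_PID: AtomicU32 = AtomicU32::new(0);

fn main() -> std::io::Result<()> {
    let mut child = Command::new("sleep").arg("5").spawn()?;
    // Publish the PID before blocking in wait(), so a concurrent signal
    // handler can forward SIGTERM to the child.
    PG_PID.store(child.id(), Ordering::SeqCst);
    let status = child.wait()?;
    // Clear it once the child is gone so stale PIDs are never signalled.
    PG_PID.store(0, Ordering::SeqCst);
    println!("child exited with {status}");
    Ok(())
}
```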


@@ -46,6 +46,8 @@ use std::time::Duration;
use anyhow::{anyhow, bail, Context, Result};
use compute_api::spec::RemoteExtSpec;
use nix::sys::signal::kill;
use nix::sys::signal::Signal;
use serde::{Deserialize, Serialize};
use utils::id::{NodeId, TenantId, TimelineId};
@@ -439,11 +441,14 @@ impl Endpoint {
Ok(())
}
fn wait_for_compute_ctl_to_exit(&self) -> Result<()> {
fn wait_for_compute_ctl_to_exit(&self, send_sigterm: bool) -> Result<()> {
// TODO use background_process::stop_process instead
let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
let pid = nix::unistd::Pid::from_raw(pid as i32);
if send_sigterm {
kill(pid, Signal::SIGTERM).ok();
}
crate::background_process::wait_until_stopped("compute_ctl", pid)?;
Ok(())
}
@@ -733,10 +738,15 @@ impl Endpoint {
&None,
)?;
// Also wait for the compute_ctl process to die. It might have some cleanup
// work to do after postgres stops, like syncing safekeepers, etc.
// Also wait for the compute_ctl process to die. It might have some
// cleanup work to do after postgres stops, like syncing safekeepers,
// etc.
//
self.wait_for_compute_ctl_to_exit()?;
// If destroying, send it SIGTERM before waiting. Sometimes we do *not*
// want this cleanup: tests intentionally stop while a majority of
// safekeepers is down, so sync-safekeepers would otherwise hang. This
// could be a separate flag, though.
self.wait_for_compute_ctl_to_exit(destroy)?;
if destroy {
println!(
"Destroying postgres data directory '{}'",


@@ -557,19 +557,6 @@ pub enum DownloadRemoteLayersTaskState {
ShutDown,
}
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
/// Information for configuring a single fail point
#[derive(Debug, Serialize, Deserialize)]
pub struct FailpointConfig {
/// Name of the fail point
pub name: String,
/// List of actions to take, using the format described in `fail::cfg`
///
/// We also support `actions = "exit"` to cause the fail point to immediately exit.
pub actions: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct TimelineGcRequest {
pub gc_horizon: Option<u64>,


@@ -4,6 +4,12 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true
[features]
default = []
# Enables test-only APIs, including failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost, so that tests can simulate outage conditions
testing = ["fail/failpoints"]
[dependencies]
arc-swap.workspace = true
sentry.workspace = true
@@ -16,6 +22,7 @@ chrono.workspace = true
heapless.workspace = true
hex = { workspace = true, features = ["serde"] }
hyper = { workspace = true, features = ["full"] }
fail.workspace = true
futures = { workspace = true}
jsonwebtoken.workspace = true
nix.workspace = true


@@ -1,3 +1,14 @@
//! Failpoint support code shared between pageserver and safekeepers.
use crate::http::{
error::ApiError,
json::{json_request, json_response},
};
use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::*;
/// use with fail::cfg("$name", "return(2000)")
///
/// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the
@@ -25,7 +36,7 @@ pub use __failpoint_sleep_millis_async as sleep_millis_async;
// Helper function used by the macro. (A function has nicer scoping so we
// don't need to decorate everything with "::")
#[doc(hidden)]
pub(crate) async fn failpoint_sleep_helper(name: &'static str, duration_str: String) {
pub async fn failpoint_sleep_helper(name: &'static str, duration_str: String) {
let millis = duration_str.parse::<u64>().unwrap();
let d = std::time::Duration::from_millis(millis);
@@ -71,7 +82,7 @@ pub fn init() -> fail::FailScenario<'static> {
scenario
}
pub(crate) fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> {
pub fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> {
if actions == "exit" {
fail::cfg_callback(name, exit_failpoint)
} else {
@@ -84,3 +95,45 @@ fn exit_failpoint() {
tracing::info!("Exit requested by failpoint");
std::process::exit(1);
}
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
/// Information for configuring a single fail point
#[derive(Debug, Serialize, Deserialize)]
pub struct FailpointConfig {
/// Name of the fail point
pub name: String,
/// List of actions to take, using the format described in `fail::cfg`
///
/// We also support `actions = "exit"` to cause the fail point to immediately exit.
pub actions: String,
}
/// Configure failpoints through http.
pub async fn failpoints_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
if !fail::has_failpoints() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Cannot manage failpoints because storage was compiled without failpoints support"
)));
}
let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
for fp in failpoints {
info!("cfg failpoint: {} {}", fp.name, fp.actions);
// We support one extra "action" that's not natively recognized
// by the failpoints crate: exit, which immediately kills the process
let cfg_result = apply_failpoint(&fp.name, &fp.actions);
if let Err(err_msg) = cfg_result {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Failed to configure failpoints: {err_msg}"
)));
}
}
json_response(StatusCode::OK, ())
}
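For reference, a minimal sketch of the payload this handler expects; the struct mirrors `FailpointConfig` above, and the failpoint name and `return(100)` action are the ones used by the hot-standby test later in this diff:

```rust
use serde::Serialize;

#[derive(Serialize)]
struct FailpointConfig {
    name: String,
    actions: String,
}

fn main() {
    // PUT this JSON array to /v1/failpoints; configuring the action "off"
    // later disables the failpoint again.
    let body = vec![FailpointConfig {
        name: "sk-send-wal-replica-sleep".into(),
        actions: "return(100)".into(), // sleep 100 ms per hit (see macro docs above)
    }];
    println!("{}", serde_json::to_string(&body).unwrap());
}
```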


@@ -83,6 +83,8 @@ pub mod timeout;
pub mod sync;
pub mod failpoint_support;
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:


@@ -31,6 +31,7 @@ use pageserver::{
virtual_file,
};
use postgres_backend::AuthType;
use utils::failpoint_support;
use utils::logging::TracingErrorLayerEnablement;
use utils::signals::ShutdownSignals;
use utils::{
@@ -126,7 +127,7 @@ fn main() -> anyhow::Result<()> {
}
// Initialize failpoints support
let scenario = pageserver::failpoint_support::init();
let scenario = failpoint_support::init();
// Basic initialization of things that don't change after startup
virtual_file::init(conf.max_file_descriptors);


@@ -25,6 +25,7 @@ use tenant_size_model::{SizeResult, StorageModel};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::auth::JwtAuth;
use utils::failpoint_support::failpoints_handler;
use utils::http::endpoint::request_span;
use utils::http::json::json_request_or_empty_body;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
@@ -66,9 +67,6 @@ use utils::{
lsn::Lsn,
};
// Imports only used for testing APIs
use pageserver_api::models::ConfigureFailpointsRequest;
// For APIs that require an Active tenant, how long should we block waiting for that state?
// This is not functionally necessary (clients will retry), but avoids generating a lot of
// failed API calls while tenants are activating.
@@ -1293,34 +1291,6 @@ async fn handle_tenant_break(
json_response(StatusCode::OK, ())
}
async fn failpoints_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
if !fail::has_failpoints() {
return Err(ApiError::BadRequest(anyhow!(
"Cannot manage failpoints because pageserver was compiled without failpoints support"
)));
}
let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
for fp in failpoints {
info!("cfg failpoint: {} {}", fp.name, fp.actions);
// We recognize one extra "action" that's not natively recognized
// by the failpoints crate: exit, to immediately kill the process
let cfg_result = crate::failpoint_support::apply_failpoint(&fp.name, &fp.actions);
if let Err(err_msg) = cfg_result {
return Err(ApiError::BadRequest(anyhow!(
"Failed to configure failpoints: {err_msg}"
)));
}
}
json_response(StatusCode::OK, ())
}
// Run GC immediately on given timeline.
async fn timeline_gc_handler(
mut request: Request<Body>,


@@ -25,8 +25,6 @@ pub mod walingest;
pub mod walrecord;
pub mod walredo;
pub mod failpoint_support;
use crate::task_mgr::TaskKind;
use camino::Utf8Path;
use deletion_queue::DeletionQueue;


@@ -33,6 +33,7 @@ use tracing::*;
use utils::backoff;
use utils::completion;
use utils::crashsafe::path_with_suffix_extension;
use utils::failpoint_support;
use utils::fs_ext;
use utils::sync::gate::Gate;
use utils::sync::gate::GateGuard;
@@ -890,7 +891,7 @@ impl Tenant {
) -> anyhow::Result<()> {
span::debug_assert_current_span_has_tenant_id();
crate::failpoint_support::sleep_millis_async!("before-attaching-tenant");
failpoint_support::sleep_millis_async!("before-attaching-tenant");
let preload = match preload {
Some(p) => p,
@@ -1002,7 +1003,7 @@ impl Tenant {
// IndexPart is the source of truth.
self.clean_up_timelines(&existent_timelines)?;
crate::failpoint_support::sleep_millis_async!("attach-before-activate");
failpoint_support::sleep_millis_async!("attach-before-activate");
info!("Done");
@@ -2839,9 +2840,7 @@ impl Tenant {
}
};
crate::failpoint_support::sleep_millis_async!(
"gc_iteration_internal_after_getting_gc_timelines"
);
failpoint_support::sleep_millis_async!("gc_iteration_internal_after_getting_gc_timelines");
// If there is nothing to GC, we don't want any messages in the INFO log.
if !gc_timelines.is_empty() {


@@ -29,6 +29,7 @@ use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
use anyhow::{bail, Context, Result};
use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
use utils::failpoint_support;
use crate::context::RequestContext;
use crate::metrics::WAL_INGEST;
@@ -344,9 +345,7 @@ impl<'a> WalIngest<'a> {
// particular point in the WAL. For more fine-grained control,
// we could peek into the message and only pause if it contains
// a particular string, for example, but this is enough for now.
crate::failpoint_support::sleep_millis_async!(
"wal-ingest-logical-message-sleep"
);
failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
} else if let Some(path) = prefix.strip_prefix("neon-file:") {
modification.put_file(path, message, ctx).await?;
}


@@ -4,6 +4,12 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true
[features]
default = []
# Enables test-only APIs, including failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost, so that tests can simulate outage conditions
testing = ["fail/failpoints"]
[dependencies]
async-stream.workspace = true
anyhow.workspace = true
@@ -16,6 +22,7 @@ chrono.workspace = true
clap = { workspace = true, features = ["derive"] }
const_format.workspace = true
crc32c.workspace = true
fail.workspace = true
fs2.workspace = true
git-version.workspace = true
hex.workspace = true


@@ -54,6 +54,19 @@ const ID_FILE_NAME: &str = "safekeeper.id";
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);
const FEATURES: &[&str] = &[
#[cfg(feature = "testing")]
"testing",
];
fn version() -> String {
format!(
"{GIT_VERSION} failpoints: {}, features: {:?}",
fail::has_failpoints(),
FEATURES,
)
}
const ABOUT: &str = r#"
A fleet of safekeepers is responsible for reliably storing WAL received from
compute, passing it through consensus (mitigating potential computes brain
@@ -167,7 +180,9 @@ async fn main() -> anyhow::Result<()> {
// getting 'argument cannot be used multiple times' error. This seems to be
// impossible with pure Derive API, so convert struct to Command, modify it,
// parse arguments, and then fill the struct back.
let cmd = <Args as clap::CommandFactory>::command().args_override_self(true);
let cmd = <Args as clap::CommandFactory>::command()
.args_override_self(true)
.version(version());
let mut matches = cmd.get_matches();
let mut args = <Args as clap::FromArgMatches>::from_arg_matches_mut(&mut matches)?;
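A minimal sketch of that derive-to-builder round trip (clap 4 API; the flag is illustrative): convert the derived struct to a `Command`, set a runtime-computed version, then parse back into the struct.

```rust
use clap::{CommandFactory, FromArgMatches, Parser};

#[derive(Parser, Debug)]
struct Args {
    /// Illustrative flag only.
    #[arg(long, default_value = "127.0.0.1:5454")]
    listen_pg: String,
}

fn parse_args(version: String) -> Result<Args, clap::Error> {
    // The derive API alone can't attach a version string computed at
    // runtime, so round-trip through the builder Command.
    let cmd = Args::command()
        .args_override_self(true) // repeated flags: last occurrence wins
        .version(version);
    let mut matches = cmd.get_matches();
    Args::from_arg_matches_mut(&mut matches)
}

fn main() {
    let args = parse_args("0.1.0, failpoints: true".to_string()).unwrap();
    println!("{args:?}");
}
```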


@@ -12,6 +12,8 @@ use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio_util::sync::CancellationToken;
use utils::failpoint_support::failpoints_handler;
use std::io::Write as _;
use tokio::sync::mpsc;
@@ -444,6 +446,12 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
.data(Arc::new(conf))
.data(auth)
.get("/v1/status", |r| request_span(r, status_handler))
.put("/v1/failpoints", |r| {
request_span(r, move |r| async {
let cancel = CancellationToken::new();
failpoints_handler(r, cancel).await
})
})
// Will be used in the future instead of implicit timeline creation
.post("/v1/tenant/timeline", |r| {
request_span(r, timeline_create_handler)


@@ -17,6 +17,7 @@ use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
use utils::failpoint_support;
use utils::id::TenantTimelineId;
use utils::lsn::AtomicLsn;
use utils::pageserver_feedback::PageserverFeedback;
@@ -391,15 +392,8 @@ impl SafekeeperPostgresHandler {
// application_name: give only committed WAL (used by pageserver) or all
// existing WAL (up to flush_lsn, used by walproposer or peer recovery).
// The second case is always driven by a consensus leader which term
// must generally be also supplied. However we're sloppy to do this in
// walproposer recovery which will be removed soon. So TODO is to make
// it not Option'al then.
//
// Fetching WAL without term in recovery creates a small risk of this
// WAL getting concurrently garbaged if another compute rises which
// collects majority and starts fixing log on this safekeeper itself.
// That's ok as (old) proposer will never be able to commit such WAL.
let end_watch = if self.is_walproposer_recovery() {
// must be supplied.
let end_watch = if term.is_some() {
EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
} else {
EndWatch::Commit(tli.get_commit_lsn_watch_rx())
@@ -535,12 +529,19 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
);
// try to send as much as available, capped by MAX_SEND_SIZE
let mut send_size = self
.end_pos
.checked_sub(self.start_pos)
.context("reading wal without waiting for it first")?
.0 as usize;
send_size = min(send_size, self.send_buf.len());
let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
// if we went beyond the available WAL, back off
if chunk_end_pos >= self.end_pos {
chunk_end_pos = self.end_pos;
} else {
// If not sending up to end_pos, round down to a page boundary, to
// avoid breaking a WAL record anywhere but at a page boundary, as the
// protocol demands. See walsender.c (XLogSendPhysical).
chunk_end_pos = chunk_end_pos
.checked_sub(chunk_end_pos.block_offset())
.unwrap();
}
let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
let send_buf = &mut self.send_buf[..send_size];
let send_size: usize;
{
@@ -551,7 +552,8 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
} else {
None
};
// read wal into buffer
// Read WAL into buffer. send_size can be additionally capped to
// segment boundary here.
send_size = self.wal_reader.read(send_buf).await?
};
let send_buf = &send_buf[..send_size];
@@ -566,6 +568,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
}))
.await?;
if let Some(appname) = &self.appname {
if appname == "replica" {
failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep");
}
}
trace!(
"sent {} bytes of WAL {}-{}",
send_size,
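A worked example of the page-boundary rounding above, assuming the standard 8 KiB WAL block size (`block_offset()` is taken to mean the position's offset within its page):

```rust
// PostgreSQL WAL page size; an assumption for this illustration.
const XLOG_BLCKSZ: u64 = 8192;

/// Round a WAL position down to the start of its page, as the sender does
/// when a chunk would otherwise end mid-page (and possibly mid-record).
fn round_down_to_page(pos: u64) -> u64 {
    pos - pos % XLOG_BLCKSZ // block_offset() == pos % XLOG_BLCKSZ
}

fn main() {
    // A chunk ending inside a page is trimmed back to the page start...
    assert_eq!(round_down_to_page(0x203F10), 0x202000);
    // ...while positions already on a boundary stay put.
    assert_eq!(round_down_to_page(0x202000), 0x202000);
    println!("ok");
}
```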


@@ -565,6 +565,9 @@ impl WalReader {
})
}
/// Read WAL at the current position into the provided buf; returns the number
/// of bytes read. It can be smaller than the buf size only if a segment
/// boundary is reached.
pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
// If this timeline is new, we may not have a full segment yet, so
// we pad the first bytes of the timeline's first WAL segment with 0s


@@ -347,7 +347,9 @@ class PgProtocol:
"""
return self.safe_psql_many([query], **kwargs)[0]
def safe_psql_many(self, queries: List[str], **kwargs: Any) -> List[List[Tuple[Any, ...]]]:
def safe_psql_many(
self, queries: List[str], log_query=True, **kwargs: Any
) -> List[List[Tuple[Any, ...]]]:
"""
Execute queries against the node and return all rows.
This method passes all extra params to connstr.
@@ -356,7 +358,8 @@ class PgProtocol:
with closing(self.connect(**kwargs)) as conn:
with conn.cursor() as cur:
for query in queries:
log.info(f"Executing query: {query}")
if log_query:
log.info(f"Executing query: {query}")
cur.execute(query)
if cur.description is None:
@@ -365,11 +368,11 @@ class PgProtocol:
result.append(cur.fetchall())
return result
def safe_psql_scalar(self, query) -> Any:
def safe_psql_scalar(self, query, log_query=True) -> Any:
"""
Execute query returning single row with single column.
"""
return self.safe_psql(query)[0][0]
return self.safe_psql(query, log_query=log_query)[0][0]
@dataclass
@@ -890,8 +893,8 @@ class NeonEnv:
"""Get list of safekeeper endpoints suitable for safekeepers GUC"""
return ",".join(f"localhost:{wa.port.pg}" for wa in self.safekeepers)
def get_pageserver_version(self) -> str:
bin_pageserver = str(self.neon_binpath / "pageserver")
def get_binary_version(self, binary_name: str) -> str:
bin_pageserver = str(self.neon_binpath / binary_name)
res = subprocess.run(
[bin_pageserver, "--version"],
check=True,
@@ -1656,7 +1659,7 @@ class NeonPageserver(PgProtocol):
self.running = False
self.service_port = port
self.config_override = config_override
self.version = env.get_pageserver_version()
self.version = env.get_binary_version("pageserver")
# After a test finishes, we will scrape the log to see if there are any
# unexpected error messages. If your test expects an error, add it to
@@ -2924,7 +2927,10 @@ class Safekeeper:
return res
def http_client(self, auth_token: Optional[str] = None) -> SafekeeperHttpClient:
return SafekeeperHttpClient(port=self.port.http, auth_token=auth_token)
is_testing_enabled = '"testing"' in self.env.get_binary_version("safekeeper")
return SafekeeperHttpClient(
port=self.port.http, auth_token=auth_token, is_testing_enabled=is_testing_enabled
)
def data_dir(self) -> str:
return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}")
@@ -2975,10 +2981,11 @@ class SafekeeperMetrics:
class SafekeeperHttpClient(requests.Session):
HTTPError = requests.HTTPError
def __init__(self, port: int, auth_token: Optional[str] = None):
def __init__(self, port: int, auth_token: Optional[str] = None, is_testing_enabled=False):
super().__init__()
self.port = port
self.auth_token = auth_token
self.is_testing_enabled = is_testing_enabled
if auth_token is not None:
self.headers["Authorization"] = f"Bearer {auth_token}"
@@ -2986,6 +2993,30 @@ class SafekeeperHttpClient(requests.Session):
def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
def is_testing_enabled_or_skip(self):
if not self.is_testing_enabled:
pytest.skip("safekeeper was built without 'testing' feature")
def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
self.is_testing_enabled_or_skip()
if isinstance(config_strings, tuple):
pairs = [config_strings]
else:
pairs = config_strings
log.info(f"Requesting config failpoints: {repr(pairs)}")
res = self.put(
f"http://localhost:{self.port}/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
)
log.info(f"Got failpoints request response code {res.status_code}")
res.raise_for_status()
res_json = res.json()
assert res_json is None
return res_json
def debug_dump(self, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
params = params or {}
res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params)


@@ -1,19 +1,59 @@
import os
import re
import time
from fixtures.neon_fixtures import NeonEnv
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv
def wait_caughtup(primary: Endpoint, secondary: Endpoint):
primary_lsn = primary.safe_psql_scalar(
"SELECT pg_current_wal_insert_lsn()::text", log_query=False
)
while True:
secondary_lsn = secondary.safe_psql_scalar(
"SELECT pg_last_wal_replay_lsn()", log_query=False
)
caught_up = secondary_lsn >= primary_lsn
log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}")
if caught_up:
return
time.sleep(1)
# Check for corrupted WAL messages which might otherwise go unnoticed if
# reconnection recovers from them.
def scan_standby_log_for_errors(secondary):
log_path = secondary.endpoint_path() / "compute.log"
with log_path.open("r") as f:
markers = re.compile(
r"incorrect resource manager data|record with incorrect|invalid magic number|unexpected pageaddr"
)
for line in f:
if markers.search(line):
log.info(f"bad error in standby log: {line}")
raise AssertionError()
def test_hot_standby(neon_simple_env: NeonEnv):
env = neon_simple_env
# We've had a bug caused by WAL records split across multiple XLogData
# messages resulting in corrupted-WAL complaints on the standby. It reproduced
# only when sending from the safekeeper was slow enough to accumulate full
# MAX_SEND_SIZE messages. So we insert a sleep through failpoints, but only in
# one configuration to keep test time down.
slow_down_send = "[debug-pg16]" in os.environ.get("PYTEST_CURRENT_TEST", "")
if slow_down_send:
sk_http = env.safekeepers[0].http_client()
sk_http.configure_failpoints([("sk-send-wal-replica-sleep", "return(100)")])
with env.endpoints.create_start(
branch_name="main",
endpoint_id="primary",
) as primary:
time.sleep(1)
with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
primary_lsn = None
caught_up = False
queries = [
"SHOW neon.timeline_id",
"SHOW neon.tenant_id",
@@ -26,23 +66,6 @@ def test_hot_standby(neon_simple_env: NeonEnv):
with p_con.cursor() as p_cur:
p_cur.execute("CREATE TABLE test AS SELECT generate_series(1, 100) AS i")
# Explicit commit to make sure other connections (and replicas) can
# see the changes of this commit.
p_con.commit()
with p_con.cursor() as p_cur:
p_cur.execute("SELECT pg_current_wal_insert_lsn()::text")
res = p_cur.fetchone()
assert res is not None
(lsn,) = res
primary_lsn = lsn
# Explicit commit to make sure other connections (and replicas) can
# see the changes of this commit.
# Note that this may generate more WAL if the transaction has changed
# things, but we don't care about that.
p_con.commit()
for query in queries:
with p_con.cursor() as p_cur:
p_cur.execute(query)
@@ -51,30 +74,28 @@ def test_hot_standby(neon_simple_env: NeonEnv):
response = res
responses[query] = response
# insert more data to make safekeeper send MAX_SEND_SIZE messages
if slow_down_send:
primary.safe_psql("create table t(key int, value text)")
primary.safe_psql("insert into t select generate_series(1, 100000), 'payload'")
wait_caughtup(primary, secondary)
with secondary.connect() as s_con:
with s_con.cursor() as s_cur:
s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()")
res = s_cur.fetchone()
assert res is not None
while not caught_up:
with s_con.cursor() as secondary_cursor:
secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()")
res = secondary_cursor.fetchone()
assert res is not None
(secondary_lsn,) = res
# There may be more changes on the primary after we got our LSN
# due to e.g. autovacuum, but that shouldn't impact the content
# of the tables, so we check whether we've replayed up to at
# least after the commit of the `test` table.
caught_up = secondary_lsn >= primary_lsn
# Explicit commit to flush any transient transaction-level state.
s_con.commit()
for query in queries:
with s_con.cursor() as secondary_cursor:
secondary_cursor.execute(query)
response = secondary_cursor.fetchone()
assert response is not None
assert response == responses[query]
scan_standby_log_for_errors(secondary)
# clean up
if slow_down_send:
sk_http.configure_failpoints(("sk-send-wal-replica-sleep", "off"))


@@ -475,6 +475,46 @@ def test_unavailability(neon_env_builder: NeonEnvBuilder):
asyncio.run(run_unavailability(env, endpoint))
async def run_recovery_uncommitted(env: NeonEnv):
(sk1, sk2, _) = env.safekeepers
env.neon_cli.create_branch("test_recovery_uncommitted")
ep = env.endpoints.create_start("test_recovery_uncommitted")
ep.safe_psql("create table t(key int, value text)")
ep.safe_psql("insert into t select generate_series(1, 100), 'payload'")
# insert with only one safekeeper up to create a tail of flushed-but-not-committed WAL
sk1.stop()
sk2.stop()
conn = await ep.connect_async()
# query should hang, so execute in separate task
bg_query = asyncio.create_task(
conn.execute("insert into t select generate_series(1, 2000), 'payload'")
)
sleep_sec = 2
await asyncio.sleep(sleep_sec)
# it must not have finished yet
assert not bg_query.done()
# note: destroy will kill compute_ctl, preventing it from waiting for the hanging sync-safekeepers.
ep.stop_and_destroy()
# Start one of the stopped safekeepers to bring a quorum online, plus compute,
# and ensure they can sync.
sk2.start()
ep = env.endpoints.create_start(
"test_recovery_uncommitted",
)
ep.safe_psql("insert into t select generate_series(1, 2000), 'payload'")
# Test pulling uncommitted WAL (up to flush_lsn) during recovery.
def test_recovery_uncommitted(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
asyncio.run(run_recovery_uncommitted(env))
@dataclass
class RaceConditionTest:
iteration: int