mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 10:30:40 +00:00
Add simple test of pageserver recovery after crash. To cause a crash, use failpoints in checkpointer
This commit is contained in:
@@ -121,7 +121,7 @@ jobs:
|
||||
export RUSTC_WRAPPER=cachepot
|
||||
export AWS_ACCESS_KEY_ID="${CACHEPOT_AWS_ACCESS_KEY_ID}"
|
||||
export AWS_SECRET_ACCESS_KEY="${CACHEPOT_AWS_SECRET_ACCESS_KEY}"
|
||||
"${cov_prefix[@]}" mold -run cargo build $CARGO_FLAGS --bins --tests
|
||||
"${cov_prefix[@]}" mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
|
||||
cachepot -s
|
||||
|
||||
- save_cache:
|
||||
|
||||
@@ -8,6 +8,7 @@ use anyhow::{bail, Context, Result};
|
||||
use clap::{App, Arg};
|
||||
use daemonize::Daemonize;
|
||||
|
||||
use fail::FailScenario;
|
||||
use pageserver::{
|
||||
config::{defaults::*, PageServerConf},
|
||||
http, page_cache, page_service, profiling, tenant_mgr, thread_mgr,
|
||||
@@ -84,8 +85,23 @@ fn main() -> anyhow::Result<()> {
|
||||
.help("Additional configuration overrides of the ones from the toml config file (or new ones to add there).
|
||||
Any option has to be a valid toml document, example: `-c=\"foo='hey'\"` `-c=\"foo={value=1}\"`"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("enabled-features")
|
||||
.long("enabled-features")
|
||||
.takes_value(false)
|
||||
.help("Show enabled compile time features"),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
if arg_matches.is_present("enabled-features") {
|
||||
let features: &[&str] = &[
|
||||
#[cfg(feature = "failpoints")]
|
||||
"failpoints",
|
||||
];
|
||||
println!("{{\"features\": {features:?} }}");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let workdir = Path::new(arg_matches.value_of("workdir").unwrap_or(".zenith"));
|
||||
let workdir = workdir
|
||||
.canonicalize()
|
||||
@@ -166,6 +182,14 @@ fn main() -> anyhow::Result<()> {
|
||||
// as a ref.
|
||||
let conf: &'static PageServerConf = Box::leak(Box::new(conf));
|
||||
|
||||
// If failpoints are used, terminate the whole pageserver process if they are hit.
|
||||
let scenario = FailScenario::setup();
|
||||
if fail::has_failpoints() {
|
||||
std::panic::set_hook(Box::new(|_| {
|
||||
std::process::exit(1);
|
||||
}));
|
||||
}
|
||||
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(conf.max_file_descriptors);
|
||||
page_cache::init(conf.page_cache_size);
|
||||
@@ -181,10 +205,12 @@ fn main() -> anyhow::Result<()> {
|
||||
cfg_file_path.display()
|
||||
)
|
||||
})?;
|
||||
Ok(())
|
||||
} else {
|
||||
start_pageserver(conf, daemonize).context("Failed to start pageserver")
|
||||
start_pageserver(conf, daemonize).context("Failed to start pageserver")?;
|
||||
}
|
||||
|
||||
scenario.teardown();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()> {
|
||||
|
||||
@@ -1703,6 +1703,7 @@ impl LayeredTimeline {
|
||||
new_delta_path.clone(),
|
||||
self.conf.timeline_path(&self.timelineid, &self.tenantid),
|
||||
])?;
|
||||
fail_point!("checkpoint-before-sync");
|
||||
|
||||
fail_point!("flush-frozen");
|
||||
|
||||
@@ -1727,6 +1728,7 @@ impl LayeredTimeline {
|
||||
// TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
|
||||
// *all* the layers, to avoid fsyncing the file multiple times.
|
||||
let disk_consistent_lsn = Lsn(frozen_layer.get_lsn_range().end.0 - 1);
|
||||
fail_point!("checkpoint-after-sync");
|
||||
|
||||
// If we were able to advance 'disk_consistent_lsn', save it the metadata file.
|
||||
// After crash, we will restart WAL streaming and processing from that point.
|
||||
|
||||
64
test_runner/batch_others/test_recovery.py
Normal file
64
test_runner/batch_others/test_recovery.py
Normal file
@@ -0,0 +1,64 @@
|
||||
import os
|
||||
import time
|
||||
import psycopg2.extras
|
||||
import json
|
||||
from ast import Assert
|
||||
from contextlib import closing
|
||||
from fixtures.zenith_fixtures import ZenithEnvBuilder
|
||||
from fixtures.log_helper import log
|
||||
|
||||
|
||||
#
|
||||
# Test pageserver recovery after crash
|
||||
#
|
||||
def test_pageserver_recovery(zenith_env_builder: ZenithEnvBuilder):
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
# Override default checkpointer settings to run it more often
|
||||
zenith_env_builder.pageserver_config_override = "tenant_config={checkpoint_distance = 1048576}"
|
||||
|
||||
env = zenith_env_builder.init()
|
||||
|
||||
# Check if failpoints enables. Otherwise the test doesn't make sense
|
||||
f = env.zenith_cli.pageserver_enabled_features()
|
||||
|
||||
assert "failpoints" in f["features"], "Build pageserver with --features=failpoints option to run this test"
|
||||
zenith_env_builder.start()
|
||||
|
||||
# Create a branch for us
|
||||
env.zenith_cli.create_branch("test_pageserver_recovery", "main")
|
||||
|
||||
pg = env.postgres.create_start('test_pageserver_recovery')
|
||||
log.info("postgres is running on 'test_pageserver_recovery' branch")
|
||||
|
||||
connstr = pg.connstr()
|
||||
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
with closing(env.pageserver.connect()) as psconn:
|
||||
with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
|
||||
# Create and initialize test table
|
||||
cur.execute("CREATE TABLE foo(x bigint)")
|
||||
cur.execute("INSERT INTO foo VALUES (generate_series(1,100000))")
|
||||
|
||||
# Sleep for some time to let checkpoint create image layers
|
||||
time.sleep(2)
|
||||
|
||||
# Configure failpoints
|
||||
pscur.execute(
|
||||
"failpoints checkpoint-before-sync=sleep(2000);checkpoint-after-sync=panic")
|
||||
|
||||
# Do some updates until pageserver is crashed
|
||||
try:
|
||||
while True:
|
||||
cur.execute("update foo set x=x+1")
|
||||
except Exception as err:
|
||||
log.info(f"Excepted server crash {err}")
|
||||
|
||||
log.info("Wait before server restart")
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
with closing(pg.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("select count(*) from foo")
|
||||
assert cur.fetchone() == (100000, )
|
||||
@@ -980,6 +980,19 @@ class ZenithCli:
|
||||
res.check_returncode()
|
||||
return res
|
||||
|
||||
def pageserver_enabled_features(self) -> Any:
|
||||
bin_pageserver = os.path.join(str(zenith_binpath), 'pageserver')
|
||||
args = [bin_pageserver, '--enabled-features']
|
||||
log.info('Running command "{}"'.format(' '.join(args)))
|
||||
|
||||
res = subprocess.run(args,
|
||||
check=True,
|
||||
universal_newlines=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
log.info(f"pageserver_enabled_features success: {res.stdout}")
|
||||
return json.loads(res.stdout)
|
||||
|
||||
def pageserver_start(self, overrides=()) -> 'subprocess.CompletedProcess[str]':
|
||||
start_args = ['pageserver', 'start', *overrides]
|
||||
append_pageserver_param_overrides(start_args,
|
||||
|
||||
Reference in New Issue
Block a user