Add failpoint support to safekeeper.

Just a copy paste from pageserver.
This commit is contained in:
Arseny Sher
2024-01-01 23:32:24 +03:00
committed by Arseny Sher
parent dbd36e40dc
commit e79a19339c
5 changed files with 63 additions and 6 deletions

1
Cargo.lock generated
View File

@@ -4449,6 +4449,7 @@ dependencies = [
"clap",
"const_format",
"crc32c",
"fail",
"fs2",
"futures",
"git-version",

View File

@@ -4,6 +4,12 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true
[features]
default = []
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]
[dependencies]
async-stream.workspace = true
anyhow.workspace = true
@@ -16,6 +22,7 @@ chrono.workspace = true
clap = { workspace = true, features = ["derive"] }
const_format.workspace = true
crc32c.workspace = true
fail.workspace = true
fs2.workspace = true
git-version.workspace = true
hex.workspace = true

View File

@@ -54,6 +54,19 @@ const ID_FILE_NAME: &str = "safekeeper.id";
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);
const FEATURES: &[&str] = &[
#[cfg(feature = "testing")]
"testing",
];
fn version() -> String {
format!(
"{GIT_VERSION} failpoints: {}, features: {:?}",
fail::has_failpoints(),
FEATURES,
)
}
const ABOUT: &str = r#"
A fleet of safekeepers is responsible for reliably storing WAL received from
compute, passing it through consensus (mitigating potential computes brain
@@ -167,7 +180,9 @@ async fn main() -> anyhow::Result<()> {
// getting 'argument cannot be used multiple times' error. This seems to be
// impossible with pure Derive API, so convert struct to Command, modify it,
// parse arguments, and then fill the struct back.
let cmd = <Args as clap::CommandFactory>::command().args_override_self(true);
let cmd = <Args as clap::CommandFactory>::command()
.args_override_self(true)
.version(version());
let mut matches = cmd.get_matches();
let mut args = <Args as clap::FromArgMatches>::from_arg_matches_mut(&mut matches)?;

View File

@@ -12,6 +12,8 @@ use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio_util::sync::CancellationToken;
use utils::failpoint_support::failpoints_handler;
use std::io::Write as _;
use tokio::sync::mpsc;
@@ -444,6 +446,12 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
.data(Arc::new(conf))
.data(auth)
.get("/v1/status", |r| request_span(r, status_handler))
.put("/v1/failpoints", |r| {
request_span(r, move |r| async {
let cancel = CancellationToken::new();
failpoints_handler(r, cancel).await
})
})
// Will be used in the future instead of implicit timeline creation
.post("/v1/tenant/timeline", |r| {
request_span(r, timeline_create_handler)

View File

@@ -890,8 +890,8 @@ class NeonEnv:
"""Get list of safekeeper endpoints suitable for safekeepers GUC"""
return ",".join(f"localhost:{wa.port.pg}" for wa in self.safekeepers)
def get_pageserver_version(self) -> str:
bin_pageserver = str(self.neon_binpath / "pageserver")
def get_binary_version(self, binary_name: str) -> str:
bin_pageserver = str(self.neon_binpath / binary_name)
res = subprocess.run(
[bin_pageserver, "--version"],
check=True,
@@ -1656,7 +1656,7 @@ class NeonPageserver(PgProtocol):
self.running = False
self.service_port = port
self.config_override = config_override
self.version = env.get_pageserver_version()
self.version = env.get_binary_version("pageserver")
# After a test finishes, we will scrape the log to see if there are any
# unexpected error messages. If your test expects an error, add it to
@@ -2924,7 +2924,8 @@ class Safekeeper:
return res
def http_client(self, auth_token: Optional[str] = None) -> SafekeeperHttpClient:
return SafekeeperHttpClient(port=self.port.http, auth_token=auth_token)
is_testing_enabled = '"testing"' in self.env.get_binary_version("safekeeper")
return SafekeeperHttpClient(port=self.port.http, auth_token=auth_token, is_testing_enabled=is_testing_enabled)
def data_dir(self) -> str:
return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}")
@@ -2975,10 +2976,11 @@ class SafekeeperMetrics:
class SafekeeperHttpClient(requests.Session):
HTTPError = requests.HTTPError
def __init__(self, port: int, auth_token: Optional[str] = None):
def __init__(self, port: int, auth_token: Optional[str] = None, is_testing_enabled = False):
super().__init__()
self.port = port
self.auth_token = auth_token
self.is_testing_enabled = is_testing_enabled
if auth_token is not None:
self.headers["Authorization"] = f"Bearer {auth_token}"
@@ -2986,6 +2988,30 @@ class SafekeeperHttpClient(requests.Session):
def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
def is_testing_enabled_or_skip(self):
if not self.is_testing_enabled:
pytest.skip("safekeeper was built without 'testing' feature")
def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
self.is_testing_enabled_or_skip()
if isinstance(config_strings, tuple):
pairs = [config_strings]
else:
pairs = config_strings
log.info(f"Requesting config failpoints: {repr(pairs)}")
res = self.put(
f"http://localhost:{self.port}/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
)
log.info(f"Got failpoints request response code {res.status_code}")
res.raise_for_status()
res_json = res.json()
assert res_json is None
return res_json
def debug_dump(self, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
params = params or {}
res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params)