diff --git a/Cargo.lock b/Cargo.lock index 73cb83d3a7..55e868a6d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4449,6 +4449,7 @@ dependencies = [ "clap", "const_format", "crc32c", + "fail", "fs2", "futures", "git-version", diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index cccb4ebd79..4015c27933 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -4,6 +4,12 @@ version = "0.1.0" edition.workspace = true license.workspace = true +[features] +default = [] +# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro, +# which adds some runtime cost to run tests on outage conditions +testing = ["fail/failpoints"] + [dependencies] async-stream.workspace = true anyhow.workspace = true @@ -16,6 +22,7 @@ chrono.workspace = true clap = { workspace = true, features = ["derive"] } const_format.workspace = true crc32c.workspace = true +fail.workspace = true fs2.workspace = true git-version.workspace = true hex.workspace = true diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index e59deb9fda..33047051df 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -54,6 +54,19 @@ const ID_FILE_NAME: &str = "safekeeper.id"; project_git_version!(GIT_VERSION); project_build_tag!(BUILD_TAG); +const FEATURES: &[&str] = &[ + #[cfg(feature = "testing")] + "testing", +]; + +fn version() -> String { + format!( + "{GIT_VERSION} failpoints: {}, features: {:?}", + fail::has_failpoints(), + FEATURES, + ) +} + const ABOUT: &str = r#" A fleet of safekeepers is responsible for reliably storing WAL received from compute, passing it through consensus (mitigating potential computes brain @@ -167,7 +180,9 @@ async fn main() -> anyhow::Result<()> { // getting 'argument cannot be used multiple times' error. This seems to be // impossible with pure Derive API, so convert struct to Command, modify it, // parse arguments, and then fill the struct back. - let cmd = ::command().args_override_self(true); + let cmd = ::command() + .args_override_self(true) + .version(version()); let mut matches = cmd.get_matches(); let mut args = ::from_arg_matches_mut(&mut matches)?; diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index c48b5330b3..25a3334e63 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -12,6 +12,8 @@ use storage_broker::proto::SafekeeperTimelineInfo; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; use tokio::fs::File; use tokio::io::AsyncReadExt; +use tokio_util::sync::CancellationToken; +use utils::failpoint_support::failpoints_handler; use std::io::Write as _; use tokio::sync::mpsc; @@ -444,6 +446,12 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder .data(Arc::new(conf)) .data(auth) .get("/v1/status", |r| request_span(r, status_handler)) + .put("/v1/failpoints", |r| { + request_span(r, move |r| async { + let cancel = CancellationToken::new(); + failpoints_handler(r, cancel).await + }) + }) // Will be used in the future instead of implicit timeline creation .post("/v1/tenant/timeline", |r| { request_span(r, timeline_create_handler) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 597e311e02..9aa82d8854 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -890,8 +890,8 @@ class NeonEnv: """Get list of safekeeper endpoints suitable for safekeepers GUC""" return ",".join(f"localhost:{wa.port.pg}" for wa in self.safekeepers) - def get_pageserver_version(self) -> str: - bin_pageserver = str(self.neon_binpath / "pageserver") + def get_binary_version(self, binary_name: str) -> str: + bin_pageserver = str(self.neon_binpath / binary_name) res = subprocess.run( [bin_pageserver, "--version"], check=True, @@ -1656,7 +1656,7 @@ class NeonPageserver(PgProtocol): self.running = False self.service_port = port self.config_override = config_override - self.version = env.get_pageserver_version() + self.version = env.get_binary_version("pageserver") # After a test finishes, we will scrape the log to see if there are any # unexpected error messages. If your test expects an error, add it to @@ -2924,7 +2924,8 @@ class Safekeeper: return res def http_client(self, auth_token: Optional[str] = None) -> SafekeeperHttpClient: - return SafekeeperHttpClient(port=self.port.http, auth_token=auth_token) + is_testing_enabled = '"testing"' in self.env.get_binary_version("safekeeper") + return SafekeeperHttpClient(port=self.port.http, auth_token=auth_token, is_testing_enabled=is_testing_enabled) def data_dir(self) -> str: return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}") @@ -2975,10 +2976,11 @@ class SafekeeperMetrics: class SafekeeperHttpClient(requests.Session): HTTPError = requests.HTTPError - def __init__(self, port: int, auth_token: Optional[str] = None): + def __init__(self, port: int, auth_token: Optional[str] = None, is_testing_enabled = False): super().__init__() self.port = port self.auth_token = auth_token + self.is_testing_enabled = is_testing_enabled if auth_token is not None: self.headers["Authorization"] = f"Bearer {auth_token}" @@ -2986,6 +2988,30 @@ class SafekeeperHttpClient(requests.Session): def check_status(self): self.get(f"http://localhost:{self.port}/v1/status").raise_for_status() + def is_testing_enabled_or_skip(self): + if not self.is_testing_enabled: + pytest.skip("safekeeper was built without 'testing' feature") + + def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]): + self.is_testing_enabled_or_skip() + + if isinstance(config_strings, tuple): + pairs = [config_strings] + else: + pairs = config_strings + + log.info(f"Requesting config failpoints: {repr(pairs)}") + + res = self.put( + f"http://localhost:{self.port}/v1/failpoints", + json=[{"name": name, "actions": actions} for name, actions in pairs], + ) + log.info(f"Got failpoints request response code {res.status_code}") + res.raise_for_status() + res_json = res.json() + assert res_json is None + return res_json + def debug_dump(self, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]: params = params or {} res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params)