From 75d4ccb05c681817566873798a70c92e4c566ff5 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 11 Jul 2025 14:18:06 +0200 Subject: [PATCH] support context failpoint in client and mgmt API --- compute_tools/src/http/routes/failpoints.rs | 25 +++++--- libs/http-utils/src/failpoints.rs | 21 +++++-- test_runner/fixtures/endpoint/http.py | 61 ++++++++++++++++--- test_runner/fixtures/neon_fixtures.py | 67 +++++++++++++++++---- test_runner/fixtures/pageserver/http.py | 67 ++++++++++++++++----- test_runner/fixtures/safekeeper/http.py | 66 +++++++++++++++----- 6 files changed, 243 insertions(+), 64 deletions(-) diff --git a/compute_tools/src/http/routes/failpoints.rs b/compute_tools/src/http/routes/failpoints.rs index 6b1fc6d529..d0661b65cf 100644 --- a/compute_tools/src/http/routes/failpoints.rs +++ b/compute_tools/src/http/routes/failpoints.rs @@ -1,8 +1,9 @@ use axum::response::{IntoResponse, Response}; use http::StatusCode; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use tracing::info; -use utils::failpoint_support::apply_failpoint; +use neon_failpoint::{configure_failpoint, configure_failpoint_with_context}; pub type ConfigureFailpointsRequest = Vec; @@ -11,10 +12,16 @@ pub type ConfigureFailpointsRequest = Vec; pub struct FailpointConfig { /// Name of the fail point pub name: String, - /// List of actions to take, using the format described in `fail::cfg` + /// List of actions to take, using the format described in neon_failpoint /// - /// We also support `actions = "exit"` to cause the fail point to immediately exit. + /// We support actions: "pause", "sleep(N)", "return", "return(value)", "exit", "off", "panic(message)" + /// Plus probability-based actions: "N%return(value)", "N%M*return(value)", "N%action", "N%M*action" pub actions: String, + /// Optional context matching rules for conditional failpoints + /// Each key-value pair specifies a context key and a regex pattern to match against + /// All context matchers must match for the failpoint to trigger + #[serde(default, skip_serializing_if = "Option::is_none")] + pub context_matchers: Option>, } use crate::http::JsonResponse; @@ -32,16 +39,18 @@ pub(in crate::http) async fn configure_failpoints( } for fp in &*failpoints { - info!("cfg failpoint: {} {}", fp.name, fp.actions); + info!("cfg failpoint: {} {} (context: {:?})", fp.name, fp.actions, fp.context_matchers); - // We recognize one extra "action" that's not natively recognized - // by the failpoints crate: exit, to immediately kill the process - let cfg_result = apply_failpoint(&fp.name, &fp.actions); + let cfg_result = if let Some(context_matchers) = fp.context_matchers.clone() { + configure_failpoint_with_context(&fp.name, &fp.actions, context_matchers) + } else { + configure_failpoint(&fp.name, &fp.actions) + }; if let Err(e) = cfg_result { return JsonResponse::error( StatusCode::BAD_REQUEST, - format!("failed to configure failpoints: {e}"), + format!("failed to configure failpoint '{}': {e}", fp.name), ); } } diff --git a/libs/http-utils/src/failpoints.rs b/libs/http-utils/src/failpoints.rs index c3dac1e033..bae5153062 100644 --- a/libs/http-utils/src/failpoints.rs +++ b/libs/http-utils/src/failpoints.rs @@ -1,7 +1,8 @@ use hyper::{Body, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use tokio_util::sync::CancellationToken; -use neon_failpoint::{configure_failpoint, has_failpoints}; +use neon_failpoint::{configure_failpoint, configure_failpoint_with_context, has_failpoints}; use crate::error::ApiError; use crate::json::{json_request, json_response}; @@ -15,8 +16,14 @@ pub struct FailpointConfig { pub name: String, /// List of actions to take, using the format described in neon_failpoint /// - /// We support actions: "pause", "sleep(N)", "return", "return(value)", "exit", "off" + /// We support actions: "pause", "sleep(N)", "return", "return(value)", "exit", "off", "panic(message)" + /// Plus probability-based actions: "N%return(value)", "N%M*return(value)", "N%action", "N%M*action" pub actions: String, + /// Optional context matching rules for conditional failpoints + /// Each key-value pair specifies a context key and a regex pattern to match against + /// All context matchers must match for the failpoint to trigger + #[serde(default, skip_serializing_if = "Option::is_none")] + pub context_matchers: Option>, } /// Configure failpoints through http. @@ -32,13 +39,17 @@ pub async fn failpoints_handler( let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?; for fp in failpoints { - tracing::info!("cfg failpoint: {} {}", fp.name, fp.actions); + tracing::info!("cfg failpoint: {} {} (context: {:?})", fp.name, fp.actions, fp.context_matchers); - let cfg_result = configure_failpoint(&fp.name, &fp.actions); + let cfg_result = if let Some(context_matchers) = fp.context_matchers { + configure_failpoint_with_context(&fp.name, &fp.actions, context_matchers) + } else { + configure_failpoint(&fp.name, &fp.actions) + }; if let Err(err) = cfg_result { return Err(ApiError::BadRequest(anyhow::anyhow!( - "Failed to configure failpoints: {err}" + "Failed to configure failpoint '{}': {}", fp.name, err ))); } } diff --git a/test_runner/fixtures/endpoint/http.py b/test_runner/fixtures/endpoint/http.py index 1d278095ce..9365868d85 100644 --- a/test_runner/fixtures/endpoint/http.py +++ b/test_runner/fixtures/endpoint/http.py @@ -159,16 +159,59 @@ class EndpointHttpClient(requests.Session): res.raise_for_status() return res.json() - def configure_failpoints(self, *args: tuple[str, str]) -> None: - body: list[dict[str, str]] = [] + def configure_failpoints( + self, *args: tuple[str, str] | list[dict[str, str | dict[str, str]]] + ) -> None: + """Configure failpoints for testing purposes. - for fp in args: - body.append( - { - "name": fp[0], - "action": fp[1], + Args: + *args: Can be one of: + - Variable number of (name, actions) tuples + - Single list of dicts with keys: name, actions, and optionally context_matchers + + Examples: + # Basic failpoints + client.configure_failpoints(("test_fp", "return(error)")) + client.configure_failpoints(("fp1", "return"), ("fp2", "sleep(1000)")) + + # Probability-based failpoint + client.configure_failpoints(("test_fp", "50%return(error)")) + + # Context-based failpoint + client.configure_failpoints([{ + "name": "test_fp", + "actions": "return(error)", + "context_matchers": {"tenant_id": ".*test.*"} + }]) + """ + + request_body: list[dict[str, Any]] = [] + + if ( + len(args) == 1 + and isinstance(args[0], list) + and args[0] + and isinstance(args[0][0], dict) + ): + # Handle list of dicts (context-based failpoints) + failpoint_configs = args[0] + for config in failpoint_configs: + server_config: dict[str, Any] = { + "name": config["name"], + "actions": config["actions"], } - ) + if "context_matchers" in config: + server_config["context_matchers"] = config["context_matchers"] + request_body.append(server_config) + else: + # Handle tuples (basic failpoints) + for fp in args: + request_body.append( + { + "name": fp[0], + "actions": fp[1], + } + ) - res = self.post(f"http://localhost:{self.internal_port}/failpoints", json=body) + res = self.post(f"http://localhost:{self.internal_port}/failpoints", json=request_body) res.raise_for_status() diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 42924f9b83..d7023eb410 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2614,22 +2614,63 @@ class NeonStorageController(MetricsGetter, LogUtils): ) return res.json() - def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]): + def configure_failpoints( + self, + config_strings: tuple[str, str] + | list[tuple[str, str]] + | list[dict[str, str | dict[str, str]]], + ): + """ + Configure failpoints for testing purposes. + + Args: + config_strings: Can be one of: + - Single tuple of (name, actions) + - List of tuples [(name, actions), ...] + - List of dicts with keys: name, actions, and optionally context_matchers + + Examples: + # Basic failpoint + client.configure_failpoints(("test_fp", "return(error)")) + + # Multiple basic failpoints + client.configure_failpoints([("fp1", "return"), ("fp2", "sleep(1000)")]) + + # Probability-based failpoint + client.configure_failpoints(("test_fp", "50%return(error)")) + + # Context-based failpoint + client.configure_failpoints([{ + "name": "test_fp", + "actions": "return(error)", + "context_matchers": {"tenant_id": ".*test.*"} + }]) + """ + # Handle single tuple case if isinstance(config_strings, tuple): - pairs = [config_strings] - else: - pairs = config_strings + config_strings = [config_strings] - log.info(f"Requesting config failpoints: {repr(pairs)}") + # Convert to server format + body: list[dict[str, str | dict[str, str]]] = [] + for config in config_strings: + if isinstance(config, tuple): + # Simple (name, actions) tuple + body.append({"name": config[0], "actions": config[1]}) + elif isinstance(config, dict): + # Dict with name, actions, and optional context_matchers + server_config: dict[str, str | dict[str, str]] = { + "name": config["name"], + "actions": config["actions"], + } + if "context_matchers" in config: + server_config["context_matchers"] = config["context_matchers"] + body.append(server_config) + else: + raise ValueError(f"Invalid config format: {config}") - res = self.request( - "PUT", - f"{self.api}/debug/v1/failpoints", - json=[{"name": name, "actions": actions} for name, actions in pairs], - headers=self.headers(TokenScope.ADMIN), - ) - log.info(f"Got failpoints request response code {res.status_code}") - res.raise_for_status() + res = self.request("PUT", f"{self.api_root()}/debug/v1/failpoints", json=body) + if res.status_code != 200: + self.raise_api_exception(res) def get_tenants_placement(self) -> defaultdict[str, dict[str, Any]]: """ diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 23b9d1c8c9..46e196bc81 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -309,25 +309,64 @@ class PageserverHttpClient(requests.Session, MetricsGetter): def check_status(self): self.get(f"http://localhost:{self.port}/v1/status").raise_for_status() - def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]): + def configure_failpoints( + self, + config_strings: tuple[str, str] + | list[tuple[str, str]] + | list[dict[str, str | dict[str, str]]], + ): + """ + Configure failpoints for testing purposes. + + Args: + config_strings: Can be one of: + - Single tuple of (name, actions) + - List of tuples [(name, actions), ...] + - List of dicts with keys: name, actions, and optionally context_matchers + + Examples: + # Basic failpoint + client.configure_failpoints(("test_fp", "return(error)")) + + # Multiple basic failpoints + client.configure_failpoints([("fp1", "return"), ("fp2", "sleep(1000)")]) + + # Probability-based failpoint + client.configure_failpoints(("test_fp", "50%return(error)")) + + # Context-based failpoint + client.configure_failpoints([{ + "name": "test_fp", + "actions": "return(error)", + "context_matchers": {"tenant_id": ".*test.*"} + }]) + """ self.is_testing_enabled_or_skip() + # Handle single tuple case if isinstance(config_strings, tuple): - pairs = [config_strings] - else: - pairs = config_strings + config_strings = [config_strings] - log.info(f"Requesting config failpoints: {repr(pairs)}") + # Convert to server format + body: list[dict[str, str | dict[str, str]]] = [] + for config in config_strings: + if isinstance(config, tuple): + # Simple (name, actions) tuple + body.append({"name": config[0], "actions": config[1]}) + elif isinstance(config, dict): + # Dict with name, actions, and optional context_matchers + server_config = {"name": config["name"], "actions": config["actions"]} + if "context_matchers" in config: + server_config["context_matchers"] = config["context_matchers"] + body.append(server_config) + else: + raise ValueError(f"Invalid config format: {config}") - res = self.put( - f"http://localhost:{self.port}/v1/failpoints", - json=[{"name": name, "actions": actions} for name, actions in pairs], - ) - log.info(f"Got failpoints request response code {res.status_code}") - self.verbose_error(res) - res_json = res.json() - assert res_json is None - return res_json + res = self.post(f"{self.base_url}/failpoints", json=body) + if res.status_code != 200: + raise PageserverApiException( + f"Failed to configure failpoints: {res.text}", res.status_code + ) def reload_auth_validation_keys(self): res = self.post(f"http://localhost:{self.port}/v1/reload_auth_validation_keys") diff --git a/test_runner/fixtures/safekeeper/http.py b/test_runner/fixtures/safekeeper/http.py index ceb00c0f90..354332f069 100644 --- a/test_runner/fixtures/safekeeper/http.py +++ b/test_runner/fixtures/safekeeper/http.py @@ -8,7 +8,6 @@ import pytest import requests from fixtures.common_types import Lsn, TenantId, TenantTimelineId, TimelineId -from fixtures.log_helper import log from fixtures.metrics import Metrics, MetricsGetter, parse_metrics from fixtures.utils import EnhancedJSONEncoder, wait_until @@ -155,25 +154,62 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter): if not self.is_testing_enabled: pytest.skip("safekeeper was built without 'testing' feature") - def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]): + def configure_failpoints( + self, + config_strings: tuple[str, str] + | list[tuple[str, str]] + | list[dict[str, str | dict[str, str]]], + ): + """ + Configure failpoints for testing purposes. + + Args: + config_strings: Can be one of: + - Single tuple of (name, actions) + - List of tuples [(name, actions), ...] + - List of dicts with keys: name, actions, and optionally context_matchers + + Examples: + # Basic failpoint + client.configure_failpoints(("test_fp", "return(error)")) + + # Multiple basic failpoints + client.configure_failpoints([("fp1", "return"), ("fp2", "sleep(1000)")]) + + # Probability-based failpoint + client.configure_failpoints(("test_fp", "50%return(error)")) + + # Context-based failpoint + client.configure_failpoints([{ + "name": "test_fp", + "actions": "return(error)", + "context_matchers": {"tenant_id": ".*test.*"} + }]) + """ self.is_testing_enabled_or_skip() + # Handle single tuple case if isinstance(config_strings, tuple): - pairs = [config_strings] - else: - pairs = config_strings + config_strings = [config_strings] - log.info(f"Requesting config failpoints: {repr(pairs)}") + # Convert to server format + body: list[dict[str, str | dict[str, str]]] = [] + for config in config_strings: + if isinstance(config, tuple): + # Simple (name, actions) tuple + body.append({"name": config[0], "actions": config[1]}) + elif isinstance(config, dict): + # Dict with name, actions, and optional context_matchers + server_config = {"name": config["name"], "actions": config["actions"]} + if "context_matchers" in config: + server_config["context_matchers"] = config["context_matchers"] + body.append(server_config) + else: + raise ValueError(f"Invalid config format: {config}") - res = self.put( - f"http://localhost:{self.port}/v1/failpoints", - json=[{"name": name, "actions": actions} for name, actions in pairs], - ) - log.info(f"Got failpoints request response code {res.status_code}") - res.raise_for_status() - res_json = res.json() - assert res_json is None - return res_json + res = self.post(f"http://localhost:{self.port}/v1/failpoints", json=body) + if res.status_code != 200: + raise RuntimeError(f"Failed to configure failpoints: {res.text}") def tenant_delete_force(self, tenant_id: TenantId) -> dict[Any, Any]: res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")