mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 03:20:36 +00:00
support context failpoint in client and mgmt API
This commit is contained in:
@@ -1,8 +1,9 @@
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use http::StatusCode;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use tracing::info;
|
||||
use utils::failpoint_support::apply_failpoint;
|
||||
use neon_failpoint::{configure_failpoint, configure_failpoint_with_context};
|
||||
|
||||
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
|
||||
|
||||
@@ -11,10 +12,16 @@ pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
|
||||
pub struct FailpointConfig {
|
||||
/// Name of the fail point
|
||||
pub name: String,
|
||||
/// List of actions to take, using the format described in `fail::cfg`
|
||||
/// List of actions to take, using the format described in neon_failpoint
|
||||
///
|
||||
/// We also support `actions = "exit"` to cause the fail point to immediately exit.
|
||||
/// We support actions: "pause", "sleep(N)", "return", "return(value)", "exit", "off", "panic(message)"
|
||||
/// Plus probability-based actions: "N%return(value)", "N%M*return(value)", "N%action", "N%M*action"
|
||||
pub actions: String,
|
||||
/// Optional context matching rules for conditional failpoints
|
||||
/// Each key-value pair specifies a context key and a regex pattern to match against
|
||||
/// All context matchers must match for the failpoint to trigger
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub context_matchers: Option<HashMap<String, String>>,
|
||||
}
|
||||
|
||||
use crate::http::JsonResponse;
|
||||
@@ -32,16 +39,18 @@ pub(in crate::http) async fn configure_failpoints(
|
||||
}
|
||||
|
||||
for fp in &*failpoints {
|
||||
info!("cfg failpoint: {} {}", fp.name, fp.actions);
|
||||
info!("cfg failpoint: {} {} (context: {:?})", fp.name, fp.actions, fp.context_matchers);
|
||||
|
||||
// We recognize one extra "action" that's not natively recognized
|
||||
// by the failpoints crate: exit, to immediately kill the process
|
||||
let cfg_result = apply_failpoint(&fp.name, &fp.actions);
|
||||
let cfg_result = if let Some(context_matchers) = fp.context_matchers.clone() {
|
||||
configure_failpoint_with_context(&fp.name, &fp.actions, context_matchers)
|
||||
} else {
|
||||
configure_failpoint(&fp.name, &fp.actions)
|
||||
};
|
||||
|
||||
if let Err(e) = cfg_result {
|
||||
return JsonResponse::error(
|
||||
StatusCode::BAD_REQUEST,
|
||||
format!("failed to configure failpoints: {e}"),
|
||||
format!("failed to configure failpoint '{}': {e}", fp.name),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
use hyper::{Body, Request, Response, StatusCode};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use neon_failpoint::{configure_failpoint, has_failpoints};
|
||||
use neon_failpoint::{configure_failpoint, configure_failpoint_with_context, has_failpoints};
|
||||
|
||||
use crate::error::ApiError;
|
||||
use crate::json::{json_request, json_response};
|
||||
@@ -15,8 +16,14 @@ pub struct FailpointConfig {
|
||||
pub name: String,
|
||||
/// List of actions to take, using the format described in neon_failpoint
|
||||
///
|
||||
/// We support actions: "pause", "sleep(N)", "return", "return(value)", "exit", "off"
|
||||
/// We support actions: "pause", "sleep(N)", "return", "return(value)", "exit", "off", "panic(message)"
|
||||
/// Plus probability-based actions: "N%return(value)", "N%M*return(value)", "N%action", "N%M*action"
|
||||
pub actions: String,
|
||||
/// Optional context matching rules for conditional failpoints
|
||||
/// Each key-value pair specifies a context key and a regex pattern to match against
|
||||
/// All context matchers must match for the failpoint to trigger
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub context_matchers: Option<HashMap<String, String>>,
|
||||
}
|
||||
|
||||
/// Configure failpoints through http.
|
||||
@@ -32,13 +39,17 @@ pub async fn failpoints_handler(
|
||||
|
||||
let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
|
||||
for fp in failpoints {
|
||||
tracing::info!("cfg failpoint: {} {}", fp.name, fp.actions);
|
||||
tracing::info!("cfg failpoint: {} {} (context: {:?})", fp.name, fp.actions, fp.context_matchers);
|
||||
|
||||
let cfg_result = configure_failpoint(&fp.name, &fp.actions);
|
||||
let cfg_result = if let Some(context_matchers) = fp.context_matchers {
|
||||
configure_failpoint_with_context(&fp.name, &fp.actions, context_matchers)
|
||||
} else {
|
||||
configure_failpoint(&fp.name, &fp.actions)
|
||||
};
|
||||
|
||||
if let Err(err) = cfg_result {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Failed to configure failpoints: {err}"
|
||||
"Failed to configure failpoint '{}': {}", fp.name, err
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -159,16 +159,59 @@ class EndpointHttpClient(requests.Session):
|
||||
res.raise_for_status()
|
||||
return res.json()
|
||||
|
||||
def configure_failpoints(self, *args: tuple[str, str]) -> None:
|
||||
body: list[dict[str, str]] = []
|
||||
def configure_failpoints(
|
||||
self, *args: tuple[str, str] | list[dict[str, str | dict[str, str]]]
|
||||
) -> None:
|
||||
"""Configure failpoints for testing purposes.
|
||||
|
||||
for fp in args:
|
||||
body.append(
|
||||
{
|
||||
"name": fp[0],
|
||||
"action": fp[1],
|
||||
Args:
|
||||
*args: Can be one of:
|
||||
- Variable number of (name, actions) tuples
|
||||
- Single list of dicts with keys: name, actions, and optionally context_matchers
|
||||
|
||||
Examples:
|
||||
# Basic failpoints
|
||||
client.configure_failpoints(("test_fp", "return(error)"))
|
||||
client.configure_failpoints(("fp1", "return"), ("fp2", "sleep(1000)"))
|
||||
|
||||
# Probability-based failpoint
|
||||
client.configure_failpoints(("test_fp", "50%return(error)"))
|
||||
|
||||
# Context-based failpoint
|
||||
client.configure_failpoints([{
|
||||
"name": "test_fp",
|
||||
"actions": "return(error)",
|
||||
"context_matchers": {"tenant_id": ".*test.*"}
|
||||
}])
|
||||
"""
|
||||
|
||||
request_body: list[dict[str, Any]] = []
|
||||
|
||||
if (
|
||||
len(args) == 1
|
||||
and isinstance(args[0], list)
|
||||
and args[0]
|
||||
and isinstance(args[0][0], dict)
|
||||
):
|
||||
# Handle list of dicts (context-based failpoints)
|
||||
failpoint_configs = args[0]
|
||||
for config in failpoint_configs:
|
||||
server_config: dict[str, Any] = {
|
||||
"name": config["name"],
|
||||
"actions": config["actions"],
|
||||
}
|
||||
)
|
||||
if "context_matchers" in config:
|
||||
server_config["context_matchers"] = config["context_matchers"]
|
||||
request_body.append(server_config)
|
||||
else:
|
||||
# Handle tuples (basic failpoints)
|
||||
for fp in args:
|
||||
request_body.append(
|
||||
{
|
||||
"name": fp[0],
|
||||
"actions": fp[1],
|
||||
}
|
||||
)
|
||||
|
||||
res = self.post(f"http://localhost:{self.internal_port}/failpoints", json=body)
|
||||
res = self.post(f"http://localhost:{self.internal_port}/failpoints", json=request_body)
|
||||
res.raise_for_status()
|
||||
|
||||
@@ -2614,22 +2614,63 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
)
|
||||
return res.json()
|
||||
|
||||
def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]):
|
||||
def configure_failpoints(
|
||||
self,
|
||||
config_strings: tuple[str, str]
|
||||
| list[tuple[str, str]]
|
||||
| list[dict[str, str | dict[str, str]]],
|
||||
):
|
||||
"""
|
||||
Configure failpoints for testing purposes.
|
||||
|
||||
Args:
|
||||
config_strings: Can be one of:
|
||||
- Single tuple of (name, actions)
|
||||
- List of tuples [(name, actions), ...]
|
||||
- List of dicts with keys: name, actions, and optionally context_matchers
|
||||
|
||||
Examples:
|
||||
# Basic failpoint
|
||||
client.configure_failpoints(("test_fp", "return(error)"))
|
||||
|
||||
# Multiple basic failpoints
|
||||
client.configure_failpoints([("fp1", "return"), ("fp2", "sleep(1000)")])
|
||||
|
||||
# Probability-based failpoint
|
||||
client.configure_failpoints(("test_fp", "50%return(error)"))
|
||||
|
||||
# Context-based failpoint
|
||||
client.configure_failpoints([{
|
||||
"name": "test_fp",
|
||||
"actions": "return(error)",
|
||||
"context_matchers": {"tenant_id": ".*test.*"}
|
||||
}])
|
||||
"""
|
||||
# Handle single tuple case
|
||||
if isinstance(config_strings, tuple):
|
||||
pairs = [config_strings]
|
||||
else:
|
||||
pairs = config_strings
|
||||
config_strings = [config_strings]
|
||||
|
||||
log.info(f"Requesting config failpoints: {repr(pairs)}")
|
||||
# Convert to server format
|
||||
body: list[dict[str, str | dict[str, str]]] = []
|
||||
for config in config_strings:
|
||||
if isinstance(config, tuple):
|
||||
# Simple (name, actions) tuple
|
||||
body.append({"name": config[0], "actions": config[1]})
|
||||
elif isinstance(config, dict):
|
||||
# Dict with name, actions, and optional context_matchers
|
||||
server_config: dict[str, str | dict[str, str]] = {
|
||||
"name": config["name"],
|
||||
"actions": config["actions"],
|
||||
}
|
||||
if "context_matchers" in config:
|
||||
server_config["context_matchers"] = config["context_matchers"]
|
||||
body.append(server_config)
|
||||
else:
|
||||
raise ValueError(f"Invalid config format: {config}")
|
||||
|
||||
res = self.request(
|
||||
"PUT",
|
||||
f"{self.api}/debug/v1/failpoints",
|
||||
json=[{"name": name, "actions": actions} for name, actions in pairs],
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
log.info(f"Got failpoints request response code {res.status_code}")
|
||||
res.raise_for_status()
|
||||
res = self.request("PUT", f"{self.api_root()}/debug/v1/failpoints", json=body)
|
||||
if res.status_code != 200:
|
||||
self.raise_api_exception(res)
|
||||
|
||||
def get_tenants_placement(self) -> defaultdict[str, dict[str, Any]]:
|
||||
"""
|
||||
|
||||
@@ -309,25 +309,64 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
def check_status(self):
|
||||
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
|
||||
|
||||
def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]):
|
||||
def configure_failpoints(
|
||||
self,
|
||||
config_strings: tuple[str, str]
|
||||
| list[tuple[str, str]]
|
||||
| list[dict[str, str | dict[str, str]]],
|
||||
):
|
||||
"""
|
||||
Configure failpoints for testing purposes.
|
||||
|
||||
Args:
|
||||
config_strings: Can be one of:
|
||||
- Single tuple of (name, actions)
|
||||
- List of tuples [(name, actions), ...]
|
||||
- List of dicts with keys: name, actions, and optionally context_matchers
|
||||
|
||||
Examples:
|
||||
# Basic failpoint
|
||||
client.configure_failpoints(("test_fp", "return(error)"))
|
||||
|
||||
# Multiple basic failpoints
|
||||
client.configure_failpoints([("fp1", "return"), ("fp2", "sleep(1000)")])
|
||||
|
||||
# Probability-based failpoint
|
||||
client.configure_failpoints(("test_fp", "50%return(error)"))
|
||||
|
||||
# Context-based failpoint
|
||||
client.configure_failpoints([{
|
||||
"name": "test_fp",
|
||||
"actions": "return(error)",
|
||||
"context_matchers": {"tenant_id": ".*test.*"}
|
||||
}])
|
||||
"""
|
||||
self.is_testing_enabled_or_skip()
|
||||
|
||||
# Handle single tuple case
|
||||
if isinstance(config_strings, tuple):
|
||||
pairs = [config_strings]
|
||||
else:
|
||||
pairs = config_strings
|
||||
config_strings = [config_strings]
|
||||
|
||||
log.info(f"Requesting config failpoints: {repr(pairs)}")
|
||||
# Convert to server format
|
||||
body: list[dict[str, str | dict[str, str]]] = []
|
||||
for config in config_strings:
|
||||
if isinstance(config, tuple):
|
||||
# Simple (name, actions) tuple
|
||||
body.append({"name": config[0], "actions": config[1]})
|
||||
elif isinstance(config, dict):
|
||||
# Dict with name, actions, and optional context_matchers
|
||||
server_config = {"name": config["name"], "actions": config["actions"]}
|
||||
if "context_matchers" in config:
|
||||
server_config["context_matchers"] = config["context_matchers"]
|
||||
body.append(server_config)
|
||||
else:
|
||||
raise ValueError(f"Invalid config format: {config}")
|
||||
|
||||
res = self.put(
|
||||
f"http://localhost:{self.port}/v1/failpoints",
|
||||
json=[{"name": name, "actions": actions} for name, actions in pairs],
|
||||
)
|
||||
log.info(f"Got failpoints request response code {res.status_code}")
|
||||
self.verbose_error(res)
|
||||
res_json = res.json()
|
||||
assert res_json is None
|
||||
return res_json
|
||||
res = self.post(f"{self.base_url}/failpoints", json=body)
|
||||
if res.status_code != 200:
|
||||
raise PageserverApiException(
|
||||
f"Failed to configure failpoints: {res.text}", res.status_code
|
||||
)
|
||||
|
||||
def reload_auth_validation_keys(self):
|
||||
res = self.post(f"http://localhost:{self.port}/v1/reload_auth_validation_keys")
|
||||
|
||||
@@ -8,7 +8,6 @@ import pytest
|
||||
import requests
|
||||
|
||||
from fixtures.common_types import Lsn, TenantId, TenantTimelineId, TimelineId
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
|
||||
from fixtures.utils import EnhancedJSONEncoder, wait_until
|
||||
|
||||
@@ -155,25 +154,62 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
|
||||
if not self.is_testing_enabled:
|
||||
pytest.skip("safekeeper was built without 'testing' feature")
|
||||
|
||||
def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]):
|
||||
def configure_failpoints(
|
||||
self,
|
||||
config_strings: tuple[str, str]
|
||||
| list[tuple[str, str]]
|
||||
| list[dict[str, str | dict[str, str]]],
|
||||
):
|
||||
"""
|
||||
Configure failpoints for testing purposes.
|
||||
|
||||
Args:
|
||||
config_strings: Can be one of:
|
||||
- Single tuple of (name, actions)
|
||||
- List of tuples [(name, actions), ...]
|
||||
- List of dicts with keys: name, actions, and optionally context_matchers
|
||||
|
||||
Examples:
|
||||
# Basic failpoint
|
||||
client.configure_failpoints(("test_fp", "return(error)"))
|
||||
|
||||
# Multiple basic failpoints
|
||||
client.configure_failpoints([("fp1", "return"), ("fp2", "sleep(1000)")])
|
||||
|
||||
# Probability-based failpoint
|
||||
client.configure_failpoints(("test_fp", "50%return(error)"))
|
||||
|
||||
# Context-based failpoint
|
||||
client.configure_failpoints([{
|
||||
"name": "test_fp",
|
||||
"actions": "return(error)",
|
||||
"context_matchers": {"tenant_id": ".*test.*"}
|
||||
}])
|
||||
"""
|
||||
self.is_testing_enabled_or_skip()
|
||||
|
||||
# Handle single tuple case
|
||||
if isinstance(config_strings, tuple):
|
||||
pairs = [config_strings]
|
||||
else:
|
||||
pairs = config_strings
|
||||
config_strings = [config_strings]
|
||||
|
||||
log.info(f"Requesting config failpoints: {repr(pairs)}")
|
||||
# Convert to server format
|
||||
body: list[dict[str, str | dict[str, str]]] = []
|
||||
for config in config_strings:
|
||||
if isinstance(config, tuple):
|
||||
# Simple (name, actions) tuple
|
||||
body.append({"name": config[0], "actions": config[1]})
|
||||
elif isinstance(config, dict):
|
||||
# Dict with name, actions, and optional context_matchers
|
||||
server_config = {"name": config["name"], "actions": config["actions"]}
|
||||
if "context_matchers" in config:
|
||||
server_config["context_matchers"] = config["context_matchers"]
|
||||
body.append(server_config)
|
||||
else:
|
||||
raise ValueError(f"Invalid config format: {config}")
|
||||
|
||||
res = self.put(
|
||||
f"http://localhost:{self.port}/v1/failpoints",
|
||||
json=[{"name": name, "actions": actions} for name, actions in pairs],
|
||||
)
|
||||
log.info(f"Got failpoints request response code {res.status_code}")
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
assert res_json is None
|
||||
return res_json
|
||||
res = self.post(f"http://localhost:{self.port}/v1/failpoints", json=body)
|
||||
if res.status_code != 200:
|
||||
raise RuntimeError(f"Failed to configure failpoints: {res.text}")
|
||||
|
||||
def tenant_delete_force(self, tenant_id: TenantId) -> dict[Any, Any]:
|
||||
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
|
||||
|
||||
Reference in New Issue
Block a user