mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 10:30:40 +00:00
pageserver crate: install neon_failpoint as fail + cargo fmt
This commit is contained in:
@@ -1,9 +1,9 @@
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use http::StatusCode;
|
||||
use neon_failpoint::{configure_failpoint, configure_failpoint_with_context};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use tracing::info;
|
||||
use neon_failpoint::{configure_failpoint, configure_failpoint_with_context};
|
||||
|
||||
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
|
||||
|
||||
@@ -39,7 +39,10 @@ pub(in crate::http) async fn configure_failpoints(
|
||||
}
|
||||
|
||||
for fp in &*failpoints {
|
||||
info!("cfg failpoint: {} {} (context: {:?})", fp.name, fp.actions, fp.context_matchers);
|
||||
info!(
|
||||
"cfg failpoint: {} {} (context: {:?})",
|
||||
fp.name, fp.actions, fp.context_matchers
|
||||
);
|
||||
|
||||
let cfg_result = if let Some(context_matchers) = fp.context_matchers.clone() {
|
||||
configure_failpoint_with_context(&fp.name, &fp.actions, context_matchers)
|
||||
|
||||
@@ -46,7 +46,8 @@ impl<'m> MigrationRunner<'m> {
|
||||
});
|
||||
|
||||
false
|
||||
}.await;
|
||||
}
|
||||
.await;
|
||||
|
||||
if fail {
|
||||
return Err(anyhow::anyhow!(format!(
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use hyper::{Body, Request, Response, StatusCode};
|
||||
use neon_failpoint::{configure_failpoint, configure_failpoint_with_context, has_failpoints};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use neon_failpoint::{configure_failpoint, configure_failpoint_with_context, has_failpoints};
|
||||
|
||||
use crate::error::ApiError;
|
||||
use crate::json::{json_request, json_response};
|
||||
@@ -39,7 +39,12 @@ pub async fn failpoints_handler(
|
||||
|
||||
let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
|
||||
for fp in failpoints {
|
||||
tracing::info!("cfg failpoint: {} {} (context: {:?})", fp.name, fp.actions, fp.context_matchers);
|
||||
tracing::info!(
|
||||
"cfg failpoint: {} {} (context: {:?})",
|
||||
fp.name,
|
||||
fp.actions,
|
||||
fp.context_matchers
|
||||
);
|
||||
|
||||
let cfg_result = if let Some(context_matchers) = fp.context_matchers {
|
||||
configure_failpoint_with_context(&fp.name, &fp.actions, context_matchers)
|
||||
@@ -49,7 +54,9 @@ pub async fn failpoints_handler(
|
||||
|
||||
if let Err(err) = cfg_result {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Failed to configure failpoint '{}': {}", fp.name, err
|
||||
"Failed to configure failpoint '{}': {}",
|
||||
fp.name,
|
||||
err
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
use neon_failpoint::{configure_failpoint_with_context, failpoint, failpoint_context, FailpointResult};
|
||||
use neon_failpoint::{
|
||||
configure_failpoint_with_context, failpoint, failpoint_context, FailpointResult,
|
||||
};
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[tokio::main]
|
||||
@@ -12,10 +14,11 @@ async fn main() {
|
||||
context_matchers.insert("operation".to_string(), "backup".to_string());
|
||||
|
||||
configure_failpoint_with_context(
|
||||
"backup_operation",
|
||||
"return(simulated_failure)",
|
||||
context_matchers
|
||||
).unwrap();
|
||||
"backup_operation",
|
||||
"return(simulated_failure)",
|
||||
context_matchers,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Test with matching context
|
||||
let context = failpoint_context! {
|
||||
@@ -54,4 +57,4 @@ async fn main() {
|
||||
println!("Failpoint cancelled");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,9 +101,9 @@ pub fn configure_failpoint(name: &str, actions: &str) -> Result<()> {
|
||||
notifiers: Vec::new(),
|
||||
trigger_count: 0,
|
||||
};
|
||||
|
||||
|
||||
let mut failpoints = FAILPOINTS.write();
|
||||
|
||||
|
||||
// If this failpoint already exists, notify all waiting tasks
|
||||
if let Some(existing_config) = failpoints.get(name) {
|
||||
// Notify all waiting tasks about the configuration change
|
||||
@@ -111,9 +111,9 @@ pub fn configure_failpoint(name: &str, actions: &str) -> Result<()> {
|
||||
notifier.notify_waiters();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
failpoints.insert(name.to_string(), config);
|
||||
|
||||
|
||||
tracing::info!("Configured failpoint: {} = {}", name, actions);
|
||||
Ok(())
|
||||
}
|
||||
@@ -131,9 +131,9 @@ pub fn configure_failpoint_with_context(
|
||||
notifiers: Vec::new(),
|
||||
trigger_count: 0,
|
||||
};
|
||||
|
||||
|
||||
let mut failpoints = FAILPOINTS.write();
|
||||
|
||||
|
||||
// If this failpoint already exists, notify all waiting tasks
|
||||
if let Some(existing_config) = failpoints.get(name) {
|
||||
// Notify all waiting tasks about the configuration change
|
||||
@@ -141,9 +141,9 @@ pub fn configure_failpoint_with_context(
|
||||
notifier.notify_waiters();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
failpoints.insert(name.to_string(), config);
|
||||
|
||||
|
||||
tracing::info!("Configured failpoint with context: {} = {}", name, actions);
|
||||
Ok(())
|
||||
}
|
||||
@@ -151,16 +151,16 @@ pub fn configure_failpoint_with_context(
|
||||
/// Remove a failpoint configuration
|
||||
pub fn remove_failpoint(name: &str) {
|
||||
let mut failpoints = FAILPOINTS.write();
|
||||
|
||||
|
||||
// Notify all waiting tasks before removing
|
||||
if let Some(existing_config) = failpoints.get(name) {
|
||||
for notifier in &existing_config.notifiers {
|
||||
notifier.notify_waiters();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
failpoints.remove(name);
|
||||
|
||||
|
||||
tracing::info!("Removed failpoint: {}", name);
|
||||
}
|
||||
|
||||
@@ -247,10 +247,10 @@ fn execute_action<'a>(
|
||||
}
|
||||
FailpointAction::Sleep(millis) => {
|
||||
tracing::info!("Failpoint {} sleeping for {}ms", name, millis);
|
||||
|
||||
|
||||
// Create a notifier for this task
|
||||
let notifier = Arc::new(Notify::new());
|
||||
|
||||
|
||||
// Add the notifier to the failpoint configuration
|
||||
{
|
||||
let mut failpoints = FAILPOINTS.write();
|
||||
@@ -258,13 +258,13 @@ fn execute_action<'a>(
|
||||
fp_config.notifiers.push(notifier.clone());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Create cleanup guard to remove notifier when done
|
||||
let _guard = NotifierCleanupGuard {
|
||||
failpoint_name: name.to_string(),
|
||||
notifier: notifier.clone(),
|
||||
};
|
||||
|
||||
|
||||
// Sleep with cancellation support
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(Duration::from_millis(*millis)) => {
|
||||
@@ -283,10 +283,10 @@ fn execute_action<'a>(
|
||||
}
|
||||
FailpointAction::Pause => {
|
||||
tracing::info!("Failpoint {} pausing", name);
|
||||
|
||||
|
||||
// Create a notifier for this task
|
||||
let notifier = Arc::new(Notify::new());
|
||||
|
||||
|
||||
// Add the notifier to the failpoint configuration
|
||||
{
|
||||
let mut failpoints = FAILPOINTS.write();
|
||||
@@ -294,13 +294,13 @@ fn execute_action<'a>(
|
||||
fp_config.notifiers.push(notifier.clone());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Create cleanup guard to remove notifier when done
|
||||
let _guard = NotifierCleanupGuard {
|
||||
failpoint_name: name.to_string(),
|
||||
notifier: notifier.clone(),
|
||||
};
|
||||
|
||||
|
||||
// Wait until cancelled or notified
|
||||
tokio::select! {
|
||||
_ = cancel_token.cancelled() => {
|
||||
@@ -400,7 +400,7 @@ fn matches_context(matchers: &HashMap<String, String>, context: &FailpointContex
|
||||
let Some(value) = context.get(key) else {
|
||||
return false;
|
||||
};
|
||||
|
||||
|
||||
// Try to compile and match as regex
|
||||
if let Ok(regex) = Regex::new(pattern) {
|
||||
if !regex.is_match(value) {
|
||||
@@ -427,7 +427,9 @@ impl Drop for NotifierCleanupGuard {
|
||||
let mut failpoints = FAILPOINTS.write();
|
||||
if let Some(fp_config) = failpoints.get_mut(&self.failpoint_name) {
|
||||
// Remove this specific notifier from the list
|
||||
fp_config.notifiers.retain(|n| !Arc::ptr_eq(n, &self.notifier));
|
||||
fp_config
|
||||
.notifiers
|
||||
.retain(|n| !Arc::ptr_eq(n, &self.notifier));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -461,7 +463,7 @@ mod tests {
|
||||
let start = std::time::Instant::now();
|
||||
let result = failpoint("test_sleep", None).await;
|
||||
let duration = start.elapsed();
|
||||
|
||||
|
||||
matches!(result, FailpointResult::Continue);
|
||||
assert!(duration >= Duration::from_millis(10));
|
||||
}
|
||||
@@ -471,13 +473,13 @@ mod tests {
|
||||
configure_failpoint("test_pause", "pause").unwrap();
|
||||
let cancel_token = CancellationToken::new();
|
||||
let cancel_token_clone = cancel_token.clone();
|
||||
|
||||
|
||||
// Cancel after 10ms
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
cancel_token_clone.cancel();
|
||||
});
|
||||
|
||||
|
||||
let result = failpoint_with_cancellation("test_pause", None, &cancel_token).await;
|
||||
matches!(result, FailpointResult::Cancelled);
|
||||
}
|
||||
@@ -486,28 +488,29 @@ mod tests {
|
||||
async fn test_failpoint_pause_and_resume() {
|
||||
let failpoint_name = "test_pause_resume";
|
||||
configure_failpoint(failpoint_name, "pause").unwrap();
|
||||
|
||||
|
||||
let start = std::time::Instant::now();
|
||||
|
||||
|
||||
// Start a task that hits the failpoint
|
||||
let task = tokio::spawn(async move {
|
||||
failpoint(failpoint_name, None).await
|
||||
});
|
||||
|
||||
let task = tokio::spawn(async move { failpoint(failpoint_name, None).await });
|
||||
|
||||
// Give it time to hit the failpoint and pause
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
|
||||
|
||||
// Reconfigure the failpoint to "off" to resume
|
||||
configure_failpoint(failpoint_name, "off").unwrap();
|
||||
|
||||
|
||||
// The task should complete quickly now
|
||||
let result = timeout(Duration::from_millis(100), task).await.unwrap().unwrap();
|
||||
|
||||
let result = timeout(Duration::from_millis(100), task)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
let duration = start.elapsed();
|
||||
|
||||
|
||||
// Should have resumed and continued
|
||||
matches!(result, FailpointResult::Continue);
|
||||
|
||||
|
||||
// Should have taken at least 10ms (initial pause) but less than 1 second
|
||||
assert!(duration >= Duration::from_millis(10));
|
||||
assert!(duration < Duration::from_millis(1000));
|
||||
@@ -517,22 +520,23 @@ mod tests {
|
||||
async fn test_failpoint_pause_and_change_to_return() {
|
||||
let failpoint_name = "test_pause_to_return";
|
||||
configure_failpoint(failpoint_name, "pause").unwrap();
|
||||
|
||||
|
||||
// Start a task that hits the failpoint
|
||||
let task = tokio::spawn(async move {
|
||||
failpoint(failpoint_name, None).await
|
||||
});
|
||||
|
||||
let task = tokio::spawn(async move { failpoint(failpoint_name, None).await });
|
||||
|
||||
// Give it time to hit the failpoint and pause
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
|
||||
|
||||
// Reconfigure the failpoint to return a value
|
||||
configure_failpoint(failpoint_name, "return(resumed)").unwrap();
|
||||
|
||||
|
||||
// The task should complete by resuming (Continue), not with the new return value
|
||||
// The new configuration only applies to future hits of the failpoint
|
||||
let result = timeout(Duration::from_millis(100), task).await.unwrap().unwrap();
|
||||
|
||||
let result = timeout(Duration::from_millis(100), task)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
matches!(result, FailpointResult::Continue);
|
||||
}
|
||||
|
||||
@@ -540,28 +544,29 @@ mod tests {
|
||||
async fn test_failpoint_sleep_and_resume() {
|
||||
let failpoint_name = "test_sleep_resume";
|
||||
configure_failpoint(failpoint_name, "sleep(1000)").unwrap(); // 1 second sleep
|
||||
|
||||
|
||||
let start = std::time::Instant::now();
|
||||
|
||||
|
||||
// Start a task that hits the failpoint
|
||||
let task = tokio::spawn(async move {
|
||||
failpoint(failpoint_name, None).await
|
||||
});
|
||||
|
||||
let task = tokio::spawn(async move { failpoint(failpoint_name, None).await });
|
||||
|
||||
// Give it time to hit the failpoint and start sleeping
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
|
||||
|
||||
// Reconfigure the failpoint to "off" to resume
|
||||
configure_failpoint(failpoint_name, "off").unwrap();
|
||||
|
||||
|
||||
// The task should complete quickly now
|
||||
let result = timeout(Duration::from_millis(100), task).await.unwrap().unwrap();
|
||||
|
||||
let result = timeout(Duration::from_millis(100), task)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
let duration = start.elapsed();
|
||||
|
||||
|
||||
// Should have resumed and continued
|
||||
matches!(result, FailpointResult::Continue);
|
||||
|
||||
|
||||
// Should have taken much less than the original 1 second sleep
|
||||
assert!(duration < Duration::from_millis(500));
|
||||
}
|
||||
@@ -570,28 +575,29 @@ mod tests {
|
||||
async fn test_failpoint_sleep_and_change_duration() {
|
||||
let failpoint_name = "test_sleep_change";
|
||||
configure_failpoint(failpoint_name, "sleep(1000)").unwrap(); // 1 second sleep
|
||||
|
||||
|
||||
let start = std::time::Instant::now();
|
||||
|
||||
|
||||
// Start a task that hits the failpoint
|
||||
let task = tokio::spawn(async move {
|
||||
failpoint(failpoint_name, None).await
|
||||
});
|
||||
|
||||
let task = tokio::spawn(async move { failpoint(failpoint_name, None).await });
|
||||
|
||||
// Give it time to hit the failpoint and start sleeping
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
|
||||
|
||||
// Change the sleep duration to a shorter time
|
||||
configure_failpoint(failpoint_name, "sleep(50)").unwrap();
|
||||
|
||||
|
||||
// The task should complete by resuming (Continue), not by sleeping the new duration
|
||||
let result = timeout(Duration::from_millis(200), task).await.unwrap().unwrap();
|
||||
|
||||
let result = timeout(Duration::from_millis(200), task)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
let duration = start.elapsed();
|
||||
|
||||
|
||||
// Should have resumed and continued
|
||||
matches!(result, FailpointResult::Continue);
|
||||
|
||||
|
||||
// Should have taken much less than the original 1 second sleep
|
||||
assert!(duration < Duration::from_millis(500));
|
||||
}
|
||||
@@ -600,21 +606,22 @@ mod tests {
|
||||
async fn test_failpoint_remove_during_pause() {
|
||||
let failpoint_name = "test_remove_pause";
|
||||
configure_failpoint(failpoint_name, "pause").unwrap();
|
||||
|
||||
|
||||
// Start a task that hits the failpoint
|
||||
let task = tokio::spawn(async move {
|
||||
failpoint(failpoint_name, None).await
|
||||
});
|
||||
|
||||
let task = tokio::spawn(async move { failpoint(failpoint_name, None).await });
|
||||
|
||||
// Give it time to hit the failpoint and pause
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
|
||||
|
||||
// Remove the failpoint entirely
|
||||
remove_failpoint(failpoint_name);
|
||||
|
||||
|
||||
// The task should complete quickly now
|
||||
let result = timeout(Duration::from_millis(100), task).await.unwrap().unwrap();
|
||||
|
||||
let result = timeout(Duration::from_millis(100), task)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
// Should have resumed and continued
|
||||
matches!(result, FailpointResult::Continue);
|
||||
}
|
||||
@@ -623,31 +630,34 @@ mod tests {
|
||||
async fn test_multiple_paused_tasks_resume() {
|
||||
let failpoint_name = "test_multiple_pause";
|
||||
configure_failpoint(failpoint_name, "pause").unwrap();
|
||||
|
||||
|
||||
// Start multiple tasks that hit the same failpoint
|
||||
let task1 = tokio::spawn(async move {
|
||||
failpoint(failpoint_name, None).await
|
||||
});
|
||||
|
||||
let task2 = tokio::spawn(async move {
|
||||
failpoint(failpoint_name, None).await
|
||||
});
|
||||
|
||||
let task3 = tokio::spawn(async move {
|
||||
failpoint(failpoint_name, None).await
|
||||
});
|
||||
|
||||
let task1 = tokio::spawn(async move { failpoint(failpoint_name, None).await });
|
||||
|
||||
let task2 = tokio::spawn(async move { failpoint(failpoint_name, None).await });
|
||||
|
||||
let task3 = tokio::spawn(async move { failpoint(failpoint_name, None).await });
|
||||
|
||||
// Give them time to hit the failpoint and pause
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
|
||||
|
||||
// Reconfigure the failpoint to "off" to resume all tasks
|
||||
configure_failpoint(failpoint_name, "off").unwrap();
|
||||
|
||||
|
||||
// All tasks should complete quickly now
|
||||
let result1 = timeout(Duration::from_millis(100), task1).await.unwrap().unwrap();
|
||||
let result2 = timeout(Duration::from_millis(100), task2).await.unwrap().unwrap();
|
||||
let result3 = timeout(Duration::from_millis(100), task3).await.unwrap().unwrap();
|
||||
|
||||
let result1 = timeout(Duration::from_millis(100), task1)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let result2 = timeout(Duration::from_millis(100), task2)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let result3 = timeout(Duration::from_millis(100), task3)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
// All should have resumed and continued
|
||||
matches!(result1, FailpointResult::Continue);
|
||||
matches!(result2, FailpointResult::Continue);
|
||||
@@ -657,15 +667,13 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_no_race_condition_on_rapid_config_changes() {
|
||||
let failpoint_name = "test_race_condition";
|
||||
|
||||
|
||||
// Start with pause
|
||||
configure_failpoint(failpoint_name, "pause").unwrap();
|
||||
|
||||
|
||||
// Start a task that will hit the failpoint
|
||||
let task = tokio::spawn(async move {
|
||||
failpoint(failpoint_name, None).await
|
||||
});
|
||||
|
||||
let task = tokio::spawn(async move { failpoint(failpoint_name, None).await });
|
||||
|
||||
// Rapidly change the configuration multiple times
|
||||
for i in 0..10 {
|
||||
tokio::time::sleep(Duration::from_millis(1)).await;
|
||||
@@ -675,13 +683,16 @@ mod tests {
|
||||
configure_failpoint(failpoint_name, "return(rapid_change)").unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Finally set it to return
|
||||
configure_failpoint(failpoint_name, "return(final)").unwrap();
|
||||
|
||||
|
||||
// The task should complete by resuming (Continue), not with any return value
|
||||
let result = timeout(Duration::from_millis(100), task).await.unwrap().unwrap();
|
||||
|
||||
let result = timeout(Duration::from_millis(100), task)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
// Should resume normally since it was woken up by a configuration change
|
||||
matches!(result, FailpointResult::Continue);
|
||||
}
|
||||
@@ -691,25 +702,25 @@ mod tests {
|
||||
let mut context = HashMap::new();
|
||||
context.insert("user_id".to_string(), "123".to_string());
|
||||
context.insert("operation".to_string(), "read".to_string());
|
||||
|
||||
|
||||
let mut matchers = HashMap::new();
|
||||
matchers.insert("user_id".to_string(), "123".to_string());
|
||||
matchers.insert("operation".to_string(), "read".to_string());
|
||||
|
||||
|
||||
configure_failpoint_with_context("test_context", "return(matched)", matchers).unwrap();
|
||||
|
||||
|
||||
let result = failpoint("test_context", Some(&context)).await;
|
||||
if let FailpointResult::Return(Some(value)) = result {
|
||||
assert_eq!(value, "matched");
|
||||
} else {
|
||||
panic!("Expected return result");
|
||||
}
|
||||
|
||||
|
||||
// Test with non-matching context
|
||||
let mut wrong_context = HashMap::new();
|
||||
wrong_context.insert("user_id".to_string(), "456".to_string());
|
||||
wrong_context.insert("operation".to_string(), "write".to_string());
|
||||
|
||||
|
||||
let result = failpoint("test_context", Some(&wrong_context)).await;
|
||||
matches!(result, FailpointResult::Continue);
|
||||
}
|
||||
@@ -717,16 +728,16 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_failpoint_panic() {
|
||||
configure_failpoint("test_panic", "panic(test message)").unwrap();
|
||||
|
||||
|
||||
let result = tokio::task::spawn_blocking(|| {
|
||||
std::panic::catch_unwind(|| {
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
rt.block_on(async {
|
||||
failpoint("test_panic", None).await
|
||||
})
|
||||
rt.block_on(async { failpoint("test_panic", None).await })
|
||||
})
|
||||
}).await.unwrap();
|
||||
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(result.is_err());
|
||||
let panic_msg = result.unwrap_err();
|
||||
if let Some(msg) = panic_msg.downcast_ref::<String>() {
|
||||
@@ -741,7 +752,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_failpoint_probability_simple() {
|
||||
configure_failpoint("test_prob_simple", "100%return(always)").unwrap();
|
||||
|
||||
|
||||
// 100% probability should always trigger
|
||||
let result = failpoint("test_prob_simple", None).await;
|
||||
if let FailpointResult::Return(Some(value)) = result {
|
||||
@@ -749,7 +760,7 @@ mod tests {
|
||||
} else {
|
||||
panic!("Expected return result with 100% probability");
|
||||
}
|
||||
|
||||
|
||||
// 0% probability should never trigger
|
||||
configure_failpoint("test_prob_never", "0%return(never)").unwrap();
|
||||
let result = failpoint("test_prob_never", None).await;
|
||||
@@ -759,7 +770,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_failpoint_probability_with_count() {
|
||||
configure_failpoint("test_prob_count", "100%2*return(limited)").unwrap();
|
||||
|
||||
|
||||
// First two calls should trigger (100% probability, max 2 times)
|
||||
let result1 = failpoint("test_prob_count", None).await;
|
||||
if let FailpointResult::Return(Some(value)) = result1 {
|
||||
@@ -767,14 +778,14 @@ mod tests {
|
||||
} else {
|
||||
panic!("Expected return result on first call");
|
||||
}
|
||||
|
||||
|
||||
let result2 = failpoint("test_prob_count", None).await;
|
||||
if let FailpointResult::Return(Some(value)) = result2 {
|
||||
assert_eq!(value, "limited");
|
||||
} else {
|
||||
panic!("Expected return result on second call");
|
||||
}
|
||||
|
||||
|
||||
// Third call should not trigger (count limit exceeded)
|
||||
let result3 = failpoint("test_prob_count", None).await;
|
||||
matches!(result3, FailpointResult::Continue);
|
||||
@@ -783,11 +794,11 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_failpoint_probability_nested_actions() {
|
||||
configure_failpoint("test_prob_sleep", "100%sleep(10)").unwrap();
|
||||
|
||||
|
||||
let start = std::time::Instant::now();
|
||||
let result = failpoint("test_prob_sleep", None).await;
|
||||
let duration = start.elapsed();
|
||||
|
||||
|
||||
matches!(result, FailpointResult::Continue);
|
||||
assert!(duration >= Duration::from_millis(10));
|
||||
}
|
||||
@@ -807,7 +818,7 @@ mod tests {
|
||||
} else {
|
||||
panic!("Expected return action");
|
||||
}
|
||||
|
||||
|
||||
// Test probability with count
|
||||
let action_spec = parse_action_spec("25%3*return(limited)").unwrap();
|
||||
if let FailpointAction::Return(Some(ref v)) = action_spec.action {
|
||||
@@ -815,7 +826,7 @@ mod tests {
|
||||
} else {
|
||||
panic!("Expected return action");
|
||||
}
|
||||
|
||||
|
||||
// Test probability with other actions
|
||||
let action_spec = parse_action_spec("75%sleep(100)").unwrap();
|
||||
if let FailpointAction::Sleep(100) = action_spec.action {
|
||||
@@ -830,7 +841,7 @@ mod tests {
|
||||
// Test probability > 100
|
||||
let result = parse_action_spec("150%return(invalid)");
|
||||
assert!(result.is_err());
|
||||
|
||||
|
||||
// Test invalid format
|
||||
let result = parse_action_spec("50%invalid_action");
|
||||
assert!(result.is_err());
|
||||
@@ -839,11 +850,11 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_audit_specific_patterns() {
|
||||
// Test patterns found in the audit
|
||||
|
||||
|
||||
// 1. panic(failpoint) - from test_sharding.py
|
||||
let action_spec = parse_action_spec("panic(failpoint)").unwrap();
|
||||
matches!(action_spec.action, FailpointAction::Panic(msg) if msg == "failpoint");
|
||||
|
||||
|
||||
// 2. 50%return(15) - from test_bad_connection.py
|
||||
let action_spec = parse_action_spec("50%return(15)").unwrap();
|
||||
if let FailpointAction::Return(Some(ref v)) = action_spec.action {
|
||||
@@ -851,7 +862,7 @@ mod tests {
|
||||
} else {
|
||||
panic!("Expected return action");
|
||||
}
|
||||
|
||||
|
||||
// 3. 10%3*return(3000) - from test_bad_connection.py
|
||||
let action_spec = parse_action_spec("10%3*return(3000)").unwrap();
|
||||
if let FailpointAction::Return(Some(ref v)) = action_spec.action {
|
||||
@@ -859,7 +870,7 @@ mod tests {
|
||||
} else {
|
||||
panic!("Expected return action");
|
||||
}
|
||||
|
||||
|
||||
// 4. 10%return(60000) - from test_bad_connection.py
|
||||
let action_spec = parse_action_spec("10%return(60000)").unwrap();
|
||||
if let FailpointAction::Return(Some(ref v)) = action_spec.action {
|
||||
@@ -867,12 +878,12 @@ mod tests {
|
||||
} else {
|
||||
panic!("Expected return action");
|
||||
}
|
||||
|
||||
|
||||
// Test that these patterns work in practice
|
||||
configure_failpoint("test_audit_panic", "panic(storage controller crash)").unwrap();
|
||||
configure_failpoint("test_audit_prob", "100%return(42)").unwrap();
|
||||
configure_failpoint("test_audit_count", "100%1*return(limited)").unwrap();
|
||||
|
||||
|
||||
// Test the probability-based return
|
||||
let result = failpoint("test_audit_prob", None).await;
|
||||
if let FailpointResult::Return(Some(value)) = result {
|
||||
@@ -880,7 +891,7 @@ mod tests {
|
||||
} else {
|
||||
panic!("Expected return result");
|
||||
}
|
||||
|
||||
|
||||
// Test the count-limited probability
|
||||
let result1 = failpoint("test_audit_count", None).await;
|
||||
if let FailpointResult::Return(Some(value)) = result1 {
|
||||
@@ -888,9 +899,9 @@ mod tests {
|
||||
} else {
|
||||
panic!("Expected return result on first call");
|
||||
}
|
||||
|
||||
|
||||
// Second call should not trigger (count limit exceeded)
|
||||
let result2 = failpoint("test_audit_count", None).await;
|
||||
matches!(result2, FailpointResult::Continue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -151,7 +151,9 @@ macro_rules! failpoint_return {
|
||||
}};
|
||||
($name:literal, $context:expr) => {{
|
||||
if cfg!(feature = "testing") {
|
||||
if let $crate::FailpointResult::Return(value) = $crate::failpoint($name, Some($context)).await {
|
||||
if let $crate::FailpointResult::Return(value) =
|
||||
$crate::failpoint($name, Some($context)).await
|
||||
{
|
||||
return value.parse().unwrap_or_default();
|
||||
}
|
||||
}
|
||||
@@ -170,7 +172,9 @@ macro_rules! failpoint_bail {
|
||||
}};
|
||||
($name:literal, $context:expr, $error_msg:literal) => {{
|
||||
if cfg!(feature = "testing") {
|
||||
if let $crate::FailpointResult::Return(_) = $crate::failpoint($name, Some($context)).await {
|
||||
if let $crate::FailpointResult::Return(_) =
|
||||
$crate::failpoint($name, Some($context)).await
|
||||
{
|
||||
anyhow::bail!($error_msg);
|
||||
}
|
||||
}
|
||||
@@ -184,4 +188,4 @@ pub use failpoint_bail;
|
||||
pub use failpoint_context;
|
||||
pub use failpoint_return;
|
||||
pub use pausable_failpoint;
|
||||
pub use sleep_millis_async;
|
||||
pub use sleep_millis_async;
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
//! Failpoint support code shared between pageserver and safekeepers.
|
||||
//!
|
||||
//!
|
||||
//! This module provides a compatibility layer over the new neon_failpoint crate.
|
||||
|
||||
pub use neon_failpoint::{configure_failpoint as apply_failpoint, has_failpoints, init};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
pub use neon_failpoint::{configure_failpoint as apply_failpoint, init, has_failpoints};
|
||||
|
||||
/// Declare a failpoint that can use to `pause` failpoint action.
|
||||
/// This is now a compatibility wrapper around the new neon_failpoint crate.
|
||||
|
||||
@@ -17,6 +17,7 @@ use anyhow::{Context, anyhow};
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use fail::fail_point;
|
||||
use neon_failpoint as fail;
|
||||
use pageserver_api::key::{Key, rel_block_to_key};
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
use postgres_ffi::pg_constants::{PG_HBA, PGDATA_SPECIAL_FILES};
|
||||
|
||||
@@ -6,6 +6,8 @@ use camino::{Utf8Path, Utf8PathBuf};
|
||||
use super::{NewMetricsRoot, NewRawMetric, RawMetric};
|
||||
use crate::consumption_metrics::NewMetricsRefRoot;
|
||||
|
||||
use neon_failpoint as fail;
|
||||
|
||||
pub(super) fn read_metrics_from_serde_value(
|
||||
json_value: serde_json::Value,
|
||||
) -> anyhow::Result<Vec<NewRawMetric>> {
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use neon_failpoint as fail;
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info, warn};
|
||||
|
||||
@@ -28,6 +28,7 @@ use http_utils::{RequestExt, RouterBuilder};
|
||||
use humantime::format_rfc3339;
|
||||
use hyper::{Body, Request, Response, StatusCode, Uri, header};
|
||||
use metrics::launch_timestamp::LaunchTimestamp;
|
||||
use neon_failpoint as fail;
|
||||
use pageserver_api::models::virtual_file::IoMode;
|
||||
use pageserver_api::models::{
|
||||
DetachBehavior, DownloadRemoteLayersTaskSpawnRequest, IngestAuxFilesRequest,
|
||||
|
||||
@@ -19,6 +19,7 @@ use futures::future::BoxFuture;
|
||||
use futures::{FutureExt, Stream};
|
||||
use itertools::Itertools;
|
||||
use jsonwebtoken::TokenData;
|
||||
use neon_failpoint as fail;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::config::{
|
||||
GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
|
||||
|
||||
@@ -14,6 +14,7 @@ use crate::{PERF_TRACE_TARGET, ensure_walingest};
|
||||
use anyhow::Context;
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
use neon_failpoint as fail;
|
||||
use pageserver_api::key::{
|
||||
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
|
||||
TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
|
||||
|
||||
@@ -30,6 +30,7 @@ use enumset::EnumSet;
|
||||
use futures::StreamExt;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use itertools::Itertools as _;
|
||||
use neon_failpoint as fail;
|
||||
use once_cell::sync::Lazy;
|
||||
pub use pageserver_api::models::TenantState;
|
||||
use pageserver_api::models::{self, RelSizeMigration};
|
||||
|
||||
@@ -12,6 +12,7 @@ use anyhow::Context;
|
||||
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
|
||||
use futures::StreamExt;
|
||||
use itertools::Itertools;
|
||||
use neon_failpoint as fail;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::models::{DetachBehavior, LocationConfigMode};
|
||||
use pageserver_api::shard::{
|
||||
|
||||
@@ -194,6 +194,7 @@ pub(crate) use download::{
|
||||
};
|
||||
use index::GcCompactionState;
|
||||
pub(crate) use index::LayerFileMetadata;
|
||||
use neon_failpoint as fail;
|
||||
use pageserver_api::models::{RelSizeMigration, TimelineArchivalState, TimelineVisibilityState};
|
||||
use pageserver_api::shard::{ShardIndex, TenantShardId};
|
||||
use regex::Regex;
|
||||
|
||||
@@ -11,6 +11,7 @@ use std::time::SystemTime;
|
||||
|
||||
use anyhow::{Context, anyhow};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use neon_failpoint as fail;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::{
|
||||
DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
|
||||
|
||||
@@ -8,6 +8,7 @@ use anyhow::{Context, bail};
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8Path;
|
||||
use fail::fail_point;
|
||||
use neon_failpoint as fail;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath, TimeTravelError};
|
||||
use tokio::fs::{self, File};
|
||||
|
||||
@@ -39,6 +39,7 @@ use layer_manager::{
|
||||
LayerManagerLockHolder, LayerManagerReadGuard, LayerManagerWriteGuard, LockedLayerManager,
|
||||
Shutdown,
|
||||
};
|
||||
use neon_failpoint as fail;
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
|
||||
|
||||
@@ -26,6 +26,7 @@ use enumset::EnumSet;
|
||||
use fail::fail_point;
|
||||
use futures::FutureExt;
|
||||
use itertools::Itertools;
|
||||
use neon_failpoint as fail;
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
|
||||
use pageserver_api::key::{KEY_SIZE, Key};
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::ops::{Deref, DerefMut};
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use neon_failpoint as fail;
|
||||
use pageserver_api::models::TimelineState;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::DownloadError;
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::sync::Arc;
|
||||
use anyhow::Context;
|
||||
use bytes::Bytes;
|
||||
use http_utils::error::ApiError;
|
||||
use neon_failpoint as fail;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::DetachBehavior;
|
||||
|
||||
@@ -5,6 +5,7 @@ use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use camino::Utf8PathBuf;
|
||||
use neon_failpoint as fail;
|
||||
use tracing::{error, info, info_span};
|
||||
use utils::fs_ext;
|
||||
use utils::id::TimelineId;
|
||||
|
||||
@@ -11,6 +11,7 @@ use bytes::BytesMut;
|
||||
use chrono::{NaiveDateTime, Utc};
|
||||
use fail::fail_point;
|
||||
use futures::StreamExt;
|
||||
use neon_failpoint as fail;
|
||||
use postgres_backend::is_expected_io_error;
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
|
||||
@@ -879,7 +879,8 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
||||
|_| { false }
|
||||
);
|
||||
self.end_pos > self.start_pos
|
||||
}.await;
|
||||
}
|
||||
.await;
|
||||
|
||||
if have_something_to_send {
|
||||
trace!("got end_pos {:?}, streaming", self.end_pos);
|
||||
@@ -938,7 +939,8 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
||||
|_| { true }
|
||||
);
|
||||
false
|
||||
}.await;
|
||||
}
|
||||
.await;
|
||||
if fp {
|
||||
tokio::time::sleep(POLL_STATE_TIMEOUT).await;
|
||||
return Ok(None);
|
||||
|
||||
Reference in New Issue
Block a user