mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 18:02:56 +00:00
cargo fmt
This commit is contained in:
@@ -1,6 +1,4 @@
|
||||
use neon_failpoint::{
|
||||
configure_failpoint_with_context, failpoint, FailpointResult,
|
||||
};
|
||||
use neon_failpoint::{configure_failpoint_with_context, failpoint, FailpointResult};
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[tokio::main]
|
||||
@@ -27,32 +25,28 @@ async fn main() {
|
||||
|
||||
println!("Testing with matching context...");
|
||||
match failpoint("backup_operation", Some(&context)) {
|
||||
either::Either::Left(result) => {
|
||||
match result {
|
||||
FailpointResult::Return(value) => {
|
||||
println!("Failpoint triggered with value: {value:?}");
|
||||
}
|
||||
FailpointResult::Continue => {
|
||||
println!("Failpoint not triggered");
|
||||
}
|
||||
FailpointResult::Cancelled => {
|
||||
println!("Failpoint cancelled");
|
||||
}
|
||||
either::Either::Left(result) => match result {
|
||||
FailpointResult::Return(value) => {
|
||||
println!("Failpoint triggered with value: {value:?}");
|
||||
}
|
||||
}
|
||||
either::Either::Right(future) => {
|
||||
match future.await {
|
||||
FailpointResult::Return(value) => {
|
||||
println!("Failpoint triggered with value: {value:?}");
|
||||
}
|
||||
FailpointResult::Continue => {
|
||||
println!("Failpoint not triggered");
|
||||
}
|
||||
FailpointResult::Cancelled => {
|
||||
println!("Failpoint cancelled");
|
||||
}
|
||||
FailpointResult::Continue => {
|
||||
println!("Failpoint not triggered");
|
||||
}
|
||||
}
|
||||
FailpointResult::Cancelled => {
|
||||
println!("Failpoint cancelled");
|
||||
}
|
||||
},
|
||||
either::Either::Right(future) => match future.await {
|
||||
FailpointResult::Return(value) => {
|
||||
println!("Failpoint triggered with value: {value:?}");
|
||||
}
|
||||
FailpointResult::Continue => {
|
||||
println!("Failpoint not triggered");
|
||||
}
|
||||
FailpointResult::Cancelled => {
|
||||
println!("Failpoint cancelled");
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
// Test with non-matching context
|
||||
@@ -62,31 +56,27 @@ async fn main() {
|
||||
|
||||
println!("Testing with non-matching context...");
|
||||
match failpoint("backup_operation", Some(&context)) {
|
||||
either::Either::Left(result) => {
|
||||
match result {
|
||||
FailpointResult::Return(value) => {
|
||||
println!("Failpoint triggered with value: {value:?}");
|
||||
}
|
||||
FailpointResult::Continue => {
|
||||
println!("Failpoint not triggered (expected)");
|
||||
}
|
||||
FailpointResult::Cancelled => {
|
||||
println!("Failpoint cancelled");
|
||||
}
|
||||
either::Either::Left(result) => match result {
|
||||
FailpointResult::Return(value) => {
|
||||
println!("Failpoint triggered with value: {value:?}");
|
||||
}
|
||||
}
|
||||
either::Either::Right(future) => {
|
||||
match future.await {
|
||||
FailpointResult::Return(value) => {
|
||||
println!("Failpoint triggered with value: {value:?}");
|
||||
}
|
||||
FailpointResult::Continue => {
|
||||
println!("Failpoint not triggered (expected)");
|
||||
}
|
||||
FailpointResult::Cancelled => {
|
||||
println!("Failpoint cancelled");
|
||||
}
|
||||
FailpointResult::Continue => {
|
||||
println!("Failpoint not triggered (expected)");
|
||||
}
|
||||
}
|
||||
FailpointResult::Cancelled => {
|
||||
println!("Failpoint cancelled");
|
||||
}
|
||||
},
|
||||
either::Either::Right(future) => match future.await {
|
||||
FailpointResult::Return(value) => {
|
||||
println!("Failpoint triggered with value: {value:?}");
|
||||
}
|
||||
FailpointResult::Continue => {
|
||||
println!("Failpoint not triggered (expected)");
|
||||
}
|
||||
FailpointResult::Cancelled => {
|
||||
println!("Failpoint cancelled");
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,10 +7,10 @@
|
||||
//! - Compatible API with the existing fail crate usage
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::pin::Pin;
|
||||
use std::future::Future;
|
||||
|
||||
use anyhow::Result;
|
||||
use either::Either;
|
||||
@@ -174,13 +174,18 @@ pub fn has_failpoints() -> bool {
|
||||
}
|
||||
|
||||
pub fn list() -> Vec<(impl std::fmt::Display, impl std::fmt::Display)> {
|
||||
FAILPOINTS.read().iter().map(|(name, config)| {
|
||||
(name.clone(), format!("{config:?}"))
|
||||
}).collect::<Vec<_>>()
|
||||
FAILPOINTS
|
||||
.read()
|
||||
.iter()
|
||||
.map(|(name, config)| (name.clone(), format!("{config:?}")))
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
/// Execute a failpoint with optional context
|
||||
pub fn failpoint(name: &str, context: Option<&FailpointContext>) -> Either<FailpointResult, Pin<Box<dyn Future<Output = FailpointResult> + Send>>> {
|
||||
pub fn failpoint(
|
||||
name: &str,
|
||||
context: Option<&FailpointContext>,
|
||||
) -> Either<FailpointResult, Pin<Box<dyn Future<Output = FailpointResult> + Send>>> {
|
||||
failpoint_with_cancellation(name, context, &CancellationToken::new())
|
||||
}
|
||||
|
||||
@@ -475,7 +480,6 @@ mod tests {
|
||||
matches!(result, FailpointResult::Continue);
|
||||
}
|
||||
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_failpoint_sleep() {
|
||||
configure_failpoint("test_sleep", "sleep(10)").unwrap();
|
||||
@@ -499,7 +503,12 @@ mod tests {
|
||||
cancel_token_clone.cancel();
|
||||
});
|
||||
|
||||
let result = await_failpoint_result(failpoint_with_cancellation("test_pause", None, &cancel_token)).await;
|
||||
let result = await_failpoint_result(failpoint_with_cancellation(
|
||||
"test_pause",
|
||||
None,
|
||||
&cancel_token,
|
||||
))
|
||||
.await;
|
||||
matches!(result, FailpointResult::Cancelled);
|
||||
}
|
||||
|
||||
@@ -511,9 +520,10 @@ mod tests {
|
||||
let start = std::time::Instant::now();
|
||||
|
||||
// Start a task that hits the failpoint
|
||||
let task = tokio::spawn(async move {
|
||||
await_failpoint_result(failpoint(failpoint_name, None)).await
|
||||
});
|
||||
let task =
|
||||
tokio::spawn(
|
||||
async move { await_failpoint_result(failpoint(failpoint_name, None)).await },
|
||||
);
|
||||
|
||||
// Give it time to hit the failpoint and pause
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
@@ -543,9 +553,10 @@ mod tests {
|
||||
configure_failpoint(failpoint_name, "pause").unwrap();
|
||||
|
||||
// Start a task that hits the failpoint
|
||||
let task = tokio::spawn(async move {
|
||||
await_failpoint_result(failpoint(failpoint_name, None)).await
|
||||
});
|
||||
let task =
|
||||
tokio::spawn(
|
||||
async move { await_failpoint_result(failpoint(failpoint_name, None)).await },
|
||||
);
|
||||
|
||||
// Give it time to hit the failpoint and pause
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
@@ -571,9 +582,10 @@ mod tests {
|
||||
let start = std::time::Instant::now();
|
||||
|
||||
// Start a task that hits the failpoint
|
||||
let task = tokio::spawn(async move {
|
||||
await_failpoint_result(failpoint(failpoint_name, None)).await
|
||||
});
|
||||
let task =
|
||||
tokio::spawn(
|
||||
async move { await_failpoint_result(failpoint(failpoint_name, None)).await },
|
||||
);
|
||||
|
||||
// Give it time to hit the failpoint and start sleeping
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
@@ -604,9 +616,10 @@ mod tests {
|
||||
let start = std::time::Instant::now();
|
||||
|
||||
// Start a task that hits the failpoint
|
||||
let task = tokio::spawn(async move {
|
||||
await_failpoint_result(failpoint(failpoint_name, None)).await
|
||||
});
|
||||
let task =
|
||||
tokio::spawn(
|
||||
async move { await_failpoint_result(failpoint(failpoint_name, None)).await },
|
||||
);
|
||||
|
||||
// Give it time to hit the failpoint and start sleeping
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
@@ -635,9 +648,10 @@ mod tests {
|
||||
configure_failpoint(failpoint_name, "pause").unwrap();
|
||||
|
||||
// Start a task that hits the failpoint
|
||||
let task = tokio::spawn(async move {
|
||||
await_failpoint_result(failpoint(failpoint_name, None)).await
|
||||
});
|
||||
let task =
|
||||
tokio::spawn(
|
||||
async move { await_failpoint_result(failpoint(failpoint_name, None)).await },
|
||||
);
|
||||
|
||||
// Give it time to hit the failpoint and pause
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
@@ -661,17 +675,20 @@ mod tests {
|
||||
configure_failpoint(failpoint_name, "pause").unwrap();
|
||||
|
||||
// Start multiple tasks that hit the same failpoint
|
||||
let task1 = tokio::spawn(async move {
|
||||
await_failpoint_result(failpoint(failpoint_name, None)).await
|
||||
});
|
||||
let task1 =
|
||||
tokio::spawn(
|
||||
async move { await_failpoint_result(failpoint(failpoint_name, None)).await },
|
||||
);
|
||||
|
||||
let task2 = tokio::spawn(async move {
|
||||
await_failpoint_result(failpoint(failpoint_name, None)).await
|
||||
});
|
||||
let task2 =
|
||||
tokio::spawn(
|
||||
async move { await_failpoint_result(failpoint(failpoint_name, None)).await },
|
||||
);
|
||||
|
||||
let task3 = tokio::spawn(async move {
|
||||
await_failpoint_result(failpoint(failpoint_name, None)).await
|
||||
});
|
||||
let task3 =
|
||||
tokio::spawn(
|
||||
async move { await_failpoint_result(failpoint(failpoint_name, None)).await },
|
||||
);
|
||||
|
||||
// Give them time to hit the failpoint and pause
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
@@ -707,9 +724,10 @@ mod tests {
|
||||
configure_failpoint(failpoint_name, "pause").unwrap();
|
||||
|
||||
// Start a task that will hit the failpoint
|
||||
let task = tokio::spawn(async move {
|
||||
await_failpoint_result(failpoint(failpoint_name, None)).await
|
||||
});
|
||||
let task =
|
||||
tokio::spawn(
|
||||
async move { await_failpoint_result(failpoint(failpoint_name, None)).await },
|
||||
);
|
||||
|
||||
// Rapidly change the configuration multiple times
|
||||
for i in 0..10 {
|
||||
@@ -769,9 +787,7 @@ mod tests {
|
||||
let result = tokio::task::spawn_blocking(|| {
|
||||
std::panic::catch_unwind(|| {
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
rt.block_on(async {
|
||||
await_failpoint_result(failpoint("test_panic", None)).await
|
||||
})
|
||||
rt.block_on(async { await_failpoint_result(failpoint("test_panic", None)).await })
|
||||
})
|
||||
})
|
||||
.await
|
||||
|
||||
@@ -304,19 +304,15 @@ macro_rules! pausable_failpoint {
|
||||
($name:literal, $cancel:expr) => {{
|
||||
if cfg!(feature = "testing") {
|
||||
match $crate::failpoint_with_cancellation($name, None, $cancel) {
|
||||
$crate::either::Either::Left(result) => {
|
||||
match result {
|
||||
$crate::FailpointResult::Continue => Ok(()),
|
||||
$crate::FailpointResult::Return(_) => Ok(()),
|
||||
$crate::FailpointResult::Cancelled => Err(()),
|
||||
}
|
||||
$crate::either::Either::Left(result) => match result {
|
||||
$crate::FailpointResult::Continue => Ok(()),
|
||||
$crate::FailpointResult::Return(_) => Ok(()),
|
||||
$crate::FailpointResult::Cancelled => Err(()),
|
||||
},
|
||||
$crate::either::Either::Right(future) => {
|
||||
match future.await {
|
||||
$crate::FailpointResult::Continue => Ok(()),
|
||||
$crate::FailpointResult::Return(_) => Ok(()),
|
||||
$crate::FailpointResult::Cancelled => Err(()),
|
||||
}
|
||||
$crate::either::Either::Right(future) => match future.await {
|
||||
$crate::FailpointResult::Continue => Ok(()),
|
||||
$crate::FailpointResult::Return(_) => Ok(()),
|
||||
$crate::FailpointResult::Cancelled => Err(()),
|
||||
},
|
||||
}
|
||||
} else {
|
||||
@@ -331,20 +327,20 @@ macro_rules! sleep_millis_async {
|
||||
($name:literal) => {{
|
||||
if cfg!(feature = "testing") {
|
||||
match $crate::failpoint($name, None) {
|
||||
$crate::either::Either::Left(_) => {},
|
||||
$crate::either::Either::Left(_) => {}
|
||||
$crate::either::Either::Right(future) => {
|
||||
future.await;
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}};
|
||||
($name:literal, $cancel:expr) => {{
|
||||
if cfg!(feature = "testing") {
|
||||
match $crate::failpoint_with_cancellation($name, None, $cancel) {
|
||||
$crate::either::Either::Left(_) => {},
|
||||
$crate::either::Either::Left(_) => {}
|
||||
$crate::either::Either::Right(future) => {
|
||||
future.await;
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}};
|
||||
|
||||
@@ -336,18 +336,21 @@ async fn page_service_conn_main(
|
||||
|
||||
let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
|
||||
let socket_timeout_ms = (|| {
|
||||
fail::fail_point_sync!("simulated-bad-compute-connection", |avg_timeout_ms: Option<String>| {
|
||||
// Exponential distribution for simulating
|
||||
// poor network conditions, expect about avg_timeout_ms to be around 15
|
||||
// in tests
|
||||
if let Some(avg_timeout_ms) = avg_timeout_ms {
|
||||
let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
|
||||
let u = rand::random::<f32>();
|
||||
((1.0 - u).ln() / (-avg)) as u64
|
||||
} else {
|
||||
default_timeout_ms
|
||||
fail::fail_point_sync!(
|
||||
"simulated-bad-compute-connection",
|
||||
|avg_timeout_ms: Option<String>| {
|
||||
// Exponential distribution for simulating
|
||||
// poor network conditions, expect about avg_timeout_ms to be around 15
|
||||
// in tests
|
||||
if let Some(avg_timeout_ms) = avg_timeout_ms {
|
||||
let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
|
||||
let u = rand::random::<f32>();
|
||||
((1.0 - u).ln() / (-avg)) as u64
|
||||
} else {
|
||||
default_timeout_ms
|
||||
}
|
||||
}
|
||||
});
|
||||
);
|
||||
default_timeout_ms
|
||||
})();
|
||||
|
||||
|
||||
@@ -5184,7 +5184,9 @@ impl Timeline {
|
||||
*self.applied_gc_cutoff_lsn.read(),
|
||||
);
|
||||
|
||||
neon_failpoint::fail_point_sync!("checkpoint-before-saving-metadata", |x: Option<String>| bail!(
|
||||
neon_failpoint::fail_point_sync!("checkpoint-before-saving-metadata", |x: Option<
|
||||
String,
|
||||
>| bail!(
|
||||
"{}",
|
||||
x.unwrap()
|
||||
));
|
||||
|
||||
@@ -25,6 +25,7 @@ use futures::stream::FuturesUnordered;
|
||||
use http_utils::error::ApiError;
|
||||
use hyper::Uri;
|
||||
use itertools::Itertools;
|
||||
use neon_failpoint as fail;
|
||||
use pageserver_api::config::PostHogConfig;
|
||||
use pageserver_api::controller_api::{
|
||||
AvailabilityZone, MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability,
|
||||
@@ -66,7 +67,6 @@ use utils::lsn::Lsn;
|
||||
use utils::shard::ShardIndex;
|
||||
use utils::sync::gate::{Gate, GateGuard};
|
||||
use utils::{failpoint_support, pausable_failpoint};
|
||||
use neon_failpoint as fail;
|
||||
|
||||
use crate::background_node_operations::{
|
||||
Delete, Drain, Fill, MAX_RECONCILES_PER_OPERATION, Operation, OperationError, OperationHandler,
|
||||
|
||||
Reference in New Issue
Block a user