mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 13:32:57 +00:00
feat(storcon): chaos injection of force exit (#10934)
## Problem

Closes https://github.com/neondatabase/cloud/issues/24485

## Summary of changes

This patch adds a new chaos injection mode to the storage controller (storcon): a forced exit. The chaos injector reads the configured crontab schedule and makes the process exit immediately at the next scheduled time.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
@@ -18,6 +18,7 @@ anyhow.workspace = true
|
||||
bytes.workspace = true
|
||||
chrono.workspace = true
|
||||
clap.workspace = true
|
||||
cron.workspace = true
|
||||
fail.workspace = true
|
||||
futures.workspace = true
|
||||
hex.workspace = true
|
||||
|
||||
@@ -115,10 +115,14 @@ struct Cli {
|
||||
#[arg(long)]
|
||||
neon_local_repo_dir: Option<PathBuf>,
|
||||
|
||||
/// Chaos testing
|
||||
/// Chaos testing: exercise tenant migrations
|
||||
#[arg(long)]
|
||||
chaos_interval: Option<humantime::Duration>,
|
||||
|
||||
/// Chaos testing: exercise an immediate exit
|
||||
#[arg(long)]
|
||||
chaos_exit_crontab: Option<cron::Schedule>,
|
||||
|
||||
// Maximum acceptable lag for the secondary location while draining
|
||||
// a pageserver
|
||||
#[arg(long)]
|
||||
@@ -382,10 +386,12 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
let service = service.clone();
|
||||
let cancel = CancellationToken::new();
|
||||
let cancel_bg = cancel.clone();
|
||||
let chaos_exit_crontab = args.chaos_exit_crontab;
|
||||
(
|
||||
tokio::task::spawn(
|
||||
async move {
|
||||
let mut chaos_injector = ChaosInjector::new(service, interval.into());
|
||||
let mut chaos_injector =
|
||||
ChaosInjector::new(service, interval.into(), chaos_exit_crontab);
|
||||
chaos_injector.run(cancel_bg).await
|
||||
}
|
||||
.instrument(tracing::info_span!("chaos_injector")),
|
||||
|
||||
@@ -16,29 +16,80 @@ use super::{Node, Scheduler, Service, TenantShard};
|
||||
/// State for the chaos-testing background task driven by `ChaosInjector::run`.
pub struct ChaosInjector {
|
||||
/// Shared handle to the `Service` (presumably the one chaos is injected
/// into; its usage is not visible in this view).
service: Arc<Service>,
|
||||
/// Period of the ticker in `run`; each tick triggers `inject_chaos`.
interval: Duration,
|
||||
/// Optional cron schedule. When set, `run` also waits for the schedule's next
/// occurrence and then calls `force_kill`.
chaos_exit_crontab: Option<cron::Schedule>,
|
||||
}
|
||||
|
||||
fn cron_to_next_duration(cron: &cron::Schedule) -> anyhow::Result<tokio::time::Sleep> {
|
||||
use chrono::Utc;
|
||||
let next = cron.upcoming(Utc).next().unwrap();
|
||||
let duration = (next - Utc::now()).to_std()?;
|
||||
Ok(tokio::time::sleep(duration))
|
||||
}
|
||||
|
||||
async fn maybe_sleep(sleep: Option<tokio::time::Sleep>) -> Option<()> {
|
||||
if let Some(sleep) = sleep {
|
||||
sleep.await;
|
||||
Some(())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl ChaosInjector {
|
||||
pub fn new(service: Arc<Service>, interval: Duration) -> Self {
|
||||
Self { service, interval }
|
||||
pub fn new(
|
||||
service: Arc<Service>,
|
||||
interval: Duration,
|
||||
chaos_exit_crontab: Option<cron::Schedule>,
|
||||
) -> Self {
|
||||
Self {
|
||||
service,
|
||||
interval,
|
||||
chaos_exit_crontab,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run(&mut self, cancel: CancellationToken) {
|
||||
let mut interval = tokio::time::interval(self.interval);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = interval.tick() => {}
|
||||
_ = cancel.cancelled() => {
|
||||
tracing::info!("Shutting down");
|
||||
return;
|
||||
let cron_interval = {
|
||||
if let Some(ref chaos_exit_crontab) = self.chaos_exit_crontab {
|
||||
match cron_to_next_duration(chaos_exit_crontab) {
|
||||
Ok(interval_exit) => Some(interval_exit),
|
||||
Err(e) => {
|
||||
tracing::error!("Error processing the cron schedule: {e}");
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
|
||||
self.inject_chaos().await;
|
||||
|
||||
tracing::info!("Chaos iteration...");
|
||||
};
|
||||
enum ChaosEvent {
|
||||
ShuffleTenant,
|
||||
ForceKill,
|
||||
}
|
||||
let chaos_type = tokio::select! {
|
||||
_ = interval.tick() => {
|
||||
ChaosEvent::ShuffleTenant
|
||||
}
|
||||
Some(_) = maybe_sleep(cron_interval) => {
|
||||
ChaosEvent::ForceKill
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
tracing::info!("Shutting down");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match chaos_type {
|
||||
ChaosEvent::ShuffleTenant => {
|
||||
self.inject_chaos().await;
|
||||
}
|
||||
ChaosEvent::ForceKill => {
|
||||
self.force_kill().await;
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!("Chaos iteration...");
|
||||
}
|
||||
|
||||
/// If a shard has a secondary and attached location, then re-assign the secondary to be
|
||||
@@ -95,6 +146,11 @@ impl ChaosInjector {
|
||||
);
|
||||
}
|
||||
|
||||
/// Chaos action: log a warning, then terminate the whole process at once
/// via `std::process::exit` with status 1. This function never returns.
async fn force_kill(&mut self) {
    // Exit status reported for a chaos-induced exit.
    const EXIT_STATUS: i32 = 1;

    tracing::warn!("Injecting chaos: force kill");
    std::process::exit(EXIT_STATUS)
}
|
||||
|
||||
async fn inject_chaos(&mut self) {
|
||||
// Pick some shards to interfere with
|
||||
let batch_size = 128;
|
||||
|
||||
Reference in New Issue
Block a user