mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-06 21:12:55 +00:00
storage_controller: start adding chaos hooks (#7946)
Chaos injection bridges the gap between automated testing (where we do lots of different things with small, short-lived tenants), and staging (where we do many fewer things, but with larger, long-lived tenants). This PR adds a first type of chaos which isn't really very chaotic: it's live migration of tenants between healthy pageservers. This nevertheless provides continuous checks that things like clean, prompt shutdown of tenants works for realistically deployed pageservers with realistically large tenants.
This commit is contained in:
@@ -9,12 +9,14 @@ use std::time::Duration;
|
||||
use storage_controller::http::make_router;
|
||||
use storage_controller::metrics::preinitialize_metrics;
|
||||
use storage_controller::persistence::Persistence;
|
||||
use storage_controller::service::chaos_injector::ChaosInjector;
|
||||
use storage_controller::service::{
|
||||
Config, Service, MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT,
|
||||
RECONCILER_CONCURRENCY_DEFAULT,
|
||||
};
|
||||
use tokio::signal::unix::SignalKind;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Instrument;
|
||||
use utils::auth::{JwtAuth, SwappableJwtAuth};
|
||||
use utils::logging::{self, LogFormat};
|
||||
|
||||
@@ -86,6 +88,10 @@ struct Cli {
|
||||
// TODO: make `cfg(feature = "testing")`
|
||||
#[arg(long)]
|
||||
neon_local_repo_dir: Option<PathBuf>,
|
||||
|
||||
/// Chaos testing
|
||||
#[arg(long)]
|
||||
chaos_interval: Option<humantime::Duration>,
|
||||
}
|
||||
|
||||
enum StrictMode {
|
||||
@@ -309,6 +315,22 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
tracing::info!("Serving on {0}", args.listen);
|
||||
let server_task = tokio::task::spawn(server);
|
||||
|
||||
let chaos_task = args.chaos_interval.map(|interval| {
|
||||
let service = service.clone();
|
||||
let cancel = CancellationToken::new();
|
||||
let cancel_bg = cancel.clone();
|
||||
(
|
||||
tokio::task::spawn(
|
||||
async move {
|
||||
let mut chaos_injector = ChaosInjector::new(service, interval.into());
|
||||
chaos_injector.run(cancel_bg).await
|
||||
}
|
||||
.instrument(tracing::info_span!("chaos_injector")),
|
||||
),
|
||||
cancel,
|
||||
)
|
||||
});
|
||||
|
||||
// Wait until we receive a signal
|
||||
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
|
||||
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit())?;
|
||||
@@ -337,6 +359,12 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
// If we were injecting chaos, stop that so that we're not calling into Service while it shuts down
|
||||
if let Some((chaos_jh, chaos_cancel)) = chaos_task {
|
||||
chaos_cancel.cancel();
|
||||
chaos_jh.await.ok();
|
||||
}
|
||||
|
||||
service.shutdown().await;
|
||||
tracing::info!("Service shutdown complete");
|
||||
|
||||
|
||||
@@ -84,6 +84,8 @@ use crate::{
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub mod chaos_injector;
|
||||
|
||||
// For operations that should be quick, like attaching a new tenant
|
||||
const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
|
||||
71
storage_controller/src/service/chaos_injector.rs
Normal file
71
storage_controller/src/service/chaos_injector.rs
Normal file
@@ -0,0 +1,71 @@
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::thread_rng;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use super::Service;
|
||||
|
||||
pub struct ChaosInjector {
|
||||
service: Arc<Service>,
|
||||
interval: Duration,
|
||||
}
|
||||
|
||||
impl ChaosInjector {
|
||||
pub fn new(service: Arc<Service>, interval: Duration) -> Self {
|
||||
Self { service, interval }
|
||||
}
|
||||
|
||||
pub async fn run(&mut self, cancel: CancellationToken) {
|
||||
let mut interval = tokio::time::interval(self.interval);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = interval.tick() => {}
|
||||
_ = cancel.cancelled() => {
|
||||
tracing::info!("Shutting down");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
self.inject_chaos().await;
|
||||
|
||||
tracing::info!("Chaos iteration...");
|
||||
}
|
||||
}
|
||||
|
||||
async fn inject_chaos(&mut self) {
|
||||
// Pick some shards to interfere with
|
||||
let batch_size = 128;
|
||||
let mut inner = self.service.inner.write().unwrap();
|
||||
let (nodes, tenants, scheduler) = inner.parts_mut();
|
||||
let tenant_ids = tenants.keys().cloned().collect::<Vec<_>>();
|
||||
let victims = tenant_ids.choose_multiple(&mut thread_rng(), batch_size);
|
||||
|
||||
for victim in victims {
|
||||
let shard = tenants
|
||||
.get_mut(victim)
|
||||
.expect("Held lock between choosing ID and this get");
|
||||
|
||||
// Pick a secondary to promote
|
||||
let Some(new_location) = shard
|
||||
.intent
|
||||
.get_secondary()
|
||||
.choose(&mut thread_rng())
|
||||
.cloned()
|
||||
else {
|
||||
tracing::info!("Skipping shard {victim}: no secondary location, can't migrate");
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(old_location) = *shard.intent.get_attached() else {
|
||||
tracing::info!("Skipping shard {victim}: currently has no attached location");
|
||||
continue;
|
||||
};
|
||||
|
||||
shard.intent.demote_attached(scheduler, old_location);
|
||||
shard.intent.promote_attached(scheduler, new_location);
|
||||
self.service.maybe_reconcile_shard(shard, nodes);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user