diff --git a/Cargo.lock b/Cargo.lock index 2677699702..764c0fbd30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5703,6 +5703,7 @@ dependencies = [ "pageserver_client", "postgres_connection", "r2d2", + "rand 0.8.5", "reqwest 0.12.4", "routerify", "scopeguard", diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index d14b235046..ecaac04915 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -32,6 +32,7 @@ once_cell.workspace = true pageserver_api.workspace = true pageserver_client.workspace = true postgres_connection.workspace = true +rand.workspace = true reqwest = { workspace = true, features = ["stream"] } routerify.workspace = true serde.workspace = true diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index adbf5c6496..2799f21fdc 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -9,12 +9,14 @@ use std::time::Duration; use storage_controller::http::make_router; use storage_controller::metrics::preinitialize_metrics; use storage_controller::persistence::Persistence; +use storage_controller::service::chaos_injector::ChaosInjector; use storage_controller::service::{ Config, Service, MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT, }; use tokio::signal::unix::SignalKind; use tokio_util::sync::CancellationToken; +use tracing::Instrument; use utils::auth::{JwtAuth, SwappableJwtAuth}; use utils::logging::{self, LogFormat}; @@ -86,6 +88,10 @@ struct Cli { // TODO: make `cfg(feature = "testing")` #[arg(long)] neon_local_repo_dir: Option, + + /// Chaos testing + #[arg(long)] + chaos_interval: Option, } enum StrictMode { @@ -309,6 +315,22 @@ async fn async_main() -> anyhow::Result<()> { tracing::info!("Serving on {0}", args.listen); let server_task = tokio::task::spawn(server); + let chaos_task = args.chaos_interval.map(|interval| { + let service = service.clone(); + let cancel = CancellationToken::new(); + let cancel_bg = cancel.clone(); + ( + tokio::task::spawn( + async move { + let mut chaos_injector = ChaosInjector::new(service, interval.into()); + chaos_injector.run(cancel_bg).await + } + .instrument(tracing::info_span!("chaos_injector")), + ), + cancel, + ) + }); + // Wait until we receive a signal let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?; let mut sigquit = tokio::signal::unix::signal(SignalKind::quit())?; @@ -337,6 +359,12 @@ async fn async_main() -> anyhow::Result<()> { } } + // If we were injecting chaos, stop that so that we're not calling into Service while it shuts down + if let Some((chaos_jh, chaos_cancel)) = chaos_task { + chaos_cancel.cancel(); + chaos_jh.await.ok(); + } + service.shutdown().await; tracing::info!("Service shutdown complete"); diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index ea515f67da..6940bf2c64 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -84,6 +84,8 @@ use crate::{ }; use serde::{Deserialize, Serialize}; +pub mod chaos_injector; + // For operations that should be quick, like attaching a new tenant const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5); diff --git a/storage_controller/src/service/chaos_injector.rs b/storage_controller/src/service/chaos_injector.rs new file mode 100644 index 0000000000..99961d691c --- /dev/null +++ b/storage_controller/src/service/chaos_injector.rs @@ -0,0 +1,71 @@ +use std::{sync::Arc, time::Duration}; + +use rand::seq::SliceRandom; +use rand::thread_rng; +use tokio_util::sync::CancellationToken; + +use super::Service; + +pub struct ChaosInjector { + service: Arc, + interval: Duration, +} + +impl ChaosInjector { + pub fn new(service: Arc, interval: Duration) -> Self { + Self { service, interval } + } + + pub async fn run(&mut self, cancel: CancellationToken) { + let mut interval = tokio::time::interval(self.interval); + + loop { + tokio::select! { + _ = interval.tick() => {} + _ = cancel.cancelled() => { + tracing::info!("Shutting down"); + return; + } + } + + self.inject_chaos().await; + + tracing::info!("Chaos iteration..."); + } + } + + async fn inject_chaos(&mut self) { + // Pick some shards to interfere with + let batch_size = 128; + let mut inner = self.service.inner.write().unwrap(); + let (nodes, tenants, scheduler) = inner.parts_mut(); + let tenant_ids = tenants.keys().cloned().collect::>(); + let victims = tenant_ids.choose_multiple(&mut thread_rng(), batch_size); + + for victim in victims { + let shard = tenants + .get_mut(victim) + .expect("Held lock between choosing ID and this get"); + + // Pick a secondary to promote + let Some(new_location) = shard + .intent + .get_secondary() + .choose(&mut thread_rng()) + .cloned() + else { + tracing::info!("Skipping shard {victim}: no secondary location, can't migrate"); + continue; + }; + + let Some(old_location) = *shard.intent.get_attached() else { + tracing::info!("Skipping shard {victim}: currently has no attached location"); + continue; + }; + + shard.intent.demote_attached(scheduler, old_location); + shard.intent.promote_attached(scheduler, new_location); + self.service.maybe_reconcile_shard(shard, nodes); + } + } +}