diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index f4026efbbf..da7d7e5469 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -165,6 +165,8 @@ pub struct NeonStorageControllerConf { #[serde(with = "humantime_serde")] pub long_reconcile_threshold: Option, + + pub load_safekeepers: bool, } impl NeonStorageControllerConf { @@ -188,6 +190,7 @@ impl Default for NeonStorageControllerConf { max_secondary_lag_bytes: None, heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL, long_reconcile_threshold: None, + load_safekeepers: true, } } } diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index 16e12f4e02..77a9075aa7 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -537,6 +537,10 @@ impl StorageController { args.push("--start-as-candidate".to_string()); } + if self.config.load_safekeepers { + args.push("--load-safekeepers".to_string()); + } + if let Some(private_key) = &self.private_key { let claims = Claims::new(None, Scope::PageServerApi); let jwt_token = diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index c90f81889b..589ac5ae88 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -473,21 +473,15 @@ async fn wait_for_active_tenant( } let mut update_rx = tenant.subscribe_for_state_updates(); - loop { - tokio::select! { - _ = cancel.cancelled() => return ControlFlow::Break(()), - result = update_rx.changed() => if result.is_err() { + tokio::select! { + result = update_rx.wait_for(|s| s == &TenantState::Active) => { + if result.is_err() { return ControlFlow::Break(()); } - } - - match &*update_rx.borrow() { - TenantState::Active => { - debug!("Tenant state changed to active, continuing the task loop"); - return ControlFlow::Continue(()); - } - state => debug!("Not running the task loop, tenant is not active: {state:?}"), - } + debug!("Tenant state changed to active, continuing the task loop"); + ControlFlow::Continue(()) + }, + _ = cancel.cancelled() => ControlFlow::Break(()), } } diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index 04dd3bb3f6..380ffeb9b7 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -138,6 +138,10 @@ struct Cli { // Flag to use https for requests to pageserver API. #[arg(long, default_value = "false")] use_https_pageserver_api: bool, + + /// Whether to load safekeeprs from the database and heartbeat them + #[arg(long, default_value = "false")] + load_safekeepers: bool, } enum StrictMode { @@ -350,6 +354,7 @@ async fn async_main() -> anyhow::Result<()> { start_as_candidate: args.start_as_candidate, http_service_port: args.listen.port() as i32, use_https_pageserver_api: args.use_https_pageserver_api, + load_safekeepers: args.load_safekeepers, }; // Validate that we can connect to the database diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 9a3e042c24..d53b3d6598 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -389,6 +389,8 @@ pub struct Config { pub long_reconcile_threshold: Duration, pub use_https_pageserver_api: bool, + + pub load_safekeepers: bool, } impl From for ApiError { @@ -1405,15 +1407,20 @@ impl Service { .set(nodes.len() as i64); tracing::info!("Loading safekeepers from database..."); - let safekeepers = persistence - .list_safekeepers() - .await? - .into_iter() - .map(|skp| Safekeeper::from_persistence(skp, CancellationToken::new())) - .collect::>(); + let safekeepers = if config.load_safekeepers { + persistence + .list_safekeepers() + .await? + .into_iter() + .map(|skp| Safekeeper::from_persistence(skp, CancellationToken::new())) + .collect::>() + } else { + tracing::info!("Skipping safekeeper loading"); + Default::default() + }; + let safekeepers: HashMap = safekeepers.into_iter().map(|n| (n.get_id(), n)).collect(); - tracing::info!("Loaded {} safekeepers from database.", safekeepers.len()); tracing::info!("Loading shards from database..."); let mut tenant_shard_persistence = persistence.load_active_tenant_shards().await?; @@ -8022,7 +8029,8 @@ impl Service { ) -> Result<(), DatabaseError> { let node_id = NodeId(record.id as u64); self.persistence.safekeeper_upsert(record.clone()).await?; - { + + if self.config.load_safekeepers { let mut locked = self.inner.write().unwrap(); let mut safekeepers = (*locked.safekeepers).clone(); match safekeepers.entry(node_id) { @@ -8054,7 +8062,7 @@ impl Service { .await?; let node_id = NodeId(id as u64); // After the change has been persisted successfully, update the in-memory state - { + if self.config.load_safekeepers { let mut locked = self.inner.write().unwrap(); let mut safekeepers = (*locked.safekeepers).clone(); let sk = safekeepers diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 5159ad4e3b..a8aad03459 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1128,6 +1128,13 @@ class NeonEnv: if self.storage_controller_config is not None: cfg["storage_controller"] = self.storage_controller_config + # Disable new storcon flag in compat tests + if config.test_may_use_compatibility_snapshot_binaries: + if "storage_controller" in cfg: + cfg["storage_controller"]["load_safekeepers"] = False + else: + cfg["storage_controller"] = {"load_safekeepers": False} + # Create config for pageserver http_auth_type = "NeonJWT" if config.auth_enabled else "Trust" pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"