diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 387fc297f0..e8abde4901 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -211,6 +211,8 @@ pub struct NeonStorageControllerConf { pub use_local_compute_notifications: bool, pub timeline_safekeeper_count: Option, + + pub kick_secondary_downloads: Option, } impl NeonStorageControllerConf { @@ -242,6 +244,7 @@ impl Default for NeonStorageControllerConf { use_https_safekeeper_api: false, use_local_compute_notifications: true, timeline_safekeeper_count: None, + kick_secondary_downloads: None, } } } diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index 95f7533057..334949924c 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -557,6 +557,10 @@ impl StorageController { args.push("--use-local-compute-notifications".to_string()); } + if let Some(value) = self.config.kick_secondary_downloads { + args.push(format!("--kick-secondary-downloads={value}")); + } + if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() { args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap())); } diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index fc0ba9f28c..ff134a4ebc 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -5,6 +5,9 @@ use std::time::Duration; use anyhow::{Context, anyhow}; use camino::Utf8PathBuf; + +#[cfg(feature = "testing")] +use clap::ArgAction; use clap::Parser; use futures::future::OptionFuture; use http_utils::tls_certs::ReloadingCertificateResolver; @@ -213,6 +216,13 @@ struct Cli { /// This option exists primarily for testing purposes. #[arg(long, default_value = "3", value_parser = clap::value_parser!(i64).range(1..))] timeline_safekeeper_count: i64, + + /// When set, actively checks and initiates heatmap downloads/uploads during reconciliation. + /// This speed up migrations by avoiding the default wait for the heatmap download interval. + /// Primarily useful for testing to reduce test execution time. + #[cfg(feature = "testing")] + #[arg(long, default_value = "true", action=ArgAction::Set)] + kick_secondary_downloads: bool, } enum StrictMode { @@ -445,6 +455,8 @@ async fn async_main() -> anyhow::Result<()> { timelines_onto_safekeepers: args.timelines_onto_safekeepers, use_local_compute_notifications: args.use_local_compute_notifications, timeline_safekeeper_count: args.timeline_safekeeper_count, + #[cfg(feature = "testing")] + kick_secondary_downloads: args.kick_secondary_downloads, }; // Validate that we can connect to the database diff --git a/storage_controller/src/reconciler.rs b/storage_controller/src/reconciler.rs index b03a6dae04..92844c9c7b 100644 --- a/storage_controller/src/reconciler.rs +++ b/storage_controller/src/reconciler.rs @@ -856,6 +856,7 @@ impl Reconciler { &self.shard, &self.config, &self.placement_policy, + self.intent.secondary.len(), ); match self.observed.locations.get(&node.get_id()) { Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => { @@ -1235,11 +1236,11 @@ pub(crate) fn attached_location_conf( shard: &ShardIdentity, config: &TenantConfig, policy: &PlacementPolicy, + secondary_count: usize, ) -> LocationConfig { let has_secondaries = match policy { - PlacementPolicy::Attached(0) | PlacementPolicy::Detached | PlacementPolicy::Secondary => { - false - } + PlacementPolicy::Detached | PlacementPolicy::Secondary => false, + PlacementPolicy::Attached(0) => secondary_count > 0, PlacementPolicy::Attached(_) => true, }; diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 6ec3963c48..0eb87ffbe3 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -470,6 +470,9 @@ pub struct Config { /// Number of safekeepers to choose for a timeline when creating it. /// Safekeepers will be choosen from different availability zones. pub timeline_safekeeper_count: i64, + + #[cfg(feature = "testing")] + pub kick_secondary_downloads: bool, } impl From for ApiError { @@ -2064,6 +2067,7 @@ impl Service { &tenant_shard.shard, &tenant_shard.config, &PlacementPolicy::Attached(0), + tenant_shard.intent.get_secondary().len(), )), }, )]); @@ -5605,7 +5609,15 @@ impl Service { for parent_id in parent_ids { let child_ids = parent_id.split(new_shard_count); - let (pageserver, generation, policy, parent_ident, config, preferred_az) = { + let ( + pageserver, + generation, + policy, + parent_ident, + config, + preferred_az, + secondary_count, + ) = { let mut old_state = tenants .remove(&parent_id) .expect("It was present, we just split it"); @@ -5625,6 +5637,7 @@ impl Service { old_state.shard, old_state.config.clone(), old_state.preferred_az().cloned(), + old_state.intent.get_secondary().len(), ) }; @@ -5646,6 +5659,7 @@ impl Service { &child_shard, &config, &policy, + secondary_count, )), }, ); @@ -8373,6 +8387,11 @@ impl Service { /// we have this helper to move things along faster. #[cfg(feature = "testing")] async fn kick_secondary_download(&self, tenant_shard_id: TenantShardId) { + if !self.config.kick_secondary_downloads { + // No-op if kick_secondary_downloads functionaliuty is not configured + return; + } + let (attached_node, secondaries) = { let locked = self.inner.read().unwrap(); let Some(shard) = locked.tenants.get(&tenant_shard_id) else { diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs index acd18734cf..789327bfaf 100644 --- a/storage_controller/src/tenant_shard.rs +++ b/storage_controller/src/tenant_shard.rs @@ -1381,8 +1381,13 @@ impl TenantShard { .generation .expect("Attempted to enter attached state without a generation"); - let wanted_conf = - attached_location_conf(generation, &self.shard, &self.config, &self.policy); + let wanted_conf = attached_location_conf( + generation, + &self.shard, + &self.config, + &self.policy, + self.intent.get_secondary().len(), + ); match self.observed.locations.get(&node_id) { Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {} Some(_) | None => { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 050d61055e..4eb85119ca 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -453,6 +453,7 @@ class NeonEnvBuilder: pageserver_get_vectored_concurrent_io: str | None = None, pageserver_tracing_config: PageserverTracingConfig | None = None, pageserver_import_config: PageserverImportConfig | None = None, + storcon_kick_secondary_downloads: bool | None = None, ): self.repo_dir = repo_dir self.rust_log_override = rust_log_override @@ -514,6 +515,8 @@ class NeonEnvBuilder: self.pageserver_tracing_config = pageserver_tracing_config self.pageserver_import_config = pageserver_import_config + self.storcon_kick_secondary_downloads = storcon_kick_secondary_downloads + self.pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None = ( pageserver_default_tenant_config_compaction_algorithm ) @@ -1221,6 +1224,14 @@ class NeonEnv: else: cfg["storage_controller"] = {"use_local_compute_notifications": False} + if config.storcon_kick_secondary_downloads is not None: + # Configure whether storage controller should actively kick off secondary downloads + if "storage_controller" not in cfg: + cfg["storage_controller"] = {} + cfg["storage_controller"]["kick_secondary_downloads"] = ( + config.storcon_kick_secondary_downloads + ) + # Create config for pageserver http_auth_type = "NeonJWT" if config.auth_enabled else "Trust" pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust" diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 74ba74645e..be7f0c8a3e 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -4434,6 +4434,53 @@ def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder, assert initial_ps.http_client().tenant_list_locations()["tenant_shards"] == [] +def test_attached_0_graceful_migration(neon_env_builder: NeonEnvBuilder): + neon_env_builder.num_pageservers = 4 + neon_env_builder.num_azs = 2 + + neon_env_builder.storcon_kick_secondary_downloads = False + + env = neon_env_builder.init_start() + + # It is default, but we want to ensure that there are no secondary locations requested + env.storage_controller.tenant_policy_update(env.initial_tenant, {"placement": {"Attached": 0}}) + env.storage_controller.reconcile_until_idle() + + desc = env.storage_controller.tenant_describe(env.initial_tenant)["shards"][0] + src_ps_id = desc["node_attached"] + src_ps = env.get_pageserver(src_ps_id) + src_az = desc["preferred_az_id"] + + # There must be no secondary locations with Attached(0) placement policy + assert len(desc["node_secondary"]) == 0 + + # Migrate tenant shard to the same AZ node + dst_ps = [ps for ps in env.pageservers if ps.id != src_ps_id and ps.az_id == src_az][0] + + env.storage_controller.tenant_shard_migrate( + TenantShardId(env.initial_tenant, 0, 0), + dst_ps.id, + config=StorageControllerMigrationConfig(prewarm=True), + ) + + def tenant_shard_migrated(): + src_locations = src_ps.http_client().tenant_list_locations()["tenant_shards"] + assert len(src_locations) == 0 + log.info(f"Tenant shard migrated from {src_ps.id}") + dst_locations = dst_ps.http_client().tenant_list_locations()["tenant_shards"] + assert len(dst_locations) == 1 + assert dst_locations[0][1]["mode"] == "AttachedSingle" + log.info(f"Tenant shard migrated to {dst_ps.id}") + + # After all we expect that tenant shard exists only on dst node. + # We wait so long because [`DEFAULT_HEATMAP_PERIOD`] and [`DEFAULT_DOWNLOAD_INTERVAL`] + # are set to 60 seconds by default. + # + # TODO: we should consider making these configurable, so the test can run faster. + wait_until(tenant_shard_migrated, timeout=180, interval=5, status_interval=10) + log.info("Tenant shard migrated successfully") + + @run_only_on_default_postgres("this is like a 'unit test' against storcon db") def test_storage_controller_migrate_with_pageserver_restart( neon_env_builder: NeonEnvBuilder, make_httpserver