storcon: Fix migration for Attached(0) tenants (#12256)

## Problem

`Attached(0)` tenant migrations can get stuck if the heatmap file has
not been uploaded.

## Summary of Changes

- Added a test to reproduce the issue.
- Introduced a `kick_secondary_downloads` config flag:
  - Enabled in testing environments.
  - Disabled in production (and in the new test).
- Updated `Attached(0)` locations to consider the number of secondaries
in their intent when deciding whether to download the heatmap.
This commit is contained in:
Aleksandr Sarantsev
2025-06-23 22:55:26 +04:00
committed by GitHub
parent 85164422d0
commit 5eecde461d
8 changed files with 108 additions and 6 deletions

View File

@@ -211,6 +211,8 @@ pub struct NeonStorageControllerConf {
pub use_local_compute_notifications: bool, pub use_local_compute_notifications: bool,
pub timeline_safekeeper_count: Option<i64>, pub timeline_safekeeper_count: Option<i64>,
pub kick_secondary_downloads: Option<bool>,
} }
impl NeonStorageControllerConf { impl NeonStorageControllerConf {
@@ -242,6 +244,7 @@ impl Default for NeonStorageControllerConf {
use_https_safekeeper_api: false, use_https_safekeeper_api: false,
use_local_compute_notifications: true, use_local_compute_notifications: true,
timeline_safekeeper_count: None, timeline_safekeeper_count: None,
kick_secondary_downloads: None,
} }
} }
} }

View File

@@ -557,6 +557,10 @@ impl StorageController {
args.push("--use-local-compute-notifications".to_string()); args.push("--use-local-compute-notifications".to_string());
} }
if let Some(value) = self.config.kick_secondary_downloads {
args.push(format!("--kick-secondary-downloads={value}"));
}
if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() { if let Some(ssl_ca_file) = self.env.ssl_ca_cert_path() {
args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap())); args.push(format!("--ssl-ca-file={}", ssl_ca_file.to_str().unwrap()));
} }

View File

@@ -5,6 +5,9 @@ use std::time::Duration;
use anyhow::{Context, anyhow}; use anyhow::{Context, anyhow};
use camino::Utf8PathBuf; use camino::Utf8PathBuf;
#[cfg(feature = "testing")]
use clap::ArgAction;
use clap::Parser; use clap::Parser;
use futures::future::OptionFuture; use futures::future::OptionFuture;
use http_utils::tls_certs::ReloadingCertificateResolver; use http_utils::tls_certs::ReloadingCertificateResolver;
@@ -213,6 +216,13 @@ struct Cli {
/// This option exists primarily for testing purposes. /// This option exists primarily for testing purposes.
#[arg(long, default_value = "3", value_parser = clap::value_parser!(i64).range(1..))] #[arg(long, default_value = "3", value_parser = clap::value_parser!(i64).range(1..))]
timeline_safekeeper_count: i64, timeline_safekeeper_count: i64,
/// When set, actively checks and initiates heatmap downloads/uploads during reconciliation.
/// This speed up migrations by avoiding the default wait for the heatmap download interval.
/// Primarily useful for testing to reduce test execution time.
#[cfg(feature = "testing")]
#[arg(long, default_value = "true", action=ArgAction::Set)]
kick_secondary_downloads: bool,
} }
enum StrictMode { enum StrictMode {
@@ -445,6 +455,8 @@ async fn async_main() -> anyhow::Result<()> {
timelines_onto_safekeepers: args.timelines_onto_safekeepers, timelines_onto_safekeepers: args.timelines_onto_safekeepers,
use_local_compute_notifications: args.use_local_compute_notifications, use_local_compute_notifications: args.use_local_compute_notifications,
timeline_safekeeper_count: args.timeline_safekeeper_count, timeline_safekeeper_count: args.timeline_safekeeper_count,
#[cfg(feature = "testing")]
kick_secondary_downloads: args.kick_secondary_downloads,
}; };
// Validate that we can connect to the database // Validate that we can connect to the database

View File

@@ -856,6 +856,7 @@ impl Reconciler {
&self.shard, &self.shard,
&self.config, &self.config,
&self.placement_policy, &self.placement_policy,
self.intent.secondary.len(),
); );
match self.observed.locations.get(&node.get_id()) { match self.observed.locations.get(&node.get_id()) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => { Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
@@ -1235,11 +1236,11 @@ pub(crate) fn attached_location_conf(
shard: &ShardIdentity, shard: &ShardIdentity,
config: &TenantConfig, config: &TenantConfig,
policy: &PlacementPolicy, policy: &PlacementPolicy,
secondary_count: usize,
) -> LocationConfig { ) -> LocationConfig {
let has_secondaries = match policy { let has_secondaries = match policy {
PlacementPolicy::Attached(0) | PlacementPolicy::Detached | PlacementPolicy::Secondary => { PlacementPolicy::Detached | PlacementPolicy::Secondary => false,
false PlacementPolicy::Attached(0) => secondary_count > 0,
}
PlacementPolicy::Attached(_) => true, PlacementPolicy::Attached(_) => true,
}; };

View File

@@ -470,6 +470,9 @@ pub struct Config {
/// Number of safekeepers to choose for a timeline when creating it. /// Number of safekeepers to choose for a timeline when creating it.
/// Safekeepers will be choosen from different availability zones. /// Safekeepers will be choosen from different availability zones.
pub timeline_safekeeper_count: i64, pub timeline_safekeeper_count: i64,
#[cfg(feature = "testing")]
pub kick_secondary_downloads: bool,
} }
impl From<DatabaseError> for ApiError { impl From<DatabaseError> for ApiError {
@@ -2064,6 +2067,7 @@ impl Service {
&tenant_shard.shard, &tenant_shard.shard,
&tenant_shard.config, &tenant_shard.config,
&PlacementPolicy::Attached(0), &PlacementPolicy::Attached(0),
tenant_shard.intent.get_secondary().len(),
)), )),
}, },
)]); )]);
@@ -5605,7 +5609,15 @@ impl Service {
for parent_id in parent_ids { for parent_id in parent_ids {
let child_ids = parent_id.split(new_shard_count); let child_ids = parent_id.split(new_shard_count);
let (pageserver, generation, policy, parent_ident, config, preferred_az) = { let (
pageserver,
generation,
policy,
parent_ident,
config,
preferred_az,
secondary_count,
) = {
let mut old_state = tenants let mut old_state = tenants
.remove(&parent_id) .remove(&parent_id)
.expect("It was present, we just split it"); .expect("It was present, we just split it");
@@ -5625,6 +5637,7 @@ impl Service {
old_state.shard, old_state.shard,
old_state.config.clone(), old_state.config.clone(),
old_state.preferred_az().cloned(), old_state.preferred_az().cloned(),
old_state.intent.get_secondary().len(),
) )
}; };
@@ -5646,6 +5659,7 @@ impl Service {
&child_shard, &child_shard,
&config, &config,
&policy, &policy,
secondary_count,
)), )),
}, },
); );
@@ -8373,6 +8387,11 @@ impl Service {
/// we have this helper to move things along faster. /// we have this helper to move things along faster.
#[cfg(feature = "testing")] #[cfg(feature = "testing")]
async fn kick_secondary_download(&self, tenant_shard_id: TenantShardId) { async fn kick_secondary_download(&self, tenant_shard_id: TenantShardId) {
if !self.config.kick_secondary_downloads {
// No-op if kick_secondary_downloads functionaliuty is not configured
return;
}
let (attached_node, secondaries) = { let (attached_node, secondaries) = {
let locked = self.inner.read().unwrap(); let locked = self.inner.read().unwrap();
let Some(shard) = locked.tenants.get(&tenant_shard_id) else { let Some(shard) = locked.tenants.get(&tenant_shard_id) else {

View File

@@ -1381,8 +1381,13 @@ impl TenantShard {
.generation .generation
.expect("Attempted to enter attached state without a generation"); .expect("Attempted to enter attached state without a generation");
let wanted_conf = let wanted_conf = attached_location_conf(
attached_location_conf(generation, &self.shard, &self.config, &self.policy); generation,
&self.shard,
&self.config,
&self.policy,
self.intent.get_secondary().len(),
);
match self.observed.locations.get(&node_id) { match self.observed.locations.get(&node_id) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {} Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
Some(_) | None => { Some(_) | None => {

View File

@@ -453,6 +453,7 @@ class NeonEnvBuilder:
pageserver_get_vectored_concurrent_io: str | None = None, pageserver_get_vectored_concurrent_io: str | None = None,
pageserver_tracing_config: PageserverTracingConfig | None = None, pageserver_tracing_config: PageserverTracingConfig | None = None,
pageserver_import_config: PageserverImportConfig | None = None, pageserver_import_config: PageserverImportConfig | None = None,
storcon_kick_secondary_downloads: bool | None = None,
): ):
self.repo_dir = repo_dir self.repo_dir = repo_dir
self.rust_log_override = rust_log_override self.rust_log_override = rust_log_override
@@ -514,6 +515,8 @@ class NeonEnvBuilder:
self.pageserver_tracing_config = pageserver_tracing_config self.pageserver_tracing_config = pageserver_tracing_config
self.pageserver_import_config = pageserver_import_config self.pageserver_import_config = pageserver_import_config
self.storcon_kick_secondary_downloads = storcon_kick_secondary_downloads
self.pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None = ( self.pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None = (
pageserver_default_tenant_config_compaction_algorithm pageserver_default_tenant_config_compaction_algorithm
) )
@@ -1221,6 +1224,14 @@ class NeonEnv:
else: else:
cfg["storage_controller"] = {"use_local_compute_notifications": False} cfg["storage_controller"] = {"use_local_compute_notifications": False}
if config.storcon_kick_secondary_downloads is not None:
# Configure whether storage controller should actively kick off secondary downloads
if "storage_controller" not in cfg:
cfg["storage_controller"] = {}
cfg["storage_controller"]["kick_secondary_downloads"] = (
config.storcon_kick_secondary_downloads
)
# Create config for pageserver # Create config for pageserver
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust" http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust" pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"

View File

@@ -4434,6 +4434,53 @@ def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder,
assert initial_ps.http_client().tenant_list_locations()["tenant_shards"] == [] assert initial_ps.http_client().tenant_list_locations()["tenant_shards"] == []
def test_attached_0_graceful_migration(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_pageservers = 4
neon_env_builder.num_azs = 2
neon_env_builder.storcon_kick_secondary_downloads = False
env = neon_env_builder.init_start()
# It is default, but we want to ensure that there are no secondary locations requested
env.storage_controller.tenant_policy_update(env.initial_tenant, {"placement": {"Attached": 0}})
env.storage_controller.reconcile_until_idle()
desc = env.storage_controller.tenant_describe(env.initial_tenant)["shards"][0]
src_ps_id = desc["node_attached"]
src_ps = env.get_pageserver(src_ps_id)
src_az = desc["preferred_az_id"]
# There must be no secondary locations with Attached(0) placement policy
assert len(desc["node_secondary"]) == 0
# Migrate tenant shard to the same AZ node
dst_ps = [ps for ps in env.pageservers if ps.id != src_ps_id and ps.az_id == src_az][0]
env.storage_controller.tenant_shard_migrate(
TenantShardId(env.initial_tenant, 0, 0),
dst_ps.id,
config=StorageControllerMigrationConfig(prewarm=True),
)
def tenant_shard_migrated():
src_locations = src_ps.http_client().tenant_list_locations()["tenant_shards"]
assert len(src_locations) == 0
log.info(f"Tenant shard migrated from {src_ps.id}")
dst_locations = dst_ps.http_client().tenant_list_locations()["tenant_shards"]
assert len(dst_locations) == 1
assert dst_locations[0][1]["mode"] == "AttachedSingle"
log.info(f"Tenant shard migrated to {dst_ps.id}")
# After all we expect that tenant shard exists only on dst node.
# We wait so long because [`DEFAULT_HEATMAP_PERIOD`] and [`DEFAULT_DOWNLOAD_INTERVAL`]
# are set to 60 seconds by default.
#
# TODO: we should consider making these configurable, so the test can run faster.
wait_until(tenant_shard_migrated, timeout=180, interval=5, status_interval=10)
log.info("Tenant shard migrated successfully")
@run_only_on_default_postgres("this is like a 'unit test' against storcon db") @run_only_on_default_postgres("this is like a 'unit test' against storcon db")
def test_storage_controller_migrate_with_pageserver_restart( def test_storage_controller_migrate_with_pageserver_restart(
neon_env_builder: NeonEnvBuilder, make_httpserver neon_env_builder: NeonEnvBuilder, make_httpserver