diff --git a/control_plane/attachment_service/src/reconciler.rs b/control_plane/attachment_service/src/reconciler.rs
index 32d2cb2643..a62357f9ac 100644
--- a/control_plane/attachment_service/src/reconciler.rs
+++ b/control_plane/attachment_service/src/reconciler.rs
@@ -118,6 +118,15 @@ impl Reconciler {
         flush_ms: Option<Duration>,
         lazy: bool,
     ) -> Result<(), ReconcileError> {
+        if !node.is_available() && config.mode == LocationConfigMode::Detached {
+            // Attempts to detach from offline nodes may be imitated without doing I/O: a node which is offline
+            // will get fully reconciled wrt the shard's intent state when it is reactivated, irrespective of
+            // what we put into `observed`, in [`crate::service::Service::node_activate_reconcile`]
+            tracing::info!("Node {node} is unavailable during detach: proceeding anyway, it will be detached on next activation");
+            self.observed.locations.remove(&node.get_id());
+            return Ok(());
+        }
+
         self.observed
             .locations
             .insert(node.get_id(), ObservedStateLocation { conf: None });
@@ -150,9 +159,16 @@ impl Reconciler {
         };
         tracing::info!("location_config({node}) complete: {:?}", config);
 
-        self.observed
-            .locations
-            .insert(node.get_id(), ObservedStateLocation { conf: Some(config) });
+        match config.mode {
+            LocationConfigMode::Detached => {
+                self.observed.locations.remove(&node.get_id());
+            }
+            _ => {
+                self.observed
+                    .locations
+                    .insert(node.get_id(), ObservedStateLocation { conf: Some(config) });
+            }
+        }
 
         Ok(())
     }
diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs
index 98377cace6..c886afaf1c 100644
--- a/control_plane/attachment_service/src/service.rs
+++ b/control_plane/attachment_service/src/service.rs
@@ -210,6 +210,7 @@ struct ShardSplitParams {
     new_stripe_size: Option<ShardStripeSize>,
     targets: Vec<ShardSplitTarget>,
     policy: PlacementPolicy,
+    config: TenantConfig,
     shard_ident: ShardIdentity,
 }
 
@@ -2741,7 +2742,7 @@ impl Service {
         let detach_locations: Vec<(Node, TenantShardId)> = {
             let mut detach_locations = Vec::new();
             let mut locked = self.inner.write().unwrap();
-            let (nodes, tenants, _scheduler) = locked.parts_mut();
+            let (nodes, tenants, scheduler) = locked.parts_mut();
 
             for (tenant_shard_id, shard) in
                 tenants.range_mut(TenantShardId::tenant_range(op.tenant_id))
@@ -2774,6 +2775,13 @@ impl Service {
                     tracing::info!("Restoring parent shard {tenant_shard_id}");
                     shard.splitting = SplitState::Idle;
+                    if let Err(e) = shard.schedule(scheduler) {
+                        // If this shard can't be scheduled now (perhaps due to offline nodes or
+                        // capacity issues), that must not prevent us rolling back a split. In this
+                        // case it should be eventually scheduled in the background.
+                        tracing::warn!("Failed to schedule {tenant_shard_id} during shard abort: {e}")
+                    }
+
                     self.maybe_reconcile_shard(shard, nodes);
                 }
 
@@ -2865,7 +2873,7 @@ impl Service {
             .map(|(shard_id, _)| *shard_id)
             .collect::<Vec<_>>();
 
-        let (_nodes, tenants, scheduler) = locked.parts_mut();
+        let (nodes, tenants, scheduler) = locked.parts_mut();
         for parent_id in parent_ids {
             let child_ids = parent_id.split(new_shard_count);
 
@@ -2932,6 +2940,8 @@ impl Service {
                         // find a secondary (e.g. because cluster is overloaded).
tracing::warn!("Failed to schedule child shard {child}: {e}"); } + // In the background, attach secondary locations for the new shards + self.maybe_reconcile_shard(&mut child_state, nodes); tenants.insert(child, child_state); response.new_shards.push(child); @@ -2996,6 +3006,7 @@ impl Service { ))); let mut policy = None; + let mut config = None; let mut shard_ident = None; // Validate input, and calculate which shards we will create let (old_shard_count, targets) = @@ -3052,6 +3063,9 @@ impl Service { if shard_ident.is_none() { shard_ident = Some(shard.shard); } + if config.is_none() { + config = Some(shard.config.clone()); + } if tenant_shard_id.shard_count.count() == split_req.new_shard_count { tracing::info!( @@ -3070,8 +3084,6 @@ impl Service { .get(&node_id) .expect("Pageservers may not be deleted while referenced"); - // TODO: if any reconciliation is currently in progress for this shard, wait for it. - targets.push(ShardSplitTarget { parent_id: *tenant_shard_id, node: node.clone(), @@ -3114,6 +3126,7 @@ impl Service { shard_ident.unwrap() }; let policy = policy.unwrap(); + let config = config.unwrap(); Ok(ShardSplitAction::Split(ShardSplitParams { old_shard_count, @@ -3121,6 +3134,7 @@ impl Service { new_stripe_size: split_req.new_stripe_size, targets, policy, + config, shard_ident, })) } @@ -3140,11 +3154,49 @@ impl Service { old_shard_count, new_shard_count, new_stripe_size, - targets, + mut targets, policy, + config, shard_ident, } = params; + // Drop any secondary locations: pageservers do not support splitting these, and in any case the + // end-state for a split tenant will usually be to have secondary locations on different nodes. + // The reconciliation calls in this block also implicitly cancel+barrier wrt any ongoing reconciliation + // at the time of split. + let waiters = { + let mut locked = self.inner.write().unwrap(); + let mut waiters = Vec::new(); + let (nodes, tenants, scheduler) = locked.parts_mut(); + for target in &mut targets { + let Some(shard) = tenants.get_mut(&target.parent_id) else { + // Paranoia check: this shouldn't happen: we have the oplock for this tenant ID. + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "Shard {} not found", + target.parent_id + ))); + }; + + if shard.intent.get_attached() != &Some(target.node.get_id()) { + // Paranoia check: this shouldn't happen: we have the oplock for this tenant ID. + return Err(ApiError::Conflict(format!( + "Shard {} unexpectedly rescheduled during split", + target.parent_id + ))); + } + + // Irrespective of PlacementPolicy, clear secondary locations from intent + shard.intent.clear_secondary(scheduler); + + // Run Reconciler to execute detach fo secondary locations. + if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) { + waiters.push(waiter); + } + } + waiters + }; + self.await_waiters(waiters, RECONCILE_TIMEOUT).await?; + // Before creating any new child shards in memory or on the pageservers, persist them: this // enables us to ensure that we will always be able to clean up if something goes wrong. 
         // acts as the protection against two concurrent attempts to split: one of them will get a database
@@ -3173,8 +3225,7 @@ impl Service {
                 generation: None,
                 generation_pageserver: Some(target.node.get_id().0 as i64),
                 placement_policy: serde_json::to_string(&policy).unwrap(),
-                // TODO: get the config out of the map
-                config: serde_json::to_string(&TenantConfig::default()).unwrap(),
+                config: serde_json::to_string(&config).unwrap(),
                 splitting: SplitState::Splitting,
             });
         }
@@ -3363,6 +3414,11 @@ impl Service {
         // If we were already attached to something, demote that to a secondary
         if let Some(old_attached) = old_attached {
             if n > 0 {
+                // Remove other secondaries to make room for the location we'll demote
+                while shard.intent.get_secondary().len() >= n {
+                    shard.intent.pop_secondary(scheduler);
+                }
+
                 shard.intent.push_secondary(scheduler, old_attached);
             }
         }
diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs
index 82af7ed83b..40f19e3b05 100644
--- a/pageserver/src/tenant/secondary/downloader.rs
+++ b/pageserver/src/tenant/secondary/downloader.rs
@@ -11,6 +11,7 @@ use crate::{
     disk_usage_eviction_task::{
         finite_f32, DiskUsageEvictionInfo, EvictionCandidate, EvictionLayer, EvictionSecondaryLayer,
     },
+    is_temporary,
     metrics::SECONDARY_MODE,
     tenant::{
         config::SecondaryLocationConfig,
@@ -961,7 +962,10 @@ async fn init_timeline_state(
             // Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
             warn!(path=?dentry.path(), "found legacy metadata file, these should have been removed in load_tenant_config");
             continue;
-        } else if crate::is_temporary(&file_path) || is_temp_download_file(&file_path) {
+        } else if crate::is_temporary(&file_path)
+            || is_temp_download_file(&file_path)
+            || is_temporary(&file_path)
+        {
             // Temporary files are frequently left behind from restarting during downloads
             tracing::info!("Cleaning up temporary file {file_path}");
             if let Err(e) = tokio::fs::remove_file(&file_path)
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 1d30c45278..f8994a8dcc 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -2150,6 +2150,18 @@ class NeonStorageController(MetricsGetter):
         shards: list[dict[str, Any]] = body["shards"]
         return shards
 
+    def tenant_describe(self, tenant_id: TenantId):
+        """
+        :return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr": str, "listen_http_port": int}
+        """
+        response = self.request(
+            "GET",
+            f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}",
+            headers=self.headers(TokenScope.ADMIN),
+        )
+        response.raise_for_status()
+        return response.json()
+
     def tenant_shard_split(
         self, tenant_id: TenantId, shard_count: int, shard_stripe_size: Optional[int] = None
     ) -> list[TenantShardId]:
diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py
index 57b2b2b0a1..e6318aff68 100644
--- a/test_runner/regress/test_sharding.py
+++ b/test_runner/regress/test_sharding.py
@@ -1,5 +1,6 @@
 import os
 import time
+from collections import defaultdict
 from typing import Dict, List, Optional, Union
 
 import pytest
@@ -13,7 +14,7 @@ from fixtures.neon_fixtures import (
     tenant_get_shards,
 )
 from fixtures.remote_storage import s3_storage
-from fixtures.types import Lsn, TenantShardId, TimelineId
+from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId
 from fixtures.utils import wait_until
 from fixtures.workload import Workload
 from pytest_httpserver import HTTPServer
@@ -159,11 +160,20 @@ def test_sharding_split_smoke(
 
     neon_env_builder.preserve_database_files = True
 
-    env = neon_env_builder.init_start(
-        initial_tenant_shard_count=shard_count, initial_tenant_shard_stripe_size=stripe_size
+    non_default_tenant_config = {"gc_horizon": 77 * 1024 * 1024}
+
+    env = neon_env_builder.init_configs(True)
+    neon_env_builder.start()
+    tenant_id = TenantId.generate()
+    timeline_id = TimelineId.generate()
+    env.neon_cli.create_tenant(
+        tenant_id,
+        timeline_id,
+        shard_count=shard_count,
+        shard_stripe_size=stripe_size,
+        placement_policy='{"Attached": 1}',
+        conf=non_default_tenant_config,
     )
-    tenant_id = env.initial_tenant
-    timeline_id = env.initial_timeline
 
     workload = Workload(env, tenant_id, timeline_id, branch_name="main")
     workload.init()
@@ -223,6 +233,14 @@ def test_sharding_split_smoke(
     # Before split, old shards exist
     assert shards_on_disk(old_shard_ids)
 
+    # Before split, we have done one reconcile for each shard
+    assert (
+        env.storage_controller.get_metric_value(
+            "storage_controller_reconcile_complete_total", filter={"status": "ok"}
+        )
+        == shard_count
+    )
+
     env.storage_controller.tenant_shard_split(tenant_id, shard_count=split_shard_count)
 
     post_split_pageserver_ids = [loc["node_id"] for loc in env.storage_controller.locate(tenant_id)]
@@ -268,13 +286,20 @@ def test_sharding_split_smoke(
 
     workload.validate()
 
-    # Check that we didn't do any spurious reconciliations.
-    # Total number of reconciles should have been one per original shard, plus
-    # one for each shard that was migrated.
+    # Assert on how many reconciles happened during the process. This is something of an
+    # implementation detail, but it is useful to detect any bugs that might generate spurious
+    # extra reconcile iterations.
+    #
+    # We'll have:
+    # - shard_count reconciles for the original setup of the tenant
+    # - shard_count reconciles for detaching the original secondary locations during split
+    # - split_shard_count reconciles during shard splitting, for setting up secondaries.
+    # - shard_count reconciles for the migrations we did to move child shards away from their split location
+    expect_reconciles = shard_count * 2 + split_shard_count + shard_count
     reconcile_ok = env.storage_controller.get_metric_value(
         "storage_controller_reconcile_complete_total", filter={"status": "ok"}
     )
-    assert reconcile_ok == shard_count + split_shard_count // 2
+    assert reconcile_ok == expect_reconciles
 
     # Check that no cancelled or errored reconciliations occurred: this test does no
     # failure injection and should run clean.
@@ -289,14 +314,34 @@ def test_sharding_split_smoke(
 
     env.storage_controller.consistency_check()
 
-    # Validate pageserver state
-    shards_exist: list[TenantShardId] = []
-    for pageserver in env.pageservers:
-        locations = pageserver.http_client().tenant_list_locations()
-        shards_exist.extend(TenantShardId.parse(s[0]) for s in locations["tenant_shards"])
+    def get_node_shard_counts(env: NeonEnv, tenant_ids):
+        total: defaultdict[int, int] = defaultdict(int)
+        attached: defaultdict[int, int] = defaultdict(int)
+        for tid in tenant_ids:
+            for shard in env.storage_controller.tenant_describe(tid)["shards"]:
+                log.info(
+                    f"{shard['tenant_shard_id']}: attached={shard['node_attached']}, secondary={shard['node_secondary']} "
+                )
+                for node in shard["node_secondary"]:
+                    total[int(node)] += 1
+                attached[int(shard["node_attached"])] += 1
+                total[int(shard["node_attached"])] += 1
 
-    log.info(f"Shards after split: {shards_exist}")
-    assert len(shards_exist) == split_shard_count
+        return total, attached
+
+    def check_effective_tenant_config():
+        # Expect our custom tenant configs to have survived the split
+        for shard in env.storage_controller.tenant_describe(tenant_id)["shards"]:
+            node = env.get_pageserver(int(shard["node_attached"]))
+            config = node.http_client().tenant_config(TenantShardId.parse(shard["tenant_shard_id"]))
+            for k, v in non_default_tenant_config.items():
+                assert config.effective_config[k] == v
+
+    # Validate pageserver state: expect every child shard to have an attached and secondary location
+    (total, attached) = get_node_shard_counts(env, tenant_ids=[tenant_id])
+    assert sum(attached.values()) == split_shard_count
+    assert sum(total.values()) == split_shard_count * 2
+    check_effective_tenant_config()
 
     # Ensure post-split pageserver locations survive a restart (i.e. the child shards
     # correctly wrote config to disk, and the storage controller responds correctly
@@ -305,13 +350,11 @@ def test_sharding_split_smoke(
         pageserver.stop()
         pageserver.start()
 
-    shards_exist = []
-    for pageserver in env.pageservers:
-        locations = pageserver.http_client().tenant_list_locations()
-        shards_exist.extend(TenantShardId.parse(s[0]) for s in locations["tenant_shards"])
-
-    log.info("Shards after restart: {shards_exist}")
-    assert len(shards_exist) == split_shard_count
+    # Validate pageserver state: expect every child shard to have an attached and secondary location
+    (total, attached) = get_node_shard_counts(env, tenant_ids=[tenant_id])
+    assert sum(attached.values()) == split_shard_count
+    assert sum(total.values()) == split_shard_count * 2
+    check_effective_tenant_config()
 
     workload.validate()
 
@@ -717,9 +760,16 @@ def test_sharding_split_failures(
     initial_shard_count = 2
     split_shard_count = 4
 
-    env = neon_env_builder.init_start(initial_tenant_shard_count=initial_shard_count)
-    tenant_id = env.initial_tenant
-    timeline_id = env.initial_timeline
+    env = neon_env_builder.init_configs()
+    env.start()
+
+    tenant_id = TenantId.generate()
+    timeline_id = TimelineId.generate()
+
+    # Create a tenant with secondary locations enabled
+    env.neon_cli.create_tenant(
+        tenant_id, timeline_id, shard_count=initial_shard_count, placement_policy='{"Attached":1}'
+    )
 
     env.storage_controller.allowed_errors.extend(
         [
@@ -732,6 +782,8 @@ def test_sharding_split_failures(
             ".*failpoint.*",
             # Node offline cases will fail to send requests
             ".*Reconcile error: receive body: error sending request for url.*",
+            # Node offline cases will fail inside reconciler when detaching secondaries
+            ".*Reconcile error on shard.*: receive body: error sending request for url.*",
         ]
     )
 
@@ -769,7 +821,8 @@ def test_sharding_split_failures(
     # will have succeeded: the net result should be to return to a clean state, including
     # detaching any child shards.
     def assert_rolled_back(exclude_ps_id=None) -> None:
-        count = 0
+        secondary_count = 0
+        attached_count = 0
         for ps in env.pageservers:
             if exclude_ps_id is not None and ps.id == exclude_ps_id:
                 continue
@@ -777,13 +830,25 @@ def test_sharding_split_failures(
             locations = ps.http_client().tenant_list_locations()["tenant_shards"]
             for loc in locations:
                 tenant_shard_id = TenantShardId.parse(loc[0])
-                log.info(f"Shard {tenant_shard_id} seen on node {ps.id}")
+                log.info(f"Shard {tenant_shard_id} seen on node {ps.id} in mode {loc[1]['mode']}")
                 assert tenant_shard_id.shard_count == initial_shard_count
-                count += 1
-        assert count == initial_shard_count
+                if loc[1]["mode"] == "Secondary":
+                    secondary_count += 1
+                else:
+                    attached_count += 1
+
+        if exclude_ps_id is not None:
+            # For a node failure case, we expect there to be a secondary location
+            # scheduled on the offline node, so expect one fewer secondary in total
+            assert secondary_count == initial_shard_count - 1
+        else:
+            assert secondary_count == initial_shard_count
+
+        assert attached_count == initial_shard_count
 
     def assert_split_done(exclude_ps_id=None) -> None:
-        count = 0
+        secondary_count = 0
+        attached_count = 0
         for ps in env.pageservers:
             if exclude_ps_id is not None and ps.id == exclude_ps_id:
                 continue
@@ -791,10 +856,14 @@ def test_sharding_split_failures(
             locations = ps.http_client().tenant_list_locations()["tenant_shards"]
             for loc in locations:
                 tenant_shard_id = TenantShardId.parse(loc[0])
-                log.info(f"Shard {tenant_shard_id} seen on node {ps.id}")
+                log.info(f"Shard {tenant_shard_id} seen on node {ps.id} in mode {loc[1]['mode']}")
                 assert tenant_shard_id.shard_count == split_shard_count
-                count += 1
-        assert count == split_shard_count
+                if loc[1]["mode"] == "Secondary":
+                    secondary_count += 1
+                else:
+                    attached_count += 1
+        assert attached_count == split_shard_count
+        assert secondary_count == split_shard_count
 
     def finish_split():
         # Having failed+rolled back, we should be able to split again
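
Note (not part of the patch): the sketch below is a minimal illustration of how a test might consume the new NeonStorageController.tenant_describe() helper added in neon_fixtures.py, mirroring the get_node_shard_counts() helper in test_sharding.py. The response shape (a "shards" list with "tenant_shard_id", "node_attached" and "node_secondary" fields) and the helper name summarize_shard_locations are assumptions taken from how the test uses the API, not a documented contract.

from collections import defaultdict


def summarize_shard_locations(env, tenant_id):
    # Count attached and secondary locations per pageserver node for one tenant,
    # using the storage controller's describe endpoint via the test fixture.
    attached_per_node: defaultdict[int, int] = defaultdict(int)
    secondary_per_node: defaultdict[int, int] = defaultdict(int)
    for shard in env.storage_controller.tenant_describe(tenant_id)["shards"]:
        attached_per_node[int(shard["node_attached"])] += 1
        for node in shard["node_secondary"]:
            secondary_per_node[int(node)] += 1
    return attached_per_node, secondary_per_node

With placement_policy='{"Attached": 1}', the smoke test above expects these totals to come out as split_shard_count attached locations and split_shard_count secondary locations once the split has completed.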