diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 86aef9b42c..186e0f4cdb 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -1671,7 +1671,12 @@ impl TenantManager {
             }
         }

-        // Phase 5: Shut down the parent shard, and erase it from disk
+        // Phase 5: Shut down the parent shard. We leave it on disk in case the split fails and we
+        // have to roll back to the parent shard, avoiding a cold start. It will be cleaned up once
+        // the storage controller commits the split, or if all else fails, on the next restart.
+        //
+        // TODO: We don't flush the ephemeral layer here, because the split is likely to succeed and
+        // catching up the parent should be reasonably quick. Consider using FreezeAndFlush instead.
         let (_guard, progress) = completion::channel();
         match parent.shutdown(progress, ShutdownMode::Hard).await {
             Ok(()) => {}
@@ -1679,11 +1684,6 @@
                 other.wait().await;
             }
         }
-        let local_tenant_directory = self.conf.tenant_path(&tenant_shard_id);
-        let tmp_path = safe_rename_tenant_dir(&local_tenant_directory)
-            .await
-            .with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))?;
-        self.background_purges.spawn(tmp_path);

         fail::fail_point!("shard-split-pre-finish", |_| Err(anyhow::anyhow!(
             "failpoint"
@@ -1846,42 +1846,70 @@ impl TenantManager {
         shutdown_all_tenants0(self.tenants).await
     }

+    /// Detaches a tenant, and removes its local files asynchronously.
+    ///
+    /// File removal is idempotent: even if the tenant has already been removed, this will still
+    /// remove any local files. This is used during shard splits, where we leave the parent shard's
+    /// files around in case we have to roll back the split.
     pub(crate) async fn detach_tenant(
         &self,
         conf: &'static PageServerConf,
         tenant_shard_id: TenantShardId,
         deletion_queue_client: &DeletionQueueClient,
     ) -> Result<(), TenantStateError> {
-        let tmp_path = self
+        if let Some(tmp_path) = self
             .detach_tenant0(conf, tenant_shard_id, deletion_queue_client)
-            .await?;
-        self.background_purges.spawn(tmp_path);
+            .await?
+        {
+            self.background_purges.spawn(tmp_path);
+        }

         Ok(())
     }

+    /// Detaches a tenant. This renames the tenant directory to a temporary path and returns it,
+    /// allowing the caller to delete it asynchronously. Returns None if the directory is already removed.
     async fn detach_tenant0(
         &self,
         conf: &'static PageServerConf,
         tenant_shard_id: TenantShardId,
         deletion_queue_client: &DeletionQueueClient,
-    ) -> Result<Utf8PathBuf, TenantStateError> {
+    ) -> Result<Option<Utf8PathBuf>, TenantStateError> {
         let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
             let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
+            if !tokio::fs::try_exists(&local_tenant_directory).await? {
+                // If the tenant directory doesn't exist, it's already cleaned up.
+                return Ok(None);
+            }
             safe_rename_tenant_dir(&local_tenant_directory)
                 .await
                 .with_context(|| {
                     format!("local tenant directory {local_tenant_directory:?} rename")
                 })
+                .map(Some)
         };

-        let removal_result = remove_tenant_from_memory(
+        let mut removal_result = remove_tenant_from_memory(
             self.tenants,
             tenant_shard_id,
             tenant_dir_rename_operation(tenant_shard_id),
         )
         .await;

+        // If the tenant was not found, it was likely already removed. Attempt to remove the tenant
+        // directory on disk anyway. For example, during shard splits, we shut down and remove the
+        // parent shard, but leave its directory on disk in case we have to roll back the split.
+        //
+        // TODO: it would be better to leave the parent shard attached until the split is committed.
+        // This will be needed by the gRPC page service too, such that a compute can continue to
+        // read from the parent shard until it's notified about the new child shards. See:
+        // .
+        if let Err(TenantStateError::SlotError(TenantSlotError::NotFound(_))) = removal_result {
+            removal_result = tenant_dir_rename_operation(tenant_shard_id)
+                .await
+                .map_err(TenantStateError::Other);
+        }
+
         // Flush pending deletions, so that they have a good chance of passing validation
         // before this tenant is potentially re-attached elsewhere.
         deletion_queue_client.flush_advisory();
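Aside on the detach path above: the rename-before-purge trick is what makes file removal idempotent and crash-safe. Below is a minimal, self-contained sketch of the pattern using `tokio` and `anyhow`; the `.neon_purge` suffix and the function name are illustrative stand-ins for the real `safe_rename_tenant_dir`/`background_purges` machinery, not the actual pageserver implementation.

```rust
use std::path::{Path, PathBuf};

/// Sketch of the idempotent rename-before-purge pattern used by `detach_tenant0`.
async fn rename_for_purge(dir: &Path) -> anyhow::Result<Option<PathBuf>> {
    if !tokio::fs::try_exists(dir).await? {
        // Already cleaned up (e.g. by a committed shard split): detach is a no-op.
        return Ok(None);
    }
    let mut tmp = dir.as_os_str().to_owned();
    tmp.push(".neon_purge"); // hypothetical suffix, for illustration only
    let tmp = PathBuf::from(tmp);
    // rename() is atomic on POSIX filesystems: a crash leaves either the original
    // directory or the renamed one, never a half-deleted tree. Deleting the renamed
    // directory can then happen asynchronously, and can resume after a restart.
    tokio::fs::rename(dir, &tmp).await?;
    Ok(Some(tmp)) // the caller spawns a background purge for this path
}
```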
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 3522af2de0..0ff005fbb9 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -1055,8 +1055,8 @@ pub(crate) enum WaitLsnWaiter<'a> {
 /// Argument to [`Timeline::shutdown`].
 #[derive(Debug, Clone, Copy)]
 pub(crate) enum ShutdownMode {
-    /// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
-    /// also to remote storage. This method can easily take multiple seconds for a busy timeline.
+    /// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk. This method can
+    /// take multiple seconds for a busy timeline.
     ///
     /// While we are flushing, we continue to accept read I/O for LSNs ingested before
     /// the call to [`Timeline::shutdown`].
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index cb29993e8c..06318a01b5 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -1108,7 +1108,8 @@ impl Service {
         observed
     }

-    /// Used during [`Self::startup_reconcile`]: detach a list of unknown-to-us tenants from pageservers.
+    /// Used during [`Self::startup_reconcile`] and shard splits: detach a list of unknown-to-us
+    /// tenants from pageservers.
     ///
     /// This is safe to run in the background, because if we don't have this TenantShardId in our map of
     /// tenants, then it is probably something incompletely deleted before: we will not fight with any
@@ -6211,7 +6212,11 @@ impl Service {
             }
         }

-        pausable_failpoint!("shard-split-pre-complete");
+        fail::fail_point!("shard-split-pre-complete", |_| Err(ApiError::Conflict(
+            "failpoint".to_string()
+        )));
+
+        pausable_failpoint!("shard-split-pre-complete-pause");

         // TODO: if the pageserver restarted concurrently with our split API call,
         // the actual generation of the child shard might differ from the generation
@@ -6233,6 +6238,15 @@ impl Service {
         let (response, child_locations, waiters) =
             self.tenant_shard_split_commit_inmem(tenant_id, new_shard_count, new_stripe_size);

+        // Notify all pageservers to detach and clean up the old shards, because they will no
+        // longer be needed. This is best-effort: if it fails, it will be cleaned up on a
+        // subsequent pageserver re-attach/startup.
+        let shards_to_cleanup = targets
+            .iter()
+            .map(|target| (target.parent_id, target.node.get_id()))
+            .collect();
+        self.cleanup_locations(shards_to_cleanup).await;
+
         // Send compute notifications for all the new shards
         let mut failed_notifications = Vec::new();
         for (child_id, child_ps, stripe_size) in child_locations {
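The `cleanup_locations` call added above is deliberately best-effort. A hedged sketch of that fan-out shape follows; the generic `detach` closure and the `String` error type are stand-ins for the real controller-to-pageserver detach request, not the storage controller's actual API.

```rust
use std::fmt::Display;
use std::future::Future;

/// Best-effort cleanup fan-out: try to detach every parent shard from its node,
/// and only log failures. Dropping errors is safe here because a pageserver
/// deletes stale local shards itself on its next re-attach/startup.
async fn cleanup_best_effort<S: Display, N: Display, Fut>(
    shards: Vec<(S, N)>,
    detach: impl Fn(&S, &N) -> Fut,
) where
    Fut: Future<Output = Result<(), String>>,
{
    for (shard, node) in &shards {
        if let Err(err) = detach(shard, node).await {
            // Must not fail the already-committed split: log and move on.
            eprintln!("failed to detach parent shard {shard} from node {node}: {err}");
        }
    }
}
```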
diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py
index 4c9887fb92..522e257ea5 100644
--- a/test_runner/regress/test_sharding.py
+++ b/test_runner/regress/test_sharding.py
@@ -1836,3 +1836,90 @@ def test_sharding_gc(
     shard_gc_cutoff_lsn = Lsn(shard_index["metadata_bytes"]["latest_gc_cutoff_lsn"])
     log.info(f"Shard {shard_number} cutoff LSN: {shard_gc_cutoff_lsn}")
     assert shard_gc_cutoff_lsn == shard_0_gc_cutoff_lsn
+
+
+def test_split_ps_delete_old_shard_after_commit(neon_env_builder: NeonEnvBuilder):
+    """
+    Check that the pageserver only deletes old shards after the split is committed, such that it
+    doesn't have to re-download layer files if the split is aborted.
+    """
+    DBNAME = "regression"
+
+    init_shard_count = 4
+    neon_env_builder.num_pageservers = init_shard_count
+    stripe_size = 32
+
+    env = neon_env_builder.init_start(
+        initial_tenant_shard_count=init_shard_count, initial_tenant_shard_stripe_size=stripe_size
+    )
+
+    env.storage_controller.allowed_errors.extend(
+        [
+            # All split failures log a warning when they enqueue the abort operation
+            ".*Enqueuing background abort.*",
+            # Tolerate any error logs that mention a failpoint
+            ".*failpoint.*",
+        ]
+    )
+
+    endpoint = env.endpoints.create("main")
+    endpoint.respec(skip_pg_catalog_updates=False)
+    endpoint.start()
+
+    # Write some initial data.
+    endpoint.safe_psql(f"CREATE DATABASE {DBNAME}")
+    endpoint.safe_psql("CREATE TABLE usertable ( YCSB_KEY INT, FIELD0 TEXT);")
+
+    for _ in range(1000):
+        endpoint.safe_psql(
+            "INSERT INTO usertable SELECT random(), repeat('a', 1000);", log_query=False
+        )
+
+    # Record how many bytes we've downloaded before the split.
+    def collect_downloaded_bytes() -> list[float | None]:
+        downloaded_bytes = []
+        for page_server in env.pageservers:
+            metric = page_server.http_client().get_metric_value(
+                "pageserver_remote_ondemand_downloaded_bytes_total"
+            )
+            downloaded_bytes.append(metric)
+        return downloaded_bytes
+
+    downloaded_bytes_before = collect_downloaded_bytes()
+
+    # Attempt to split the tenant, but fail the split before it completes.
+    env.storage_controller.configure_failpoints(("shard-split-pre-complete", "return(1)"))
+    with pytest.raises(StorageControllerApiException):
+        env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=16)
+
+    # Wait until the split is aborted.
+    def check_split_is_aborted():
+        tenants = env.storage_controller.tenant_list()
+        assert len(tenants) == 1
+        shards = tenants[0]["shards"]
+        assert len(shards) == 4
+        for shard in shards:
+            assert not shard["is_splitting"]
+            assert not shard["is_reconciling"]
+
+        # Make sure all new shards have been deleted.
+        valid_shards = 0
+        for ps in env.pageservers:
+            for tenant_dir in os.listdir(ps.workdir / "tenants"):
+                try:
+                    tenant_shard_id = TenantShardId.parse(tenant_dir)
+                    valid_shards += 1
+                    assert tenant_shard_id.shard_count == 4
+                except ValueError:
+                    log.info(f"{tenant_dir} is not a valid tenant shard id")
+        assert valid_shards >= 4
+
+    wait_until(check_split_is_aborted)
+
+    endpoint.safe_psql("SELECT count(*) from usertable;", log_query=False)
+
+    # Make sure we didn't download anything following the aborted split.
+    downloaded_bytes_after = collect_downloaded_bytes()
+
+    assert downloaded_bytes_before == downloaded_bytes_after
+    endpoint.stop_and_destroy()
diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py
index 5e0dd780c3..8f3aa010e3 100644
--- a/test_runner/regress/test_storage_controller.py
+++ b/test_runner/regress/test_storage_controller.py
@@ -2956,7 +2956,7 @@ def test_storage_controller_leadership_transfer_during_split(
     env.storage_controller.allowed_errors.extend(
         [".*Unexpected child shard count.*", ".*Enqueuing background abort.*"]
     )
-    pause_failpoint = "shard-split-pre-complete"
+    pause_failpoint = "shard-split-pre-complete-pause"
     env.storage_controller.configure_failpoints((pause_failpoint, "pause"))

     split_fut = executor.submit(
@@ -3003,7 +3003,7 @@ def test_storage_controller_leadership_transfer_during_split(
     env.storage_controller.request(
         "PUT",
         f"http://127.0.0.1:{storage_controller_1_port}/debug/v1/failpoints",
-        json=[{"name": "shard-split-pre-complete", "actions": "off"}],
+        json=[{"name": pause_failpoint, "actions": "off"}],
         headers=env.storage_controller.headers(TokenScope.ADMIN),
     )
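Background on the failpoint split that both tests rely on: the single pausable failpoint became an error-returning `fail_point!` plus a separate pause point, so a test can either abort the split or freeze it. A minimal sketch of how those actions behave with the `fail` crate, assuming `fail = { version = "0.5", features = ["failpoints"] }` and `anyhow` as dependencies; the real `pausable_failpoint!` is a Neon wrapper macro that additionally avoids blocking the async executor while paused.

```rust
fn commit_split_sketch() -> anyhow::Result<()> {
    // With the action "return(1)" configured (as test_split_ps_delete_old_shard_after_commit
    // does via the storage controller API), the closure runs and we bail out with an
    // error before the split is committed.
    fail::fail_point!("shard-split-pre-complete", |_| {
        Err(anyhow::anyhow!("failpoint"))
    });

    // With the action "pause" configured, execution parks here until the action is
    // replaced, e.g. set to "off" as the leadership-transfer test does over HTTP.
    fail::fail_point!("shard-split-pre-complete-pause");

    Ok(())
}

fn main() -> anyhow::Result<()> {
    // Failpoints are inert until a scenario is set up; configure and trigger one locally.
    let _scenario = fail::FailScenario::setup();
    fail::cfg("shard-split-pre-complete", "return(1)").unwrap();
    assert!(commit_split_sketch().is_err());
    fail::cfg("shard-split-pre-complete", "off").unwrap();
    commit_split_sketch()
}
```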