pageserver: don't delete parent shard files until split is committed (#12146)

## Problem

If a shard split fails and must be rolled back, the tenant may hit a cold
start, because the parent shard's files have already been removed from local
disk.

External contribution with minor adjustments; see
https://neondb.slack.com/archives/C08TE3203RQ/p1748246398269309.

## Summary of changes

Keep the parent shard's files on local disk until the split has been
committed, so that they are available if the split is rolled back. If
all else fails, the files will be removed on the next Pageserver
restart.
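
To make this work, cleanup has to be idempotent: detaching a shard must succeed even if its directory is already gone (for example, because the split already cleaned it up). A minimal standalone sketch of that idea, not the pageserver's actual implementation (which is in the diff below):

```rust
use std::io;
use std::path::{Path, PathBuf};

/// Rename a shard's directory to a temporary path for background deletion.
/// Returns Ok(None) if the directory no longer exists, so repeated detach
/// calls are harmless.
fn detach_shard_dir(tenant_dir: &Path) -> io::Result<Option<PathBuf>> {
    if !tenant_dir.exists() {
        // Already removed: nothing to do.
        return Ok(None);
    }
    // The real code uses a dedicated "safe rename" helper and purges the
    // temporary path asynchronously; a plain rename stands in for it here.
    let tmp = tenant_dir.with_extension("deleting.tmp");
    std::fs::rename(tenant_dir, &tmp)?;
    Ok(Some(tmp))
}
```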

This should also be fine in a mixed-version deployment:

* New storcon, old Pageserver: the Pageserver will delete the files
during the split, and storcon will log an error when the cleanup detach
fails (that cleanup step is sketched below).

* Old storcon, new Pageserver: the Pageserver will leave the parent's
files around until the next Pageserver restart.
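
Both cases come down to when the cleanup detach runs relative to the commit. A rough, self-contained sketch of that ordering, with all names invented for illustration (the real logic is in the storage controller diff below):

```rust
/// Placeholder type, invented for illustration only.
struct ParentShard {
    shard_id: String,
    node: String,
}

/// Stand-in for the detach call to the pageserver, which now also removes
/// the parent directory that was left on disk during the split.
async fn detach_parent(shard: &ParentShard) -> Result<(), String> {
    println!("detaching {} on node {}", shard.shard_id, shard.node);
    Ok(())
}

async fn finish_split(parents: Vec<ParentShard>) {
    // 1. Commit the split in memory (omitted): past this point a rollback to
    //    the parent shards is no longer possible, so their files can go.
    // 2. Best-effort cleanup: failures are only logged, since a later
    //    Pageserver restart purges any leftover directories anyway.
    for parent in &parents {
        if let Err(err) = detach_parent(parent).await {
            eprintln!("cleanup of parent shard {} failed: {err}", parent.shard_id);
        }
    }
    // 3. Notify computes about the new child shards (omitted).
}
```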

The change looks good to me, but shard splits are delicate, so I'd like
some extra eyes on this.
Author: Erik Grinaker
Date: 2025-06-06 17:55:14 +02:00 (committed by GitHub)
Parent: 6dd84041a1
Commit: 3c7235669a
5 changed files with 146 additions and 17 deletions


@@ -1671,7 +1671,12 @@ impl TenantManager {
}
}
// Phase 5: Shut down the parent shard, and erase it from disk
// Phase 5: Shut down the parent shard. We leave it on disk in case the split fails and we
// have to roll back to the parent shard, avoiding a cold start. It will be cleaned up once
// the storage controller commits the split, or if all else fails, on the next restart.
//
// TODO: We don't flush the ephemeral layer here, because the split is likely to succeed and
// catching up the parent should be reasonably quick. Consider using FreezeAndFlush instead.
let (_guard, progress) = completion::channel();
match parent.shutdown(progress, ShutdownMode::Hard).await {
Ok(()) => {}
@@ -1679,11 +1684,6 @@ impl TenantManager {
other.wait().await;
}
}
let local_tenant_directory = self.conf.tenant_path(&tenant_shard_id);
let tmp_path = safe_rename_tenant_dir(&local_tenant_directory)
.await
.with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))?;
self.background_purges.spawn(tmp_path);
fail::fail_point!("shard-split-pre-finish", |_| Err(anyhow::anyhow!(
"failpoint"
@@ -1846,42 +1846,70 @@ impl TenantManager {
shutdown_all_tenants0(self.tenants).await
}
/// Detaches a tenant, and removes its local files asynchronously.
///
/// File removal is idempotent: even if the tenant has already been removed, this will still
/// remove any local files. This is used during shard splits, where we leave the parent shard's
/// files around in case we have to roll back the split.
pub(crate) async fn detach_tenant(
&self,
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
deletion_queue_client: &DeletionQueueClient,
) -> Result<(), TenantStateError> {
let tmp_path = self
if let Some(tmp_path) = self
.detach_tenant0(conf, tenant_shard_id, deletion_queue_client)
.await?;
self.background_purges.spawn(tmp_path);
.await?
{
self.background_purges.spawn(tmp_path);
}
Ok(())
}
/// Detaches a tenant. This renames the tenant directory to a temporary path and returns it,
/// allowing the caller to delete it asynchronously. Returns None if the dir is already removed.
async fn detach_tenant0(
&self,
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
deletion_queue_client: &DeletionQueueClient,
) -> Result<Utf8PathBuf, TenantStateError> {
) -> Result<Option<Utf8PathBuf>, TenantStateError> {
let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
if !tokio::fs::try_exists(&local_tenant_directory).await? {
// If the tenant directory doesn't exist, it's already cleaned up.
return Ok(None);
}
safe_rename_tenant_dir(&local_tenant_directory)
.await
.with_context(|| {
format!("local tenant directory {local_tenant_directory:?} rename")
})
.map(Some)
};
let removal_result = remove_tenant_from_memory(
let mut removal_result = remove_tenant_from_memory(
self.tenants,
tenant_shard_id,
tenant_dir_rename_operation(tenant_shard_id),
)
.await;
// If the tenant was not found, it was likely already removed. Attempt to remove the tenant
// directory on disk anyway. For example, during shard splits, we shut down and remove the
// parent shard, but leave its directory on disk in case we have to roll back the split.
//
// TODO: it would be better to leave the parent shard attached until the split is committed.
// This will be needed by the gRPC page service too, such that a compute can continue to
// read from the parent shard until it's notified about the new child shards. See:
// <https://github.com/neondatabase/neon/issues/11728>.
if let Err(TenantStateError::SlotError(TenantSlotError::NotFound(_))) = removal_result {
removal_result = tenant_dir_rename_operation(tenant_shard_id)
.await
.map_err(TenantStateError::Other);
}
// Flush pending deletions, so that they have a good chance of passing validation
// before this tenant is potentially re-attached elsewhere.
deletion_queue_client.flush_advisory();


@@ -1055,8 +1055,8 @@ pub(crate) enum WaitLsnWaiter<'a> {
/// Argument to [`Timeline::shutdown`].
#[derive(Debug, Clone, Copy)]
pub(crate) enum ShutdownMode {
/// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
/// also to remote storage. This method can easily take multiple seconds for a busy timeline.
/// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk. This method can
/// take multiple seconds for a busy timeline.
///
/// While we are flushing, we continue to accept read I/O for LSNs ingested before
/// the call to [`Timeline::shutdown`].


@@ -1108,7 +1108,8 @@ impl Service {
observed
}
/// Used during [`Self::startup_reconcile`]: detach a list of unknown-to-us tenants from pageservers.
/// Used during [`Self::startup_reconcile`] and shard splits: detach a list of unknown-to-us
/// tenants from pageservers.
///
/// This is safe to run in the background, because if we don't have this TenantShardId in our map of
/// tenants, then it is probably something incompletely deleted before: we will not fight with any
@@ -6211,7 +6212,11 @@ impl Service {
}
}
pausable_failpoint!("shard-split-pre-complete");
fail::fail_point!("shard-split-pre-complete", |_| Err(ApiError::Conflict(
"failpoint".to_string()
)));
pausable_failpoint!("shard-split-pre-complete-pause");
// TODO: if the pageserver restarted concurrently with our split API call,
// the actual generation of the child shard might differ from the generation
@@ -6233,6 +6238,15 @@ impl Service {
let (response, child_locations, waiters) =
self.tenant_shard_split_commit_inmem(tenant_id, new_shard_count, new_stripe_size);
// Notify all page servers to detach and clean up the old shards because they will no longer
// be needed. This is best-effort: if it fails, it will be cleaned up on a subsequent
// Pageserver re-attach/startup.
let shards_to_cleanup = targets
.iter()
.map(|target| (target.parent_id, target.node.get_id()))
.collect();
self.cleanup_locations(shards_to_cleanup).await;
// Send compute notifications for all the new shards
let mut failed_notifications = Vec::new();
for (child_id, child_ps, stripe_size) in child_locations {


@@ -1836,3 +1836,90 @@ def test_sharding_gc(
shard_gc_cutoff_lsn = Lsn(shard_index["metadata_bytes"]["latest_gc_cutoff_lsn"])
log.info(f"Shard {shard_number} cutoff LSN: {shard_gc_cutoff_lsn}")
assert shard_gc_cutoff_lsn == shard_0_gc_cutoff_lsn
def test_split_ps_delete_old_shard_after_commit(neon_env_builder: NeonEnvBuilder):
"""
Check that PageServer only deletes old shards after the split is committed such that it doesn't
have to download a lot of files during abort.
"""
DBNAME = "regression"
init_shard_count = 4
neon_env_builder.num_pageservers = init_shard_count
stripe_size = 32
env = neon_env_builder.init_start(
initial_tenant_shard_count=init_shard_count, initial_tenant_shard_stripe_size=stripe_size
)
env.storage_controller.allowed_errors.extend(
[
# All split failures log a warning when they enqueue the abort operation
".*Enqueuing background abort.*",
# Tolerate any error logs that mention a failpoint
".*failpoint.*",
]
)
endpoint = env.endpoints.create("main")
endpoint.respec(skip_pg_catalog_updates=False)
endpoint.start()
# Write some initial data.
endpoint.safe_psql(f"CREATE DATABASE {DBNAME}")
endpoint.safe_psql("CREATE TABLE usertable ( YCSB_KEY INT, FIELD0 TEXT);")
for _ in range(1000):
endpoint.safe_psql(
"INSERT INTO usertable SELECT random(), repeat('a', 1000);", log_query=False
)
# Record how many bytes we've downloaded before the split.
def collect_downloaded_bytes() -> list[float | None]:
downloaded_bytes = []
for page_server in env.pageservers:
metric = page_server.http_client().get_metric_value(
"pageserver_remote_ondemand_downloaded_bytes_total"
)
downloaded_bytes.append(metric)
return downloaded_bytes
downloaded_bytes_before = collect_downloaded_bytes()
# Attempt to split the tenant, but fail the split before it completes.
env.storage_controller.configure_failpoints(("shard-split-pre-complete", "return(1)"))
with pytest.raises(StorageControllerApiException):
env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=16)
# Wait until split is aborted.
def check_split_is_aborted():
tenants = env.storage_controller.tenant_list()
assert len(tenants) == 1
shards = tenants[0]["shards"]
assert len(shards) == 4
for shard in shards:
assert not shard["is_splitting"]
assert not shard["is_reconciling"]
# Make sure all new shards have been deleted.
valid_shards = 0
for ps in env.pageservers:
for tenant_dir in os.listdir(ps.workdir / "tenants"):
try:
tenant_shard_id = TenantShardId.parse(tenant_dir)
valid_shards += 1
assert tenant_shard_id.shard_count == 4
except ValueError:
log.info(f"{tenant_dir} is not valid tenant shard id")
assert valid_shards >= 4
wait_until(check_split_is_aborted)
endpoint.safe_psql("SELECT count(*) from usertable;", log_query=False)
# Make sure we didn't download anything following the aborted split.
downloaded_bytes_after = collect_downloaded_bytes()
assert downloaded_bytes_before == downloaded_bytes_after
endpoint.stop_and_destroy()


@@ -2956,7 +2956,7 @@ def test_storage_controller_leadership_transfer_during_split(
env.storage_controller.allowed_errors.extend(
[".*Unexpected child shard count.*", ".*Enqueuing background abort.*"]
)
pause_failpoint = "shard-split-pre-complete"
pause_failpoint = "shard-split-pre-complete-pause"
env.storage_controller.configure_failpoints((pause_failpoint, "pause"))
split_fut = executor.submit(
@@ -3003,7 +3003,7 @@ def test_storage_controller_leadership_transfer_during_split(
env.storage_controller.request(
"PUT",
f"http://127.0.0.1:{storage_controller_1_port}/debug/v1/failpoints",
json=[{"name": "shard-split-pre-complete", "actions": "off"}],
json=[{"name": pause_failpoint, "actions": "off"}],
headers=env.storage_controller.headers(TokenScope.ADMIN),
)