From 5fa747e493bbbcc6878c03742c5a63622ec31165 Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 15 Feb 2024 08:21:53 +0000 Subject: [PATCH] pageserver: shard splitting refinements (parent deletion, hard linking) (#6725) ## Problem - We weren't deleting parent shard contents once the split was done - Re-downloading layers into child shards is wasteful ## Summary of changes - Hard-link layers into child chart local storage during split - Delete parent shards content at the end --------- Co-authored-by: Joonas Koivunen --- pageserver/src/tenant/mgr.rs | 154 ++++++++++++++++++++++++++- test_runner/regress/test_sharding.py | 15 +++ 2 files changed, 165 insertions(+), 4 deletions(-) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 9aee39bd35..7260080720 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -2,6 +2,7 @@ //! page server. use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf}; +use futures::stream::StreamExt; use itertools::Itertools; use pageserver_api::key::Key; use pageserver_api::models::ShardParameters; @@ -1439,8 +1440,10 @@ impl TenantManager { } }; - // TODO: hardlink layers from the parent into the child shard directories so that they don't immediately re-download - // TODO: erase the dentries from the parent + // Optimization: hardlink layers from the parent into the children, so that they don't have to + // re-download & duplicate the data referenced in their initial IndexPart + self.shard_split_hardlink(parent, child_shards.clone()) + .await?; // Take a snapshot of where the parent's WAL ingest had got to: we will wait for // child shards to reach this point. @@ -1479,10 +1482,11 @@ impl TenantManager { // Phase 4: wait for child chards WAL ingest to catch up to target LSN for child_shard_id in &child_shards { + let child_shard_id = *child_shard_id; let child_shard = { let locked = TENANTS.read().unwrap(); let peek_slot = - tenant_map_peek_slot(&locked, child_shard_id, TenantSlotPeekMode::Read)?; + tenant_map_peek_slot(&locked, &child_shard_id, TenantSlotPeekMode::Read)?; peek_slot.and_then(|s| s.get_attached()).cloned() }; if let Some(t) = child_shard { @@ -1517,7 +1521,7 @@ impl TenantManager { } } - // Phase 5: Shut down the parent shard. + // Phase 5: Shut down the parent shard, and erase it from disk let (_guard, progress) = completion::channel(); match parent.shutdown(progress, false).await { Ok(()) => {} @@ -1525,6 +1529,24 @@ impl TenantManager { other.wait().await; } } + let local_tenant_directory = self.conf.tenant_path(&tenant_shard_id); + let tmp_path = safe_rename_tenant_dir(&local_tenant_directory) + .await + .with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))?; + task_mgr::spawn( + task_mgr::BACKGROUND_RUNTIME.handle(), + TaskKind::MgmtRequest, + None, + None, + "tenant_files_delete", + false, + async move { + fs::remove_dir_all(tmp_path.as_path()) + .await + .with_context(|| format!("tenant directory {:?} deletion", tmp_path)) + }, + ); + parent_slot_guard.drop_old_value()?; // Phase 6: Release the InProgress on the parent shard @@ -1532,6 +1554,130 @@ impl TenantManager { Ok(child_shards) } + + /// Part of [`Self::shard_split`]: hard link parent shard layers into child shards, as an optimization + /// to avoid the children downloading them again. + /// + /// For each resident layer in the parent shard, we will hard link it into all of the child shards. + async fn shard_split_hardlink( + &self, + parent_shard: &Tenant, + child_shards: Vec, + ) -> anyhow::Result<()> { + debug_assert_current_span_has_tenant_id(); + + let parent_path = self.conf.tenant_path(parent_shard.get_tenant_shard_id()); + let (parent_timelines, parent_layers) = { + let mut parent_layers = Vec::new(); + let timelines = parent_shard.timelines.lock().unwrap().clone(); + let parent_timelines = timelines.keys().cloned().collect::>(); + for timeline in timelines.values() { + let timeline_layers = timeline + .layers + .read() + .await + .resident_layers() + .collect::>() + .await; + for layer in timeline_layers { + let relative_path = layer + .local_path() + .strip_prefix(&parent_path) + .context("Removing prefix from parent layer path")?; + parent_layers.push(relative_path.to_owned()); + } + } + debug_assert!( + !parent_layers.is_empty(), + "shutdown cannot empty the layermap" + ); + (parent_timelines, parent_layers) + }; + + let mut child_prefixes = Vec::new(); + let mut create_dirs = Vec::new(); + + for child in child_shards { + let child_prefix = self.conf.tenant_path(&child); + create_dirs.push(child_prefix.clone()); + create_dirs.extend( + parent_timelines + .iter() + .map(|t| self.conf.timeline_path(&child, t)), + ); + + child_prefixes.push(child_prefix); + } + + // Since we will do a large number of small filesystem metadata operations, batch them into + // spawn_blocking calls rather than doing each one as a tokio::fs round-trip. + let jh = tokio::task::spawn_blocking(move || -> anyhow::Result { + for dir in &create_dirs { + if let Err(e) = std::fs::create_dir_all(dir) { + // Ignore AlreadyExists errors, drop out on all other errors + match e.kind() { + std::io::ErrorKind::AlreadyExists => {} + _ => { + return Err(anyhow::anyhow!(e).context(format!("Creating {dir}"))); + } + } + } + } + + for child_prefix in child_prefixes { + for relative_layer in &parent_layers { + let parent_path = parent_path.join(relative_layer); + let child_path = child_prefix.join(relative_layer); + if let Err(e) = std::fs::hard_link(&parent_path, &child_path) { + match e.kind() { + std::io::ErrorKind::AlreadyExists => {} + std::io::ErrorKind::NotFound => { + tracing::info!( + "Layer {} not found during hard-linking, evicted during split?", + relative_layer + ); + } + _ => { + return Err(anyhow::anyhow!(e).context(format!( + "Hard linking {relative_layer} into {child_prefix}" + ))) + } + } + } + } + } + + // Durability is not required for correctness, but if we crashed during split and + // then came restarted with empty timeline dirs, it would be very inefficient to + // re-populate from remote storage. + for dir in create_dirs { + if let Err(e) = crashsafe::fsync(&dir) { + // Something removed a newly created timeline dir out from underneath us? Extremely + // unexpected, but not worth panic'ing over as this whole function is just an + // optimization. + tracing::warn!("Failed to fsync directory {dir}: {e}") + } + } + + Ok(parent_layers.len()) + }); + + match jh.await { + Ok(Ok(layer_count)) => { + tracing::info!(count = layer_count, "Hard linked layers into child shards"); + } + Ok(Err(e)) => { + // This is an optimization, so we tolerate failure. + tracing::warn!("Error hard-linking layers, proceeding anyway: {e}") + } + Err(e) => { + // This is something totally unexpected like a panic, so bail out. + anyhow::bail!("Error joining hard linking task: {e}"); + } + } + + Ok(()) + } } #[derive(Debug, thiserror::Error)] diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index fa40219d0e..fcf4b9f72a 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -194,6 +194,18 @@ def test_sharding_split_smoke( assert len(pre_split_pageserver_ids) == 4 + def shards_on_disk(shard_ids): + for pageserver in env.pageservers: + for shard_id in shard_ids: + if pageserver.tenant_dir(shard_id).exists(): + return True + + return False + + old_shard_ids = [TenantShardId(tenant_id, i, shard_count) for i in range(0, shard_count)] + # Before split, old shards exist + assert shards_on_disk(old_shard_ids) + env.attachment_service.tenant_shard_split(tenant_id, shard_count=split_shard_count) post_split_pageserver_ids = [loc["node_id"] for loc in env.attachment_service.locate(tenant_id)] @@ -202,6 +214,9 @@ def test_sharding_split_smoke( assert len(set(post_split_pageserver_ids)) == shard_count assert set(post_split_pageserver_ids) == set(pre_split_pageserver_ids) + # The old parent shards should no longer exist on disk + assert not shards_on_disk(old_shard_ids) + workload.validate() workload.churn_rows(256)