mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 03:50:37 +00:00
pageserver: shard splitting refinements (parent deletion, hard linking) (#6725)
## Problem - We weren't deleting parent shard contents once the split was done - Re-downloading layers into child shards is wasteful ## Summary of changes - Hard-link layers into child chart local storage during split - Delete parent shards content at the end --------- Co-authored-by: Joonas Koivunen <joonas@neon.tech>
This commit is contained in:
@@ -2,6 +2,7 @@
|
||||
//! page server.
|
||||
|
||||
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
|
||||
use futures::stream::StreamExt;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::models::ShardParameters;
|
||||
@@ -1439,8 +1440,10 @@ impl TenantManager {
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: hardlink layers from the parent into the child shard directories so that they don't immediately re-download
|
||||
// TODO: erase the dentries from the parent
|
||||
// Optimization: hardlink layers from the parent into the children, so that they don't have to
|
||||
// re-download & duplicate the data referenced in their initial IndexPart
|
||||
self.shard_split_hardlink(parent, child_shards.clone())
|
||||
.await?;
|
||||
|
||||
// Take a snapshot of where the parent's WAL ingest had got to: we will wait for
|
||||
// child shards to reach this point.
|
||||
@@ -1479,10 +1482,11 @@ impl TenantManager {
|
||||
|
||||
// Phase 4: wait for child chards WAL ingest to catch up to target LSN
|
||||
for child_shard_id in &child_shards {
|
||||
let child_shard_id = *child_shard_id;
|
||||
let child_shard = {
|
||||
let locked = TENANTS.read().unwrap();
|
||||
let peek_slot =
|
||||
tenant_map_peek_slot(&locked, child_shard_id, TenantSlotPeekMode::Read)?;
|
||||
tenant_map_peek_slot(&locked, &child_shard_id, TenantSlotPeekMode::Read)?;
|
||||
peek_slot.and_then(|s| s.get_attached()).cloned()
|
||||
};
|
||||
if let Some(t) = child_shard {
|
||||
@@ -1517,7 +1521,7 @@ impl TenantManager {
|
||||
}
|
||||
}
|
||||
|
||||
// Phase 5: Shut down the parent shard.
|
||||
// Phase 5: Shut down the parent shard, and erase it from disk
|
||||
let (_guard, progress) = completion::channel();
|
||||
match parent.shutdown(progress, false).await {
|
||||
Ok(()) => {}
|
||||
@@ -1525,6 +1529,24 @@ impl TenantManager {
|
||||
other.wait().await;
|
||||
}
|
||||
}
|
||||
let local_tenant_directory = self.conf.tenant_path(&tenant_shard_id);
|
||||
let tmp_path = safe_rename_tenant_dir(&local_tenant_directory)
|
||||
.await
|
||||
.with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))?;
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::MgmtRequest,
|
||||
None,
|
||||
None,
|
||||
"tenant_files_delete",
|
||||
false,
|
||||
async move {
|
||||
fs::remove_dir_all(tmp_path.as_path())
|
||||
.await
|
||||
.with_context(|| format!("tenant directory {:?} deletion", tmp_path))
|
||||
},
|
||||
);
|
||||
|
||||
parent_slot_guard.drop_old_value()?;
|
||||
|
||||
// Phase 6: Release the InProgress on the parent shard
|
||||
@@ -1532,6 +1554,130 @@ impl TenantManager {
|
||||
|
||||
Ok(child_shards)
|
||||
}
|
||||
|
||||
/// Part of [`Self::shard_split`]: hard link parent shard layers into child shards, as an optimization
|
||||
/// to avoid the children downloading them again.
|
||||
///
|
||||
/// For each resident layer in the parent shard, we will hard link it into all of the child shards.
|
||||
async fn shard_split_hardlink(
|
||||
&self,
|
||||
parent_shard: &Tenant,
|
||||
child_shards: Vec<TenantShardId>,
|
||||
) -> anyhow::Result<()> {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let parent_path = self.conf.tenant_path(parent_shard.get_tenant_shard_id());
|
||||
let (parent_timelines, parent_layers) = {
|
||||
let mut parent_layers = Vec::new();
|
||||
let timelines = parent_shard.timelines.lock().unwrap().clone();
|
||||
let parent_timelines = timelines.keys().cloned().collect::<Vec<_>>();
|
||||
for timeline in timelines.values() {
|
||||
let timeline_layers = timeline
|
||||
.layers
|
||||
.read()
|
||||
.await
|
||||
.resident_layers()
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
for layer in timeline_layers {
|
||||
let relative_path = layer
|
||||
.local_path()
|
||||
.strip_prefix(&parent_path)
|
||||
.context("Removing prefix from parent layer path")?;
|
||||
parent_layers.push(relative_path.to_owned());
|
||||
}
|
||||
}
|
||||
debug_assert!(
|
||||
!parent_layers.is_empty(),
|
||||
"shutdown cannot empty the layermap"
|
||||
);
|
||||
(parent_timelines, parent_layers)
|
||||
};
|
||||
|
||||
let mut child_prefixes = Vec::new();
|
||||
let mut create_dirs = Vec::new();
|
||||
|
||||
for child in child_shards {
|
||||
let child_prefix = self.conf.tenant_path(&child);
|
||||
create_dirs.push(child_prefix.clone());
|
||||
create_dirs.extend(
|
||||
parent_timelines
|
||||
.iter()
|
||||
.map(|t| self.conf.timeline_path(&child, t)),
|
||||
);
|
||||
|
||||
child_prefixes.push(child_prefix);
|
||||
}
|
||||
|
||||
// Since we will do a large number of small filesystem metadata operations, batch them into
|
||||
// spawn_blocking calls rather than doing each one as a tokio::fs round-trip.
|
||||
let jh = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> {
|
||||
for dir in &create_dirs {
|
||||
if let Err(e) = std::fs::create_dir_all(dir) {
|
||||
// Ignore AlreadyExists errors, drop out on all other errors
|
||||
match e.kind() {
|
||||
std::io::ErrorKind::AlreadyExists => {}
|
||||
_ => {
|
||||
return Err(anyhow::anyhow!(e).context(format!("Creating {dir}")));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for child_prefix in child_prefixes {
|
||||
for relative_layer in &parent_layers {
|
||||
let parent_path = parent_path.join(relative_layer);
|
||||
let child_path = child_prefix.join(relative_layer);
|
||||
if let Err(e) = std::fs::hard_link(&parent_path, &child_path) {
|
||||
match e.kind() {
|
||||
std::io::ErrorKind::AlreadyExists => {}
|
||||
std::io::ErrorKind::NotFound => {
|
||||
tracing::info!(
|
||||
"Layer {} not found during hard-linking, evicted during split?",
|
||||
relative_layer
|
||||
);
|
||||
}
|
||||
_ => {
|
||||
return Err(anyhow::anyhow!(e).context(format!(
|
||||
"Hard linking {relative_layer} into {child_prefix}"
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Durability is not required for correctness, but if we crashed during split and
|
||||
// then came restarted with empty timeline dirs, it would be very inefficient to
|
||||
// re-populate from remote storage.
|
||||
for dir in create_dirs {
|
||||
if let Err(e) = crashsafe::fsync(&dir) {
|
||||
// Something removed a newly created timeline dir out from underneath us? Extremely
|
||||
// unexpected, but not worth panic'ing over as this whole function is just an
|
||||
// optimization.
|
||||
tracing::warn!("Failed to fsync directory {dir}: {e}")
|
||||
}
|
||||
}
|
||||
|
||||
Ok(parent_layers.len())
|
||||
});
|
||||
|
||||
match jh.await {
|
||||
Ok(Ok(layer_count)) => {
|
||||
tracing::info!(count = layer_count, "Hard linked layers into child shards");
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
// This is an optimization, so we tolerate failure.
|
||||
tracing::warn!("Error hard-linking layers, proceeding anyway: {e}")
|
||||
}
|
||||
Err(e) => {
|
||||
// This is something totally unexpected like a panic, so bail out.
|
||||
anyhow::bail!("Error joining hard linking task: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
|
||||
@@ -194,6 +194,18 @@ def test_sharding_split_smoke(
|
||||
|
||||
assert len(pre_split_pageserver_ids) == 4
|
||||
|
||||
def shards_on_disk(shard_ids):
|
||||
for pageserver in env.pageservers:
|
||||
for shard_id in shard_ids:
|
||||
if pageserver.tenant_dir(shard_id).exists():
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
old_shard_ids = [TenantShardId(tenant_id, i, shard_count) for i in range(0, shard_count)]
|
||||
# Before split, old shards exist
|
||||
assert shards_on_disk(old_shard_ids)
|
||||
|
||||
env.attachment_service.tenant_shard_split(tenant_id, shard_count=split_shard_count)
|
||||
|
||||
post_split_pageserver_ids = [loc["node_id"] for loc in env.attachment_service.locate(tenant_id)]
|
||||
@@ -202,6 +214,9 @@ def test_sharding_split_smoke(
|
||||
assert len(set(post_split_pageserver_ids)) == shard_count
|
||||
assert set(post_split_pageserver_ids) == set(pre_split_pageserver_ids)
|
||||
|
||||
# The old parent shards should no longer exist on disk
|
||||
assert not shards_on_disk(old_shard_ids)
|
||||
|
||||
workload.validate()
|
||||
|
||||
workload.churn_rows(256)
|
||||
|
||||
Reference in New Issue
Block a user