Refactor hard linking into one spawn_blocking

This commit is contained in:
John Spray
2024-02-14 12:09:15 +00:00
parent 3d6ad0c42e
commit fcc2eb88a1

View File

@@ -1577,33 +1577,39 @@ impl TenantManager {
}
(parent_timelines, parent_layers)
};
let mut child_prefixes = Vec::new();
let mut create_dirs = Vec::new();
for child in child_shards {
let child_prefix = self.conf.tenant_path(&child);
let mut create_dirs = vec![child_prefix.clone()];
create_dirs.push(child_prefix.clone());
create_dirs.extend(
parent_timelines
.iter()
.map(|t| self.conf.timeline_path(&child, t)),
);
let layers_clone = parent_layers.clone();
let parent_path = parent_path.clone();
// Since we will do a large number of small filesystem metadata operations, batch them into
// spawn_blocking calls rather than doing each one as a tokio::fs round-trip.
let jh = tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
for dir in create_dirs {
if let Err(e) = std::fs::create_dir_all(&dir) {
// Ignore AlreadyExists errors, drop out on all other errors
match e.kind() {
std::io::ErrorKind::AlreadyExists => {}
_ => {
return Err(anyhow::anyhow!(e).context(format!("Creating {dir}")));
}
child_prefixes.push(child_prefix);
}
// Since we will do a large number of small filesystem metadata operations, batch them into
// spawn_blocking calls rather than doing each one as a tokio::fs round-trip.
let jh = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> {
for dir in create_dirs {
if let Err(e) = std::fs::create_dir_all(&dir) {
// Ignore AlreadyExists errors, drop out on all other errors
match e.kind() {
std::io::ErrorKind::AlreadyExists => {}
_ => {
return Err(anyhow::anyhow!(e).context(format!("Creating {dir}")));
}
}
}
}
for relative_layer in layers_clone {
for child_prefix in child_prefixes {
for relative_layer in &parent_layers {
let parent_path = parent_path.join(&relative_layer);
let child_path = child_prefix.join(&relative_layer);
if let Err(e) = std::fs::hard_link(&parent_path, &child_path) {
@@ -1617,32 +1623,28 @@ impl TenantManager {
}
_ => {
return Err(anyhow::anyhow!(e).context(format!(
"Hard linking {relative_layer} into {child}"
"Hard linking {relative_layer} into {child_prefix}"
)))
}
}
}
}
}
Ok(())
});
Ok(parent_layers.len())
});
match jh.await {
Ok(Ok(())) => {
tracing::info!(
"Linked {} layers into child shard {}",
parent_layers.len(),
child
);
}
Ok(Err(e)) => {
// This is an optimization, so we tolerate failure.
tracing::warn!("Error hard-linking layers, proceeding anyway: {e}")
}
Err(e) => {
// This is something totally unexpected like a panic, so bail out.
anyhow::bail!("Error joining hard linking task: {e}");
}
match jh.await {
Ok(Ok(layer_count)) => {
tracing::info!("Linked {layer_count} layers into child shards");
}
Ok(Err(e)) => {
// This is an optimization, so we tolerate failure.
tracing::warn!("Error hard-linking layers, proceeding anyway: {e}")
}
Err(e) => {
// This is something totally unexpected like a panic, so bail out.
anyhow::bail!("Error joining hard linking task: {e}");
}
}