s3_scrubber: add pageserver-physical-gc (#7925)

## Problem

Currently, we leave `index_part.json` objects from old generations
behind each time a pageserver restarts or a tenant is migrated. This
doesn't break anything, but it's annoying when a tenant has been around
for a long time and starts to accumulate 10s-100s of these.
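
For illustration, a long-lived timeline's prefix can end up looking something like this (made-up keys; the exact generation-suffix format is whatever `parse_remote_index_path` accepts):

```
tenants/<tenant_shard_id>/timelines/<timeline_id>/index_part.json-00000001
tenants/<tenant_shard_id>/timelines/<timeline_id>/index_part.json-00000002
tenants/<tenant_shard_id>/timelines/<timeline_id>/index_part.json-00000003
...
tenants/<tenant_shard_id>/timelines/<timeline_id>/index_part.json-0000002a   # current generation, the only one still in use
```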

Partially implements: #7043 

## Summary of changes

- Add a new `pageserver-physical-gc` command to `s3_scrubber`

The name is a bit of a mouthful, but I think it makes sense:
- GC is the accurate term for what we are doing here: removing data that
takes up storage but can never be accessed.
- "physical" is a necessary distinction from the "normal" GC that we do
online in the pageserver, which operates at a higher level in terms of
LSNs+layers, whereas this type of GC is purely about S3 objects.
- "pageserver" makes clear that this command deals exclusively with
pageserver data, not safekeeper.
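
For reference, a minimal sketch of how the new command might be invoked (the binary name and the bucket/credential configuration are assumed to follow the scrubber's existing conventions; tenant IDs are placeholders):

```
# Dry run: report what would be deleted, but delete nothing
s3_scrubber pageserver-physical-gc --min-age 14d --mode dry-run

# Delete old-generation indices, optionally scoped to specific tenant shards
s3_scrubber pageserver-physical-gc --min-age 14d --mode indices-only \
    --tenant-id <tenant_shard_id> --tenant-id <tenant_shard_id>
```

The command prints a JSON summary (`indices_deleted`, `remote_storage_errors`) on stdout, which is what the new test fixture parses.
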
Author: John Spray
Date: 2024-06-03 17:16:23 +01:00
Committed by: GitHub
Commit: 69d18d6429 (parent acf0a11fea)
9 changed files with 387 additions and 37 deletions

Cargo.lock

@@ -5129,6 +5129,7 @@ dependencies = [
"futures-util",
"hex",
"histogram",
"humantime",
"itertools",
"once_cell",
"pageserver",


@@ -11,6 +11,7 @@ either.workspace = true
tokio-rustls.workspace = true
anyhow.workspace = true
hex.workspace = true
humantime.workspace = true
thiserror.workspace = true
rand.workspace = true
bytes.workspace = true


@@ -1,7 +1,7 @@
use std::collections::{HashMap, HashSet};
use anyhow::Context;
use aws_sdk_s3::{types::ObjectIdentifier, Client};
use aws_sdk_s3::Client;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver_api::shard::ShardIndex;
use tracing::{error, info, warn};
@@ -70,7 +70,7 @@ pub(crate) fn branch_cleanup_and_check_errors(
match s3_data {
Some(s3_data) => {
result.garbage_keys.extend(s3_data.keys_to_remove);
result.garbage_keys.extend(s3_data.unknown_keys);
match s3_data.blob_data {
BlobDataParseResult::Parsed {
@@ -240,7 +240,12 @@ impl TenantObjectListing {
#[derive(Debug)]
pub(crate) struct S3TimelineBlobData {
pub(crate) blob_data: BlobDataParseResult,
pub(crate) keys_to_remove: Vec<String>,
// Index objects that were not used when loading `blob_data`, e.g. those from old generations
pub(crate) unused_index_keys: Vec<String>,
// Objects whose keys were not recognized at all, i.e. not layer files, not indices
pub(crate) unknown_keys: Vec<String>,
}
#[derive(Debug)]
@@ -276,12 +281,12 @@ pub(crate) async fn list_timeline_blobs(
let mut s3_layers = HashSet::new();
let mut errors = Vec::new();
let mut keys_to_remove = Vec::new();
let mut unknown_keys = Vec::new();
let mut timeline_dir_target = s3_root.timeline_root(&id);
timeline_dir_target.delimiter = String::new();
let mut index_parts: Vec<ObjectIdentifier> = Vec::new();
let mut index_part_keys: Vec<String> = Vec::new();
let mut initdb_archive: bool = false;
let mut stream = std::pin::pin!(stream_listing(s3_client, &timeline_dir_target));
@@ -292,16 +297,16 @@ pub(crate) async fn list_timeline_blobs(
let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket);
match blob_name {
Some(name) if name.starts_with("index_part.json") => {
tracing::info!("Index key {key}");
index_parts.push(obj)
tracing::debug!("Index key {key}");
index_part_keys.push(key.to_owned())
}
Some("initdb.tar.zst") => {
tracing::info!("initdb archive {key}");
tracing::debug!("initdb archive {key}");
initdb_archive = true;
}
Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
Ok((new_layer, gen)) => {
tracing::info!("Parsed layer key: {} {:?}", new_layer, gen);
tracing::debug!("Parsed layer key: {} {:?}", new_layer, gen);
s3_layers.insert((new_layer, gen));
}
Err(e) => {
@@ -309,37 +314,37 @@ pub(crate) async fn list_timeline_blobs(
errors.push(
format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
);
keys_to_remove.push(key.to_string());
unknown_keys.push(key.to_string());
}
},
None => {
tracing::info!("Peculiar key {}", key);
tracing::warn!("Unknown key {}", key);
errors.push(format!("S3 list response got an object with odd key {key}"));
keys_to_remove.push(key.to_string());
unknown_keys.push(key.to_string());
}
}
}
if index_parts.is_empty() && s3_layers.is_empty() && initdb_archive {
tracing::info!(
if index_part_keys.is_empty() && s3_layers.is_empty() && initdb_archive {
tracing::debug!(
"Timeline is empty apart from initdb archive: expected post-deletion state."
);
return Ok(S3TimelineBlobData {
blob_data: BlobDataParseResult::Relic,
keys_to_remove: Vec::new(),
unused_index_keys: index_part_keys,
unknown_keys: Vec::new(),
});
}
// Choose the index_part with the highest generation
let (index_part_object, index_part_generation) = match index_parts
let (index_part_object, index_part_generation) = match index_part_keys
.iter()
.filter_map(|k| {
let key = k.key();
.filter_map(|key| {
// Stripping the index key to the last part, because RemotePath doesn't
// like absolute paths, and depending on prefix_in_bucket it's possible
// for the keys we read back to start with a slash.
let basename = key.rsplit_once('/').unwrap().1;
parse_remote_index_path(RemotePath::from_string(basename).unwrap()).map(|g| (k, g))
parse_remote_index_path(RemotePath::from_string(basename).unwrap()).map(|g| (key, g))
})
.max_by_key(|i| i.1)
.map(|(k, g)| (k.clone(), g))
@@ -347,15 +352,18 @@ pub(crate) async fn list_timeline_blobs(
Some((key, gen)) => (Some(key), gen),
None => {
// Legacy/missing case: one or zero index parts, which did not have a generation
(index_parts.pop(), Generation::none())
(index_part_keys.pop(), Generation::none())
}
};
if index_part_object.is_none() {
errors.push("S3 list response got no index_part.json file".to_string());
match index_part_object.as_ref() {
Some(selected) => index_part_keys.retain(|k| k != selected),
None => {
errors.push("S3 list response got no index_part.json file".to_string());
}
}
if let Some(index_part_object_key) = index_part_object.as_ref().map(|object| object.key()) {
if let Some(index_part_object_key) = index_part_object.as_ref() {
let index_part_bytes = download_object_with_retries(
s3_client,
&timeline_dir_target.bucket_name,
@@ -372,17 +380,14 @@ pub(crate) async fn list_timeline_blobs(
index_part_generation,
s3_layers,
},
keys_to_remove,
unused_index_keys: index_part_keys,
unknown_keys,
})
}
Err(index_parse_error) => errors.push(format!(
"index_part.json body parsing error: {index_parse_error}"
)),
}
} else {
errors.push(format!(
"Index part object {index_part_object:?} has no key"
));
}
if errors.is_empty() {
@@ -393,6 +398,7 @@ pub(crate) async fn list_timeline_blobs(
Ok(S3TimelineBlobData {
blob_data: BlobDataParseResult::Incorrect(errors),
keys_to_remove,
unused_index_keys: index_part_keys,
unknown_keys,
})
}


@@ -4,6 +4,7 @@ pub mod checks;
pub mod cloud_admin_api;
pub mod garbage;
pub mod metadata_stream;
pub mod pageserver_physical_gc;
pub mod scan_pageserver_metadata;
pub mod scan_safekeeper_metadata;
pub mod tenant_snapshot;
@@ -396,7 +397,7 @@ async fn download_object_with_retries(
.await
{
Ok(bytes_read) => {
tracing::info!("Downloaded {bytes_read} bytes for object object with key {key}");
tracing::debug!("Downloaded {bytes_read} bytes for object {key}");
return Ok(body_buf);
}
Err(e) => {


@@ -2,11 +2,13 @@ use anyhow::bail;
use camino::Utf8PathBuf;
use pageserver_api::shard::TenantShardId;
use s3_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
use s3_scrubber::pageserver_physical_gc::GcMode;
use s3_scrubber::scan_pageserver_metadata::scan_metadata;
use s3_scrubber::tenant_snapshot::SnapshotDownloader;
use s3_scrubber::{
init_logging, scan_safekeeper_metadata::scan_safekeeper_metadata, BucketConfig, ConsoleConfig,
NodeKind, TraversingDepth,
init_logging, pageserver_physical_gc::pageserver_physical_gc,
scan_safekeeper_metadata::scan_safekeeper_metadata, BucketConfig, ConsoleConfig, NodeKind,
TraversingDepth,
};
use clap::{Parser, Subcommand};
@@ -62,6 +64,14 @@ enum Command {
#[arg(short, long)]
output_path: Utf8PathBuf,
},
PageserverPhysicalGc {
#[arg(long = "tenant-id", num_args = 0..)]
tenant_ids: Vec<TenantShardId>,
#[arg(long = "min-age")]
min_age: humantime::Duration,
#[arg(short, long, default_value_t = GcMode::IndicesOnly)]
mode: GcMode,
},
}
#[tokio::main]
@@ -75,6 +85,7 @@ async fn main() -> anyhow::Result<()> {
Command::FindGarbage { .. } => "find-garbage",
Command::PurgeGarbage { .. } => "purge-garbage",
Command::TenantSnapshot { .. } => "tenant-snapshot",
Command::PageserverPhysicalGc { .. } => "pageserver-physical-gc",
};
let _guard = init_logging(&format!(
"{}_{}_{}_{}.log",
@@ -178,5 +189,15 @@ async fn main() -> anyhow::Result<()> {
SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency)?;
downloader.download().await
}
Command::PageserverPhysicalGc {
tenant_ids,
min_age,
mode,
} => {
let summary =
pageserver_physical_gc(bucket_config, tenant_ids, min_age.into(), mode).await?;
println!("{}", serde_json::to_string(&summary).unwrap());
Ok(())
}
}
}


@@ -0,0 +1,239 @@
use std::time::{Duration, UNIX_EPOCH};
use crate::checks::{list_timeline_blobs, BlobDataParseResult};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
use aws_sdk_s3::Client;
use futures_util::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::parse_remote_index_path;
use pageserver::tenant::IndexPart;
use pageserver_api::shard::TenantShardId;
use remote_storage::RemotePath;
use serde::Serialize;
use tracing::{info_span, Instrument};
use utils::generation::Generation;
#[derive(Serialize, Default)]
pub struct GcSummary {
indices_deleted: usize,
remote_storage_errors: usize,
}
#[derive(clap::ValueEnum, Debug, Clone, Copy)]
pub enum GcMode {
// Delete nothing
DryRun,
// Enable only removing old-generation indices
IndicesOnly,
// Enable all forms of GC
// TODO: this will be used when shard split ancestor layer deletion is added
// All,
}
impl std::fmt::Display for GcMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
GcMode::DryRun => write!(f, "dry-run"),
GcMode::IndicesOnly => write!(f, "indices-only"),
}
}
}
async fn maybe_delete_index(
s3_client: &Client,
bucket_config: &BucketConfig,
min_age: &Duration,
latest_gen: Generation,
key: &str,
mode: GcMode,
summary: &mut GcSummary,
) {
// Validation: we will only delete things that parse cleanly
let basename = key.rsplit_once('/').unwrap().1;
let candidate_generation =
match parse_remote_index_path(RemotePath::from_string(basename).unwrap()) {
Some(g) => g,
None => {
if basename == IndexPart::FILE_NAME {
// A legacy pre-generation index
Generation::none()
} else {
// A strange key: we will not delete this because we don't understand it.
tracing::warn!("Bad index key");
return;
}
}
};
// Validation: we will only delete indices more than one generation old, to avoid interfering
// in typical migrations, even if they are very long running.
if candidate_generation >= latest_gen {
// This shouldn't happen: when we loaded metadata, it should have selected the latest
// generation already, and only populated [`S3TimelineBlobData::unused_index_keys`]
// with older generations.
tracing::warn!("Deletion candidate is >= latest generation, this is a bug!");
return;
} else if candidate_generation.next() == latest_gen {
// Skip deleting the latest-1th generation's index.
return;
}
// Validation: we will only delete indices after one week, so that during incidents we will have
// easy access to recent indices.
let age: Duration = match s3_client
.head_object()
.bucket(&bucket_config.bucket)
.key(key)
.send()
.await
{
Ok(response) => match response.last_modified {
None => {
tracing::warn!("Missing last_modified");
summary.remote_storage_errors += 1;
return;
}
Some(last_modified) => {
let last_modified =
UNIX_EPOCH + Duration::from_secs_f64(last_modified.as_secs_f64());
match last_modified.elapsed() {
Ok(e) => e,
Err(_) => {
tracing::warn!("Bad last_modified time: {last_modified:?}");
return;
}
}
}
},
Err(e) => {
tracing::warn!("Failed to HEAD {key}: {e}");
summary.remote_storage_errors += 1;
return;
}
};
if &age < min_age {
tracing::info!(
"Skipping young object {} < {}",
age.as_secs_f64(),
min_age.as_secs_f64()
);
return;
}
if matches!(mode, GcMode::DryRun) {
tracing::info!("Dry run: would delete this key");
return;
}
// All validations passed: erase the object
match s3_client
.delete_object()
.bucket(&bucket_config.bucket)
.key(key)
.send()
.await
{
Ok(_) => {
tracing::info!("Successfully deleted index");
summary.indices_deleted += 1;
}
Err(e) => {
tracing::warn!("Failed to delete index: {e}");
summary.remote_storage_errors += 1;
}
}
}
/// Physical garbage collection: removing unused S3 objects. This is distinct from the garbage collection
/// done inside the pageserver, which operates at a higher level (keys, layers). This type of garbage collection
/// is about removing:
/// - Objects that were uploaded but never referenced in the remote index (e.g. because of a shutdown between
/// uploading a layer and uploading an index)
/// - Index objects from historic generations
///
/// This type of GC is not necessary for correctness: rather it serves to reduce wasted storage capacity, and
/// make sure that object listings don't get slowed down by large numbers of garbage objects.
pub async fn pageserver_physical_gc(
bucket_config: BucketConfig,
tenant_ids: Vec<TenantShardId>,
min_age: Duration,
mode: GcMode,
) -> anyhow::Result<GcSummary> {
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
let tenants = if tenant_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&s3_client, &target))
} else {
futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
};
// How many tenants to process in parallel. We need to be mindful of pageservers
// accessing the same per tenant prefixes, so use a lower setting than pageservers.
const CONCURRENCY: usize = 32;
// Generate a stream of TenantTimelineId
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
let timelines = timelines.try_buffered(CONCURRENCY);
let timelines = timelines.try_flatten();
// Generate a stream of S3TimelineBlobData
async fn gc_timeline(
s3_client: &Client,
bucket_config: &BucketConfig,
min_age: &Duration,
target: &RootTarget,
mode: GcMode,
ttid: TenantShardTimelineId,
) -> anyhow::Result<GcSummary> {
let mut summary = GcSummary::default();
let data = list_timeline_blobs(s3_client, ttid, target).await?;
let (latest_gen, candidates) = match &data.blob_data {
BlobDataParseResult::Parsed {
index_part: _index_part,
index_part_generation,
s3_layers: _s3_layers,
} => (*index_part_generation, data.unused_index_keys),
BlobDataParseResult::Relic => {
// Post-deletion tenant location: don't try and GC it.
return Ok(summary);
}
BlobDataParseResult::Incorrect(reasons) => {
// Our primary purpose isn't to report on bad data, but log this rather than skipping silently
tracing::warn!("Skipping timeline {ttid}, bad metadata: {reasons:?}");
return Ok(summary);
}
};
for key in candidates {
maybe_delete_index(
s3_client,
bucket_config,
min_age,
latest_gen,
&key,
mode,
&mut summary,
)
.instrument(info_span!("maybe_delete_index", %ttid, ?latest_gen, key))
.await;
}
Ok(summary)
}
let timelines = timelines
.map_ok(|ttid| gc_timeline(&s3_client, &bucket_config, &min_age, &target, mode, ttid));
let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
let mut summary = GcSummary::default();
while let Some(i) = timelines.next().await {
let tl_summary = i?;
summary.indices_deleted += tl_summary.indices_deleted;
summary.remote_storage_errors += tl_summary.remote_storage_errors;
}
Ok(summary)
}


@@ -3998,6 +3998,30 @@ class S3Scrubber:
)
log.info(f"tenant-snapshot output: {stdout}")
def pageserver_physical_gc(
self, min_age_secs: int, tenant_ids: Optional[list[TenantId]] = None
):
args = ["pageserver-physical-gc", "--min-age", f"{min_age_secs}s"]
if tenant_ids is None:
tenant_ids = []
for tenant_id in tenant_ids:
args.extend(["--tenant-id", str(tenant_id)])
stdout = self.scrubber_cli(
args,
timeout=30,
)
try:
return json.loads(stdout)
except:
log.error(
"Failed to decode JSON output from `pageserver-physical_gc`. Dumping stdout:"
)
log.error(stdout)
raise
def _get_test_dir(request: FixtureRequest, top_output_dir: Path, prefix: str) -> Path:
"""Compute the path to a working directory for an individual test."""


@@ -15,7 +15,7 @@ from fixtures.pageserver.utils import (
tenant_delete_wait_completed,
wait_for_upload_queue_empty,
)
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, S3Storage
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, S3Storage, s3_storage
from fixtures.utils import wait_until
from fixtures.workload import Workload
@@ -73,7 +73,7 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
"""
neon_env_builder.num_pageservers = 3
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
remote_storage_kind=s3_storage(),
)
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
@@ -215,6 +215,13 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
)
workload.validate(pageserver.id)
# Having done a bunch of attach/detach cycles, we will have generated some index garbage: check
# that the scrubber sees it and cleans it up. We do this before the final attach+validate pass,
# to also validate that the scrubber isn't breaking anything.
gc_summary = S3Scrubber(neon_env_builder).pageserver_physical_gc(min_age_secs=1)
assert gc_summary["remote_storage_errors"] == 0
assert gc_summary["indices_deleted"] > 0
# Attach all pageservers
for ps in env.pageservers:
location_conf = {"mode": "AttachedMulti", "secondary_conf": None, "tenant_conf": {}}
@@ -227,10 +234,11 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
# Detach all pageservers
for ps in env.pageservers:
location_conf = {"mode": "Detached", "secondary_conf": None, "tenant_conf": {}}
assert ps.list_layers(tenant_id, timeline_id) != []
ps.tenant_location_configure(tenant_id, location_conf)
# Confirm that all local disk state was removed on detach
# TODO
# Confirm that all local disk state was removed on detach
assert ps.list_layers(tenant_id, timeline_id) == []
def test_live_migration(neon_env_builder: NeonEnvBuilder):


@@ -3,7 +3,7 @@ import shutil
from typing import Optional
import pytest
from fixtures.common_types import TenantShardId
from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.neon_fixtures import (
NeonEnvBuilder,
S3Scrubber,
@@ -109,3 +109,52 @@ def test_scrubber_tenant_snapshot(neon_env_builder: NeonEnvBuilder, shard_count:
# Check we can read everything
workload.validate()
@pytest.mark.parametrize("shard_count", [None, 4])
def test_scrubber_physical_gc(neon_env_builder: NeonEnvBuilder, shard_count: Optional[int]):
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
env.start()
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.neon_cli.create_tenant(tenant_id, timeline_id, shard_count=shard_count)
workload = Workload(env, tenant_id, timeline_id)
workload.init()
# We will end up with an index per shard, per cycle, plus one for the initial startup
n_cycles = 4
expect_indices_per_shard = n_cycles + 1
shard_count = 1 if shard_count is None else shard_count
# For each cycle, detach and attach the tenant to bump the generation, and do some writes to generate uploads
for _i in range(0, n_cycles):
env.storage_controller.tenant_policy_update(tenant_id, {"placement": "Detached"})
env.storage_controller.reconcile_until_idle()
env.storage_controller.tenant_policy_update(tenant_id, {"placement": {"Attached": 0}})
env.storage_controller.reconcile_until_idle()
# This write includes remote upload, will generate an index in this generation
workload.write_rows(1)
# With a high min_age, the scrubber should decline to delete anything
gc_summary = S3Scrubber(neon_env_builder).pageserver_physical_gc(min_age_secs=3600)
assert gc_summary["remote_storage_errors"] == 0
assert gc_summary["indices_deleted"] == 0
# If targeting a different tenant, the scrubber shouldn't do anything
gc_summary = S3Scrubber(neon_env_builder).pageserver_physical_gc(
min_age_secs=1, tenant_ids=[TenantId.generate()]
)
assert gc_summary["remote_storage_errors"] == 0
assert gc_summary["indices_deleted"] == 0
# With a low min_age, the scrubber should go ahead and clean up all but the latest 2 generations
gc_summary = S3Scrubber(neon_env_builder).pageserver_physical_gc(min_age_secs=1)
assert gc_summary["remote_storage_errors"] == 0
assert gc_summary["indices_deleted"] == (expect_indices_per_shard - 2) * shard_count