storage scrubber: GC ancestor shard layers (#8196)

## Problem

After a shard split, the pageserver leaves the ancestor shard's content
in place. It may be referenced by child shards, but eventually child
shards will de-reference most ancestor layers as they write their own
data and do GC. We would like to eventually clean up those ancestor
layers to reclaim space.

## Summary of changes

- Extend the physical GC command with `--mode=full`, which includes
cleaning up unreferenced ancestor shard layers
- Add test `test_scrubber_physical_gc_ancestors`
- Remove colored log output: in testing this is irritating ANSI code
spam in logs, and in interactive use doesn't add much.
- Refactor storage controller API client code out of storcon_client into
a `storage_controller/client` crate
- During physical GC of ancestors, call into the storage controller to
check that the latest shards seen in S3 reflect the latest state of the
tenant, and there is no shard split in progress.
This commit is contained in:
John Spray
2024-07-19 17:07:59 +01:00
committed by GitHub
parent 16071e57c6
commit 44781518d0
24 changed files with 905 additions and 191 deletions

41
Cargo.lock generated
View File

@@ -3234,16 +3234,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num"
version = "0.4.1"
@@ -3539,12 +3529,6 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "p256"
version = "0.11.1"
@@ -5822,6 +5806,28 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "storage_controller_client"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"futures",
"pageserver_api",
"pageserver_client",
"postgres",
"reqwest 0.12.4",
"serde",
"thiserror",
"tokio",
"tokio-postgres",
"tokio-stream",
"tokio-util",
"utils",
"workspace_hack",
]
[[package]]
name = "storage_scrubber"
version = "0.1.0"
@@ -5856,6 +5862,7 @@ dependencies = [
"serde",
"serde_json",
"serde_with",
"storage_controller_client",
"thiserror",
"tokio",
"tokio-postgres",
@@ -5885,6 +5892,7 @@ dependencies = [
"reqwest 0.12.4",
"serde",
"serde_json",
"storage_controller_client",
"thiserror",
"tokio",
"tracing",
@@ -6611,7 +6619,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",

View File

@@ -13,6 +13,7 @@ members = [
"safekeeper",
"storage_broker",
"storage_controller",
"storage_controller/client",
"storage_scrubber",
"workspace_hack",
"libs/compute_api",
@@ -182,7 +183,7 @@ tower-service = "0.3.2"
tracing = "0.1"
tracing-error = "0.2.0"
tracing-opentelemetry = "0.21.0"
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json", "ansi"] }
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
twox-hash = { version = "1.6.3", default-features = false }
typed-json = "0.1"
url = "2.2"
@@ -221,6 +222,7 @@ remote_storage = { version = "0.1", path = "./libs/remote_storage/" }
safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" }
desim = { version = "0.1", path = "./libs/desim" }
storage_broker = { version = "0.1", path = "./storage_broker/" } # Note: main broker code is inside the binary crate, so linking with the library shouldn't be heavy.
storage_controller_client = { path = "./storage_controller/client" }
tenant_size_model = { version = "0.1", path = "./libs/tenant_size_model/" }
tracing-utils = { version = "0.1", path = "./libs/tracing-utils/" }
utils = { version = "0.1", path = "./libs/utils/" }

View File

@@ -17,6 +17,7 @@ pageserver_client.workspace = true
reqwest.workspace = true
serde.workspace = true
serde_json = { workspace = true, features = ["raw_value"] }
storage_controller_client.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true

View File

@@ -14,15 +14,15 @@ use pageserver_api::{
},
shard::{ShardStripeSize, TenantShardId},
};
use pageserver_client::mgmt_api::{self, ResponseErrorMessageExt};
use pageserver_client::mgmt_api::{self};
use reqwest::{Method, StatusCode, Url};
use serde::{de::DeserializeOwned, Serialize};
use utils::id::{NodeId, TenantId};
use pageserver_api::controller_api::{
NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
TenantShardMigrateRequest, TenantShardMigrateResponse,
};
use storage_controller_client::control_api::Client;
#[derive(Subcommand, Debug)]
enum Command {
@@ -249,64 +249,6 @@ impl FromStr for NodeAvailabilityArg {
}
}
struct Client {
base_url: Url,
jwt_token: Option<String>,
client: reqwest::Client,
}
impl Client {
fn new(base_url: Url, jwt_token: Option<String>) -> Self {
Self {
base_url,
jwt_token,
client: reqwest::ClientBuilder::new()
.build()
.expect("Failed to construct http client"),
}
}
/// Simple HTTP request wrapper for calling into storage controller
async fn dispatch<RQ, RS>(
&self,
method: Method,
path: String,
body: Option<RQ>,
) -> mgmt_api::Result<RS>
where
RQ: Serialize + Sized,
RS: DeserializeOwned + Sized,
{
// The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
// for general purpose API access.
let url = Url::from_str(&format!(
"http://{}:{}/{path}",
self.base_url.host_str().unwrap(),
self.base_url.port().unwrap()
))
.unwrap();
let mut builder = self.client.request(method, url);
if let Some(body) = body {
builder = builder.json(&body)
}
if let Some(jwt_token) = &self.jwt_token {
builder = builder.header(
reqwest::header::AUTHORIZATION,
format!("Bearer {jwt_token}"),
);
}
let response = builder.send().await.map_err(mgmt_api::Error::ReceiveBody)?;
let response = response.error_from_body().await?;
response
.json()
.await
.map_err(pageserver_client::mgmt_api::Error::ReceiveBody)
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();

View File

@@ -87,7 +87,7 @@ pub struct TenantLocateResponse {
pub shard_params: ShardParameters,
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantDescribeResponse {
pub tenant_id: TenantId,
pub shards: Vec<TenantDescribeResponseShard>,
@@ -110,7 +110,7 @@ pub struct NodeDescribeResponse {
pub listen_pg_port: u16,
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantDescribeResponseShard {
pub tenant_shard_id: TenantShardId,

View File

@@ -33,6 +33,10 @@ pub enum Scope {
GenerationsApi,
// Allows access to control plane managment API and some storage controller endpoints.
Admin,
/// Allows access to storage controller APIs used by the scrubber, to interrogate the state
/// of a tenant & post scrub results.
Scrubber,
}
/// JWT payload. See docs/authentication.md for the format

View File

@@ -14,12 +14,14 @@ pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result<
}
(Scope::PageServerApi, None) => Ok(()), // access to management api for PageServerApi scope
(Scope::PageServerApi, Some(_)) => Ok(()), // access to tenant api using PageServerApi scope
(Scope::Admin | Scope::SafekeeperData | Scope::GenerationsApi, _) => Err(AuthError(
format!(
"JWT scope '{:?}' is ineligible for Pageserver auth",
claims.scope
)
.into(),
)),
(Scope::Admin | Scope::SafekeeperData | Scope::GenerationsApi | Scope::Scrubber, _) => {
Err(AuthError(
format!(
"JWT scope '{:?}' is ineligible for Pageserver auth",
claims.scope
)
.into(),
))
}
}
}

View File

@@ -12,13 +12,15 @@ pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result<
}
Ok(())
}
(Scope::Admin | Scope::PageServerApi | Scope::GenerationsApi, _) => Err(AuthError(
format!(
"JWT scope '{:?}' is ineligible for Safekeeper auth",
claims.scope
)
.into(),
)),
(Scope::Admin | Scope::PageServerApi | Scope::GenerationsApi | Scope::Scrubber, _) => {
Err(AuthError(
format!(
"JWT scope '{:?}' is ineligible for Safekeeper auth",
claims.scope
)
.into(),
))
}
(Scope::SafekeeperData, _) => Ok(()),
}
}

View File

@@ -0,0 +1,23 @@
[package]
name = "storage_controller_client"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
pageserver_api.workspace = true
pageserver_client.workspace = true
thiserror.workspace = true
async-trait.workspace = true
reqwest.workspace = true
utils.workspace = true
serde.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
tokio-postgres.workspace = true
tokio-stream.workspace = true
tokio.workspace = true
futures.workspace = true
tokio-util.workspace = true
anyhow.workspace = true
postgres.workspace = true
bytes.workspace = true

View File

@@ -0,0 +1,62 @@
use pageserver_client::mgmt_api::{self, ResponseErrorMessageExt};
use reqwest::{Method, Url};
use serde::{de::DeserializeOwned, Serialize};
use std::str::FromStr;
pub struct Client {
base_url: Url,
jwt_token: Option<String>,
client: reqwest::Client,
}
impl Client {
pub fn new(base_url: Url, jwt_token: Option<String>) -> Self {
Self {
base_url,
jwt_token,
client: reqwest::ClientBuilder::new()
.build()
.expect("Failed to construct http client"),
}
}
/// Simple HTTP request wrapper for calling into storage controller
pub async fn dispatch<RQ, RS>(
&self,
method: Method,
path: String,
body: Option<RQ>,
) -> mgmt_api::Result<RS>
where
RQ: Serialize + Sized,
RS: DeserializeOwned + Sized,
{
// The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
// for general purpose API access.
let url = Url::from_str(&format!(
"http://{}:{}/{path}",
self.base_url.host_str().unwrap(),
self.base_url.port().unwrap()
))
.unwrap();
let mut builder = self.client.request(method, url);
if let Some(body) = body {
builder = builder.json(&body)
}
if let Some(jwt_token) = &self.jwt_token {
builder = builder.header(
reqwest::header::AUTHORIZATION,
format!("Bearer {jwt_token}"),
);
}
let response = builder.send().await.map_err(mgmt_api::Error::ReceiveBody)?;
let response = response.error_from_body().await?;
response
.json()
.await
.map_err(pageserver_client::mgmt_api::Error::ReceiveBody)
}
}

View File

@@ -0,0 +1 @@
pub mod control_api;

View File

@@ -430,7 +430,7 @@ async fn handle_tenant_describe(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
check_permissions(&req, Scope::Scrubber)?;
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
json_response(StatusCode::OK, service.tenant_describe(tenant_id)?)

View File

@@ -5,6 +5,7 @@ use metrics::launch_timestamp::LaunchTimestamp;
use metrics::BuildInfo;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use storage_controller::http::make_router;
use storage_controller::metrics::preinitialize_metrics;
use storage_controller::persistence::Persistence;
@@ -310,12 +311,21 @@ async fn async_main() -> anyhow::Result<()> {
tracing::info!("Terminating on signal");
// Stop HTTP server first, so that we don't have to service requests
// while shutting down Service
// while shutting down Service.
server_shutdown.cancel();
if let Err(e) = server_task.await {
tracing::error!("Error joining HTTP server task: {e}")
match tokio::time::timeout(Duration::from_secs(5), server_task).await {
Ok(Ok(_)) => {
tracing::info!("Joined HTTP server task");
}
Ok(Err(e)) => {
tracing::error!("Error joining HTTP server task: {e}")
}
Err(_) => {
tracing::warn!("Timed out joining HTTP server task");
// We will fall through and shut down the service anyway, any request handlers
// in flight will experience cancellation & their clients will see a torn connection.
}
}
tracing::info!("Joined HTTP server task");
service.shutdown().await;
tracing::info!("Service shutdown complete");

View File

@@ -3956,6 +3956,8 @@ impl Service {
"failpoint".to_string()
)));
failpoint_support::sleep_millis_async!("shard-split-post-remote-sleep", &self.cancel);
tracing::info!(
"Split {} into {}",
parent_id,

View File

@@ -34,6 +34,7 @@ camino.workspace = true
rustls.workspace = true
rustls-native-certs.workspace = true
once_cell.workspace = true
storage_controller_client.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
chrono = { workspace = true, default-features = false, features = ["clock", "serde"] }

View File

@@ -24,6 +24,7 @@ use camino::{Utf8Path, Utf8PathBuf};
use clap::ValueEnum;
use pageserver::tenant::TENANTS_SEGMENT_NAME;
use pageserver_api::shard::TenantShardId;
use remote_storage::RemotePath;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt;
@@ -31,7 +32,7 @@ use tracing::error;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use utils::fs_ext;
use utils::id::{TenantId, TimelineId};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
const MAX_RETRIES: usize = 20;
const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
@@ -54,7 +55,7 @@ pub struct S3Target {
/// in the pageserver, as all timeline objects existing in the scope of a particular
/// tenant: the scrubber is different in that it handles collections of data referring to many
/// TenantShardTimelineIds in on place.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)]
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct TenantShardTimelineId {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
@@ -67,6 +68,10 @@ impl TenantShardTimelineId {
timeline_id,
}
}
fn as_tenant_timeline_id(&self) -> TenantTimelineId {
TenantTimelineId::new(self.tenant_shard_id.tenant_id, self.timeline_id)
}
}
impl Display for TenantShardTimelineId {
@@ -179,6 +184,22 @@ impl RootTarget {
.with_sub_segment(&id.timeline_id.to_string())
}
/// Given RemotePath "tenants/foo/timelines/bar/layerxyz", prefix it to a literal
/// key in the S3 bucket.
pub fn absolute_key(&self, key: &RemotePath) -> String {
let root = match self {
Self::Pageserver(root) => root,
Self::Safekeeper(root) => root,
};
let prefix = &root.prefix_in_bucket;
if prefix.ends_with('/') {
format!("{prefix}{key}")
} else {
format!("{prefix}/{key}")
}
}
pub fn bucket_name(&self) -> &str {
match self {
Self::Pageserver(root) => &root.bucket_name,
@@ -216,6 +237,14 @@ impl BucketConfig {
}
}
pub struct ControllerClientConfig {
/// URL to storage controller. e.g. http://127.0.0.1:1234 when using `neon_local`
pub controller_api: Url,
/// JWT token for authenticating with storage controller. Requires scope 'scrubber' or 'admin'.
pub controller_jwt: String,
}
pub struct ConsoleConfig {
pub token: String,
pub base_url: Url,

View File

@@ -1,11 +1,12 @@
use anyhow::bail;
use anyhow::{anyhow, bail};
use camino::Utf8PathBuf;
use pageserver_api::shard::TenantShardId;
use storage_scrubber::find_large_objects;
use reqwest::Url;
use storage_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
use storage_scrubber::pageserver_physical_gc::GcMode;
use storage_scrubber::scan_pageserver_metadata::scan_metadata;
use storage_scrubber::tenant_snapshot::SnapshotDownloader;
use storage_scrubber::{find_large_objects, ControllerClientConfig};
use storage_scrubber::{
init_logging, pageserver_physical_gc::pageserver_physical_gc,
scan_safekeeper_metadata::scan_safekeeper_metadata, BucketConfig, ConsoleConfig, NodeKind,
@@ -24,6 +25,14 @@ struct Cli {
#[arg(short, long, default_value_t = false)]
delete: bool,
#[arg(long)]
/// URL to storage controller. e.g. http://127.0.0.1:1234 when using `neon_local`
controller_api: Option<Url>,
#[arg(long)]
/// JWT token for authenticating with storage controller. Requires scope 'scrubber' or 'admin'.
controller_jwt: Option<String>,
}
#[derive(Subcommand, Debug)]
@@ -204,8 +213,37 @@ async fn main() -> anyhow::Result<()> {
min_age,
mode,
} => {
let summary =
pageserver_physical_gc(bucket_config, tenant_ids, min_age.into(), mode).await?;
let controller_client_conf = cli.controller_api.map(|controller_api| {
ControllerClientConfig {
controller_api,
// Default to no key: this is a convenience when working in a development environment
controller_jwt: cli.controller_jwt.unwrap_or("".to_owned()),
}
});
match (&controller_client_conf, mode) {
(Some(_), _) => {
// Any mode may run when controller API is set
}
(None, GcMode::Full) => {
// The part of physical GC where we erase ancestor layers cannot be done safely without
// confirming the most recent complete shard split with the controller. Refuse to run, rather
// than doing it unsafely.
return Err(anyhow!("Full physical GC requires `--controller-api` and `--controller-jwt` to run"));
}
(None, GcMode::DryRun | GcMode::IndicesOnly) => {
// These GcModes do not require the controller to run.
}
}
let summary = pageserver_physical_gc(
bucket_config,
controller_client_conf,
tenant_ids,
min_age.into(),
mode,
)
.await?;
println!("{}", serde_json::to_string(&summary).unwrap());
Ok(())
}

View File

@@ -1,22 +1,50 @@
use std::time::{Duration, UNIX_EPOCH};
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use crate::checks::{list_timeline_blobs, BlobDataParseResult};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
use crate::{
init_remote, BucketConfig, ControllerClientConfig, NodeKind, RootTarget, TenantShardTimelineId,
};
use aws_sdk_s3::Client;
use futures_util::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::parse_remote_index_path;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::IndexPart;
use pageserver_api::shard::TenantShardId;
use pageserver_api::controller_api::TenantDescribeResponse;
use pageserver_api::shard::{ShardIndex, TenantShardId};
use remote_storage::RemotePath;
use reqwest::Method;
use serde::Serialize;
use storage_controller_client::control_api;
use tracing::{info_span, Instrument};
use utils::generation::Generation;
use utils::id::{TenantId, TenantTimelineId};
#[derive(Serialize, Default)]
pub struct GcSummary {
indices_deleted: usize,
remote_storage_errors: usize,
controller_api_errors: usize,
ancestor_layers_deleted: usize,
}
impl GcSummary {
fn merge(&mut self, other: Self) {
let Self {
indices_deleted,
remote_storage_errors,
ancestor_layers_deleted,
controller_api_errors,
} = other;
self.indices_deleted += indices_deleted;
self.remote_storage_errors += remote_storage_errors;
self.ancestor_layers_deleted += ancestor_layers_deleted;
self.controller_api_errors += controller_api_errors;
}
}
#[derive(clap::ValueEnum, Debug, Clone, Copy)]
@@ -26,9 +54,9 @@ pub enum GcMode {
// Enable only removing old-generation indices
IndicesOnly,
// Enable all forms of GC
// TODO: this will be used when shard split ancestor layer deletion is added
// All,
Full,
}
impl std::fmt::Display for GcMode {
@@ -36,10 +64,232 @@ impl std::fmt::Display for GcMode {
match self {
GcMode::DryRun => write!(f, "dry-run"),
GcMode::IndicesOnly => write!(f, "indices-only"),
GcMode::Full => write!(f, "full"),
}
}
}
mod refs {
use super::*;
// Map of cross-shard layer references, giving a refcount for each layer in each shard that is referenced by some other
// shard in the same tenant. This is sparse! The vast majority of timelines will have no cross-shard refs, and those that
// do have cross shard refs should eventually drop most of them via compaction.
//
// In our inner map type, the TTID in the key is shard-agnostic, and the ShardIndex in the value refers to the _ancestor
// which is is referenced_.
#[derive(Default)]
pub(super) struct AncestorRefs(
BTreeMap<TenantTimelineId, HashMap<(ShardIndex, LayerName), usize>>,
);
impl AncestorRefs {
/// Insert references for layers discovered in a particular shard-timeline that refer to an ancestral shard-timeline.
pub(super) fn update(
&mut self,
ttid: TenantShardTimelineId,
layers: Vec<(LayerName, LayerFileMetadata)>,
) {
let ttid_refs = self.0.entry(ttid.as_tenant_timeline_id()).or_default();
for (layer_name, layer_metadata) in layers {
// Increment refcount of this layer in the ancestor shard
*(ttid_refs
.entry((layer_metadata.shard, layer_name))
.or_default()) += 1;
}
}
/// For a particular TTID, return the map of all ancestor layers referenced by a descendent to their refcount
///
/// The `ShardIndex` in the result's key is the index of the _ancestor_, not the descendent.
pub(super) fn get_ttid_refcounts(
&self,
ttid: &TenantTimelineId,
) -> Option<&HashMap<(ShardIndex, LayerName), usize>> {
self.0.get(ttid)
}
}
}
use refs::AncestorRefs;
// As we see shards for a tenant, acccumulate knowledge needed for cross-shard GC:
// - Are there any ancestor shards?
// - Are there any refs to ancestor shards' layers?
#[derive(Default)]
struct TenantRefAccumulator {
shards_seen: HashMap<TenantId, Vec<ShardIndex>>,
// For each shard that has refs to an ancestor's layers, the set of ancestor layers referred to
ancestor_ref_shards: AncestorRefs,
}
impl TenantRefAccumulator {
fn update(&mut self, ttid: TenantShardTimelineId, index_part: &IndexPart) {
let this_shard_idx = ttid.tenant_shard_id.to_index();
(*self
.shards_seen
.entry(ttid.tenant_shard_id.tenant_id)
.or_default())
.push(this_shard_idx);
let mut ancestor_refs = Vec::new();
for (layer_name, layer_metadata) in &index_part.layer_metadata {
if layer_metadata.shard != this_shard_idx {
// This is a reference from this shard to a layer in an ancestor shard: we must track this
// as a marker to not GC this layer from the parent.
ancestor_refs.push((layer_name.clone(), layer_metadata.clone()));
}
}
if !ancestor_refs.is_empty() {
tracing::info!(%ttid, "Found {} ancestor refs", ancestor_refs.len());
self.ancestor_ref_shards.update(ttid, ancestor_refs);
}
}
/// Consume Self and return a vector of ancestor tenant shards that should be GC'd, and map of referenced ancestor layers to preserve
async fn into_gc_ancestors(
self,
controller_client: &control_api::Client,
summary: &mut GcSummary,
) -> (Vec<TenantShardId>, AncestorRefs) {
let mut ancestors_to_gc = Vec::new();
for (tenant_id, mut shard_indices) in self.shards_seen {
// Find the highest shard count
let latest_count = shard_indices
.iter()
.map(|i| i.shard_count)
.max()
.expect("Always at least one shard");
let (mut latest_shards, ancestor_shards) = {
let at =
itertools::partition(&mut shard_indices, |i| i.shard_count == latest_count);
(shard_indices[0..at].to_owned(), &shard_indices[at..])
};
// Sort shards, as we will later compare them with a sorted list from the controller
latest_shards.sort();
// Check that we have a complete view of the latest shard count: this should always be the case unless we happened
// to scan the S3 bucket halfway through a shard split.
if latest_shards.len() != latest_count.count() as usize {
// This should be extremely rare, so we warn on it.
tracing::warn!(%tenant_id, "Missed some shards at count {:?}", latest_count);
continue;
}
// Check if we have any non-latest-count shards
if ancestor_shards.is_empty() {
tracing::debug!(%tenant_id, "No ancestor shards to clean up");
continue;
}
// Based on S3 view, this tenant looks like it might have some ancestor shard work to do. We
// must only do this work if the tenant is not currently being split: otherwise, it is not safe
// to GC ancestors, because if the split fails then the controller will try to attach ancestor
// shards again.
match controller_client
.dispatch::<(), TenantDescribeResponse>(
Method::GET,
format!("control/v1/tenant/{tenant_id}"),
None,
)
.await
{
Err(e) => {
// We were not able to learn the latest shard split state from the controller, so we will not
// do ancestor GC on this tenant.
tracing::warn!(%tenant_id, "Failed to query storage controller, will not do ancestor GC: {e}");
summary.controller_api_errors += 1;
continue;
}
Ok(desc) => {
// We expect to see that the latest shard count matches the one we saw in S3, and that none
// of the shards indicate splitting in progress.
let controller_indices: Vec<ShardIndex> = desc
.shards
.iter()
.map(|s| s.tenant_shard_id.to_index())
.collect();
if controller_indices != latest_shards {
tracing::info!(%tenant_id, "Latest shards seen in S3 ({latest_shards:?}) don't match controller state ({controller_indices:?})");
continue;
}
if desc.shards.iter().any(|s| s.is_splitting) {
tracing::info!(%tenant_id, "One or more shards is currently splitting");
continue;
}
// This shouldn't be too noisy, because we only log this for tenants that have some ancestral refs.
tracing::info!(%tenant_id, "Validated state with controller: {desc:?}");
}
}
// GC ancestor shards
for ancestor_shard in ancestor_shards.iter().map(|idx| TenantShardId {
tenant_id,
shard_count: idx.shard_count,
shard_number: idx.shard_number,
}) {
ancestors_to_gc.push(ancestor_shard);
}
}
(ancestors_to_gc, self.ancestor_ref_shards)
}
}
async fn is_old_enough(
s3_client: &Client,
bucket_config: &BucketConfig,
min_age: &Duration,
key: &str,
summary: &mut GcSummary,
) -> bool {
// Validation: we will only GC indices & layers after a time threshold (e.g. one week) so that during an incident
// it is easier to read old data for analysis, and easier to roll back shard splits without having to un-delete any objects.
let age: Duration = match s3_client
.head_object()
.bucket(&bucket_config.bucket)
.key(key)
.send()
.await
{
Ok(response) => match response.last_modified {
None => {
tracing::warn!("Missing last_modified");
summary.remote_storage_errors += 1;
return false;
}
Some(last_modified) => match SystemTime::try_from(last_modified).map(|t| t.elapsed()) {
Ok(Ok(e)) => e,
Err(_) | Ok(Err(_)) => {
tracing::warn!("Bad last_modified time: {last_modified:?}");
return false;
}
},
},
Err(e) => {
tracing::warn!("Failed to HEAD {key}: {e}");
summary.remote_storage_errors += 1;
return false;
}
};
let old_enough = &age > min_age;
if !old_enough {
tracing::info!(
"Skipping young object {} < {}",
humantime::format_duration(age),
humantime::format_duration(*min_age)
);
}
old_enough
}
async fn maybe_delete_index(
s3_client: &Client,
bucket_config: &BucketConfig,
@@ -79,45 +329,7 @@ async fn maybe_delete_index(
return;
}
// Validation: we will only delete indices after one week, so that during incidents we will have
// easy access to recent indices.
let age: Duration = match s3_client
.head_object()
.bucket(&bucket_config.bucket)
.key(key)
.send()
.await
{
Ok(response) => match response.last_modified {
None => {
tracing::warn!("Missing last_modified");
summary.remote_storage_errors += 1;
return;
}
Some(last_modified) => {
let last_modified =
UNIX_EPOCH + Duration::from_secs_f64(last_modified.as_secs_f64());
match last_modified.elapsed() {
Ok(e) => e,
Err(_) => {
tracing::warn!("Bad last_modified time: {last_modified:?}");
return;
}
}
}
},
Err(e) => {
tracing::warn!("Failed to HEAD {key}: {e}");
summary.remote_storage_errors += 1;
return;
}
};
if &age < min_age {
tracing::info!(
"Skipping young object {} < {}",
age.as_secs_f64(),
min_age.as_secs_f64()
);
if !is_old_enough(s3_client, bucket_config, min_age, key, summary).await {
return;
}
@@ -145,6 +357,108 @@ async fn maybe_delete_index(
}
}
#[allow(clippy::too_many_arguments)]
async fn gc_ancestor(
s3_client: &Client,
bucket_config: &BucketConfig,
root_target: &RootTarget,
min_age: &Duration,
ancestor: TenantShardId,
refs: &AncestorRefs,
mode: GcMode,
summary: &mut GcSummary,
) -> anyhow::Result<()> {
// Scan timelines in the ancestor
let timelines = stream_tenant_timelines(s3_client, root_target, ancestor).await?;
let mut timelines = std::pin::pin!(timelines);
// Build a list of keys to retain
while let Some(ttid) = timelines.next().await {
let ttid = ttid?;
let data = list_timeline_blobs(s3_client, ttid, root_target).await?;
let s3_layers = match data.blob_data {
BlobDataParseResult::Parsed {
index_part: _,
index_part_generation: _,
s3_layers,
} => s3_layers,
BlobDataParseResult::Relic => {
// Post-deletion tenant location: don't try and GC it.
continue;
}
BlobDataParseResult::Incorrect(reasons) => {
// Our primary purpose isn't to report on bad data, but log this rather than skipping silently
tracing::warn!(
"Skipping ancestor GC for timeline {ttid}, bad metadata: {reasons:?}"
);
continue;
}
};
let ttid_refs = refs.get_ttid_refcounts(&ttid.as_tenant_timeline_id());
let ancestor_shard_index = ttid.tenant_shard_id.to_index();
for (layer_name, layer_gen) in s3_layers {
let ref_count = ttid_refs
.and_then(|m| m.get(&(ancestor_shard_index, layer_name.clone())))
.copied()
.unwrap_or(0);
if ref_count > 0 {
tracing::debug!(%ttid, "Ancestor layer {layer_name} has {ref_count} refs");
continue;
}
tracing::info!(%ttid, "Ancestor layer {layer_name} is not referenced");
// Build the key for the layer we are considering deleting
let key = root_target.absolute_key(&remote_layer_path(
&ttid.tenant_shard_id.tenant_id,
&ttid.timeline_id,
ancestor_shard_index,
&layer_name,
layer_gen,
));
// We apply a time threshold to GCing objects that are un-referenced: this preserves our ability
// to roll back a shard split if we have to, by avoiding deleting ancestor layers right away
if !is_old_enough(s3_client, bucket_config, min_age, &key, summary).await {
continue;
}
if !matches!(mode, GcMode::Full) {
tracing::info!("Dry run: would delete key {key}");
continue;
}
// All validations passed: erase the object
match s3_client
.delete_object()
.bucket(&bucket_config.bucket)
.key(&key)
.send()
.await
{
Ok(_) => {
tracing::info!("Successfully deleted unreferenced ancestor layer {key}");
summary.ancestor_layers_deleted += 1;
}
Err(e) => {
tracing::warn!("Failed to delete layer {key}: {e}");
summary.remote_storage_errors += 1;
}
}
}
// TODO: if all the layers are gone, clean up the whole timeline dir (remove index)
}
Ok(())
}
/// Physical garbage collection: removing unused S3 objects. This is distinct from the garbage collection
/// done inside the pageserver, which operates at a higher level (keys, layers). This type of garbage collection
/// is about removing:
@@ -156,22 +470,26 @@ async fn maybe_delete_index(
/// make sure that object listings don't get slowed down by large numbers of garbage objects.
pub async fn pageserver_physical_gc(
bucket_config: BucketConfig,
tenant_ids: Vec<TenantShardId>,
controller_client_conf: Option<ControllerClientConfig>,
tenant_shard_ids: Vec<TenantShardId>,
min_age: Duration,
mode: GcMode,
) -> anyhow::Result<GcSummary> {
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
let tenants = if tenant_ids.is_empty() {
let tenants = if tenant_shard_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&s3_client, &target))
} else {
futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
futures::future::Either::Right(futures::stream::iter(tenant_shard_ids.into_iter().map(Ok)))
};
// How many tenants to process in parallel. We need to be mindful of pageservers
// accessing the same per tenant prefixes, so use a lower setting than pageservers.
const CONCURRENCY: usize = 32;
// Accumulate information about each tenant for cross-shard GC step we'll do at the end
let accumulator = Arc::new(std::sync::Mutex::new(TenantRefAccumulator::default()));
// Generate a stream of TenantTimelineId
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
let timelines = timelines.try_buffered(CONCURRENCY);
@@ -185,16 +503,17 @@ pub async fn pageserver_physical_gc(
target: &RootTarget,
mode: GcMode,
ttid: TenantShardTimelineId,
accumulator: &Arc<std::sync::Mutex<TenantRefAccumulator>>,
) -> anyhow::Result<GcSummary> {
let mut summary = GcSummary::default();
let data = list_timeline_blobs(s3_client, ttid, target).await?;
let (latest_gen, candidates) = match &data.blob_data {
let (index_part, latest_gen, candidates) = match &data.blob_data {
BlobDataParseResult::Parsed {
index_part: _index_part,
index_part,
index_part_generation,
s3_layers: _s3_layers,
} => (*index_part_generation, data.unused_index_keys),
} => (index_part, *index_part_generation, data.unused_index_keys),
BlobDataParseResult::Relic => {
// Post-deletion tenant location: don't try and GC it.
return Ok(summary);
@@ -206,6 +525,8 @@ pub async fn pageserver_physical_gc(
}
};
accumulator.lock().unwrap().update(ttid, index_part);
for key in candidates {
maybe_delete_index(
s3_client,
@@ -222,17 +543,61 @@ pub async fn pageserver_physical_gc(
Ok(summary)
}
let timelines = timelines
.map_ok(|ttid| gc_timeline(&s3_client, &bucket_config, &min_age, &target, mode, ttid));
let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
let mut summary = GcSummary::default();
while let Some(i) = timelines.next().await {
let tl_summary = i?;
// Drain futures for per-shard GC, populating accumulator as a side effect
{
let timelines = timelines.map_ok(|ttid| {
gc_timeline(
&s3_client,
&bucket_config,
&min_age,
&target,
mode,
ttid,
&accumulator,
)
});
let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
summary.indices_deleted += tl_summary.indices_deleted;
summary.remote_storage_errors += tl_summary.remote_storage_errors;
while let Some(i) = timelines.next().await {
summary.merge(i?);
}
}
// Execute cross-shard GC, using the accumulator's full view of all the shards built in the per-shard GC
let Some(controller_client) = controller_client_conf.as_ref().map(|c| {
let ControllerClientConfig {
controller_api,
controller_jwt,
} = c;
control_api::Client::new(controller_api.clone(), Some(controller_jwt.clone()))
}) else {
tracing::info!("Skipping ancestor layer GC, because no `--controller-api` was specified");
return Ok(summary);
};
let (ancestor_shards, ancestor_refs) = Arc::into_inner(accumulator)
.unwrap()
.into_inner()
.unwrap()
.into_gc_ancestors(&controller_client, &mut summary)
.await;
for ancestor_shard in ancestor_shards {
gc_ancestor(
&s3_client,
&bucket_config,
&target,
&min_age,
ancestor_shard,
&ancestor_refs,
mode,
&mut summary,
)
.instrument(info_span!("gc_ancestor", %ancestor_shard))
.await?;
}
Ok(summary)

View File

@@ -997,7 +997,7 @@ class NeonEnvBuilder:
if self.scrub_on_exit:
try:
StorageScrubber(self).scan_metadata()
self.env.storage_scrubber.scan_metadata()
except Exception as e:
log.error(f"Error during remote storage scrub: {e}")
cleanup_error = e
@@ -1225,6 +1225,9 @@ class NeonEnv:
)
cfg["safekeepers"].append(sk_cfg)
# Scrubber instance for tests that use it, and for use during teardown checks
self.storage_scrubber = StorageScrubber(self, log_dir=config.test_output_dir)
log.info(f"Config: {cfg}")
self.neon_cli.init(
cfg,
@@ -4265,9 +4268,9 @@ class Safekeeper(LogUtils):
class StorageScrubber:
def __init__(self, env: NeonEnvBuilder, log_dir: Optional[Path] = None):
def __init__(self, env: NeonEnv, log_dir: Path):
self.env = env
self.log_dir = log_dir or env.test_output_dir
self.log_dir = log_dir
def scrubber_cli(self, args: list[str], timeout) -> str:
assert isinstance(self.env.pageserver_remote_storage, S3Storage)
@@ -4284,11 +4287,14 @@ class StorageScrubber:
if s3_storage.endpoint is not None:
env.update({"AWS_ENDPOINT_URL": s3_storage.endpoint})
base_args = [str(self.env.neon_binpath / "storage_scrubber")]
base_args = [
str(self.env.neon_binpath / "storage_scrubber"),
f"--controller-api={self.env.storage_controller_api}",
]
args = base_args + args
(output_path, stdout, status_code) = subprocess_capture(
self.env.test_output_dir,
self.log_dir,
args,
echo_stderr=True,
echo_stdout=True,
@@ -4327,7 +4333,10 @@ class StorageScrubber:
log.info(f"tenant-snapshot output: {stdout}")
def pageserver_physical_gc(
self, min_age_secs: int, tenant_ids: Optional[list[TenantId]] = None
self,
min_age_secs: int,
tenant_ids: Optional[list[TenantId]] = None,
mode: Optional[str] = None,
):
args = ["pageserver-physical-gc", "--min-age", f"{min_age_secs}s"]
@@ -4337,6 +4346,9 @@ class StorageScrubber:
for tenant_id in tenant_ids:
args.extend(["--tenant-id", str(tenant_id)])
if mode is not None:
args.extend(["--mode", mode])
stdout = self.scrubber_cli(
args,
timeout=30,

View File

@@ -22,7 +22,6 @@ from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PgBin,
StorageScrubber,
generate_uploads_and_deletions,
)
from fixtures.pageserver.common_types import parse_layer_file_name
@@ -215,7 +214,7 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
# Having written a mixture of generation-aware and legacy index_part.json,
# ensure the scrubber handles the situation as expected.
metadata_summary = StorageScrubber(neon_env_builder).scan_metadata()
metadata_summary = env.storage_scrubber.scan_metadata()
assert metadata_summary["tenant_count"] == 1 # Scrubber should have seen our timeline
assert metadata_summary["timeline_count"] == 1
assert metadata_summary["timeline_shard_count"] == 1

View File

@@ -7,7 +7,7 @@ from typing import Any, Dict, Optional
import pytest
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver, StorageScrubber
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver
from fixtures.pageserver.common_types import parse_layer_file_name
from fixtures.pageserver.utils import (
assert_prefix_empty,
@@ -234,7 +234,7 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
# Having done a bunch of attach/detach cycles, we will have generated some index garbage: check
# that the scrubber sees it and cleans it up. We do this before the final attach+validate pass,
# to also validate that the scrubber isn't breaking anything.
gc_summary = StorageScrubber(neon_env_builder).pageserver_physical_gc(min_age_secs=1)
gc_summary = env.storage_scrubber.pageserver_physical_gc(min_age_secs=1)
assert gc_summary["remote_storage_errors"] == 0
assert gc_summary["indices_deleted"] > 0
@@ -555,7 +555,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
# Scrub the remote storage
# ========================
# This confirms that the scrubber isn't upset by the presence of the heatmap
StorageScrubber(neon_env_builder).scan_metadata()
env.storage_scrubber.scan_metadata()
# Detach secondary and delete tenant
# ===================================

View File

@@ -12,7 +12,6 @@ from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
StorageControllerApiException,
StorageScrubber,
last_flush_lsn_upload,
tenant_get_shards,
wait_for_last_flush_lsn,
@@ -128,7 +127,7 @@ def test_sharding_smoke(
# Check the scrubber isn't confused by sharded content, then disable
# it during teardown because we'll have deleted by then
StorageScrubber(neon_env_builder).scan_metadata()
env.storage_scrubber.scan_metadata()
neon_env_builder.scrub_on_exit = False
env.storage_controller.pageserver_api().tenant_delete(tenant_id)

View File

@@ -1,14 +1,19 @@
import os
import shutil
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Optional
import pytest
from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
StorageScrubber,
)
from fixtures.remote_storage import S3Storage, s3_storage
from fixtures.utils import wait_until
from fixtures.workload import Workload
@@ -60,8 +65,7 @@ def test_scrubber_tenant_snapshot(neon_env_builder: NeonEnvBuilder, shard_count:
output_path = neon_env_builder.test_output_dir / "snapshot"
os.makedirs(output_path)
scrubber = StorageScrubber(neon_env_builder)
scrubber.tenant_snapshot(tenant_id, output_path)
env.storage_scrubber.tenant_snapshot(tenant_id, output_path)
assert len(os.listdir(output_path)) > 0
@@ -111,6 +115,14 @@ def test_scrubber_tenant_snapshot(neon_env_builder: NeonEnvBuilder, shard_count:
workload.validate()
def drop_local_state(env: NeonEnv, tenant_id: TenantId):
env.storage_controller.tenant_policy_update(tenant_id, {"placement": "Detached"})
env.storage_controller.reconcile_until_idle()
env.storage_controller.tenant_policy_update(tenant_id, {"placement": {"Attached": 0}})
env.storage_controller.reconcile_until_idle()
@pytest.mark.parametrize("shard_count", [None, 4])
def test_scrubber_physical_gc(neon_env_builder: NeonEnvBuilder, shard_count: Optional[int]):
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
@@ -133,28 +145,231 @@ def test_scrubber_physical_gc(neon_env_builder: NeonEnvBuilder, shard_count: Opt
# For each cycle, detach and attach the tenant to bump the generation, and do some writes to generate uploads
for _i in range(0, n_cycles):
env.storage_controller.tenant_policy_update(tenant_id, {"placement": "Detached"})
env.storage_controller.reconcile_until_idle()
env.storage_controller.tenant_policy_update(tenant_id, {"placement": {"Attached": 0}})
env.storage_controller.reconcile_until_idle()
drop_local_state(env, tenant_id)
# This write includes remote upload, will generate an index in this generation
workload.write_rows(1)
# With a high min_age, the scrubber should decline to delete anything
gc_summary = StorageScrubber(neon_env_builder).pageserver_physical_gc(min_age_secs=3600)
gc_summary = env.storage_scrubber.pageserver_physical_gc(min_age_secs=3600)
assert gc_summary["remote_storage_errors"] == 0
assert gc_summary["indices_deleted"] == 0
# If targeting a different tenant, the scrubber shouldn't do anything
gc_summary = StorageScrubber(neon_env_builder).pageserver_physical_gc(
gc_summary = env.storage_scrubber.pageserver_physical_gc(
min_age_secs=1, tenant_ids=[TenantId.generate()]
)
assert gc_summary["remote_storage_errors"] == 0
assert gc_summary["indices_deleted"] == 0
# With a low min_age, the scrubber should go ahead and clean up all but the latest 2 generations
gc_summary = StorageScrubber(neon_env_builder).pageserver_physical_gc(min_age_secs=1)
gc_summary = env.storage_scrubber.pageserver_physical_gc(min_age_secs=1)
assert gc_summary["remote_storage_errors"] == 0
assert gc_summary["indices_deleted"] == (expect_indices_per_shard - 2) * shard_count
@pytest.mark.parametrize("shard_count", [None, 2])
def test_scrubber_physical_gc_ancestors(
neon_env_builder: NeonEnvBuilder, shard_count: Optional[int]
):
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
env.start()
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.neon_cli.create_tenant(
tenant_id,
timeline_id,
shard_count=shard_count,
conf={
# Small layers and low compaction thresholds, so that when we split we can expect some to
# be dropped by child shards
"checkpoint_distance": f"{1024 * 1024}",
"compaction_threshold": "1",
"compaction_target_size": f"{1024 * 1024}",
"image_creation_threshold": "2",
"image_layer_creation_check_threshold": "0",
# Disable background compaction, we will do it explicitly
"compaction_period": "0s",
# No PITR, so that as soon as child shards generate an image layer, it covers ancestor deltas
# and makes them GC'able
"pitr_interval": "0s",
},
)
# Make sure the original shard has some layers
workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(100)
new_shard_count = 4
assert shard_count is None or new_shard_count > shard_count
shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=new_shard_count)
# Make sure child shards have some layers
workload.write_rows(100)
# Flush deletion queue so that we don't leave any orphan layers in the parent that will confuse subsequent checks: once
# a shard is split, any layers in its prefix that aren't referenced by a child will be considered GC'able, even
# if they were logically deleted before the shard split, just not physically deleted yet because of the queue.
for ps in env.pageservers:
ps.http_client().deletion_queue_flush(execute=True)
# Before compacting, all the layers in the ancestor should still be referenced by the children: the scrubber
# should not erase any ancestor layers
gc_summary = env.storage_scrubber.pageserver_physical_gc(min_age_secs=1, mode="full")
assert gc_summary["remote_storage_errors"] == 0
assert gc_summary["indices_deleted"] == 0
assert gc_summary["ancestor_layers_deleted"] == 0
# Write some data and compact: compacting, some ancestor layers should no longer be needed by children
# (the compaction is part of the checkpoint that Workload does for us)
workload.churn_rows(100)
workload.churn_rows(100)
workload.churn_rows(100)
for shard in shards:
ps = env.get_tenant_pageserver(shard)
ps.http_client().timeline_compact(shard, timeline_id)
ps.http_client().timeline_gc(shard, timeline_id, 0)
# We will use a min_age_secs=1 threshold for deletion, let it pass
time.sleep(2)
# Our time threshold should be respected: check that with a high threshold we delete nothing
gc_summary = env.storage_scrubber.pageserver_physical_gc(min_age_secs=3600, mode="full")
assert gc_summary["remote_storage_errors"] == 0
assert gc_summary["indices_deleted"] == 0
assert gc_summary["ancestor_layers_deleted"] == 0
# Now run with a low time threshold: deletions of ancestor layers should be executed
gc_summary = env.storage_scrubber.pageserver_physical_gc(min_age_secs=1, mode="full")
assert gc_summary["remote_storage_errors"] == 0
assert gc_summary["indices_deleted"] == 0
assert gc_summary["ancestor_layers_deleted"] > 0
# We deleted some layers: now check we didn't corrupt the tenant by doing so. Detach and
# attach it, to drop any local state, then check it's still readable.
workload.stop()
drop_local_state(env, tenant_id)
workload.validate()
def test_scrubber_physical_gc_ancestors_split(neon_env_builder: NeonEnvBuilder):
"""
Exercise ancestor GC while a tenant is partly split: this test ensures that if we have some child shards
which don't reference an ancestor, but some child shards that don't exist yet, then we do not incorrectly
GC any ancestor layers.
"""
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
env.start()
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
initial_shard_count = 2
env.neon_cli.create_tenant(
tenant_id,
timeline_id,
shard_count=initial_shard_count,
conf={
# Small layers and low compaction thresholds, so that when we split we can expect some to
# be dropped by child shards
"checkpoint_distance": f"{1024 * 1024}",
"compaction_threshold": "1",
"compaction_target_size": f"{1024 * 1024}",
"image_creation_threshold": "2",
"image_layer_creation_check_threshold": "0",
# Disable background compaction, we will do it explicitly
"compaction_period": "0s",
# No PITR, so that as soon as child shards generate an image layer, it covers ancestor deltas
# and makes them GC'able
"pitr_interval": "0s",
},
)
unstuck = threading.Event()
def stuck_split():
# Pause our shard split after the first shard but before the second, such that when we run
# the scrub, the S3 bucket contains shards 0002, 0101, 0004, 0204 (but not 0104, 0304).
env.storage_controller.configure_failpoints(
("shard-split-post-remote-sleep", "return(3600000)")
)
try:
split_response = env.storage_controller.tenant_shard_split(tenant_id, shard_count=4)
except Exception as e:
log.info(f"Split failed with {e}")
else:
if not unstuck.is_set():
raise RuntimeError(f"Split succeeded unexpectedly ({split_response})")
with ThreadPoolExecutor(max_workers=1) as threads:
log.info("Starting hung shard split")
stuck_split_fut = threads.submit(stuck_split)
# Let the controller reach the failpoint
wait_until(
10,
1,
lambda: env.storage_controller.assert_log_contains(
'failpoint "shard-split-post-remote-sleep": sleeping'
),
)
# Run compaction on the new child shards, so that they drop some refs to their parent
child_shards = [
TenantShardId(tenant_id, 0, 4),
TenantShardId(tenant_id, 2, 4),
]
log.info("Compacting first two children")
for child in child_shards:
env.get_tenant_pageserver(
TenantShardId(tenant_id, 0, initial_shard_count)
).http_client().timeline_compact(child, timeline_id)
# Check that the other child shards weren't created
assert env.get_tenant_pageserver(TenantShardId(tenant_id, 1, 4)) is None
assert env.get_tenant_pageserver(TenantShardId(tenant_id, 3, 4)) is None
# Run scrubber: it should not incorrectly interpret the **04 shards' lack of refs to all
# ancestor layers as a reason to GC them, because it should realize that a split is in progress.
# (GC requires that controller does not indicate split in progress, and that if we see the highest
# shard count N, then there are N shards present with that shard count).
gc_output = env.storage_scrubber.pageserver_physical_gc(min_age_secs=0, mode="full")
log.info(f"Ran physical GC partway through split: {gc_output}")
assert gc_output["ancestor_layers_deleted"] == 0
assert gc_output["remote_storage_errors"] == 0
assert gc_output["controller_api_errors"] == 0
# Storage controller shutdown lets our split request client complete
log.info("Stopping storage controller")
unstuck.set()
env.storage_controller.allowed_errors.append(".*Timed out joining HTTP server task.*")
env.storage_controller.stop()
stuck_split_fut.result()
# Restart the controller and retry the split with the failpoint disabled, this should
# complete successfully and result in an S3 state that allows the scrubber to proceed with removing ancestor layers
log.info("Starting & retrying split")
env.storage_controller.start()
env.storage_controller.tenant_shard_split(tenant_id, shard_count=4)
# The other child shards exist now, we can compact them to drop refs to ancestor
log.info("Compacting second two children")
for child in [
TenantShardId(tenant_id, 1, 4),
TenantShardId(tenant_id, 3, 4),
]:
env.get_tenant_pageserver(child).http_client().timeline_compact(child, timeline_id)
gc_output = env.storage_scrubber.pageserver_physical_gc(min_age_secs=0, mode="full")
log.info(f"Ran physical GC after split completed: {gc_output}")
assert gc_output["ancestor_layers_deleted"] > 0
assert gc_output["remote_storage_errors"] == 0
assert gc_output["controller_api_errors"] == 0

View File

@@ -5,7 +5,6 @@ from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
StorageScrubber,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.http import PageserverApiException
@@ -325,7 +324,6 @@ def test_tenant_delete_scrubber(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder)
remote_storage_kind = RemoteStorageKind.MOCK_S3
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
scrubber = StorageScrubber(neon_env_builder)
env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)
ps_http = env.pageserver.http_client()
@@ -340,7 +338,7 @@ def test_tenant_delete_scrubber(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder)
wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
env.stop()
result = scrubber.scan_metadata()
result = env.storage_scrubber.scan_metadata()
assert result["with_warnings"] == []
env.start()
@@ -348,5 +346,5 @@ def test_tenant_delete_scrubber(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder)
ps_http.tenant_delete(tenant_id)
env.stop()
scrubber.scan_metadata()
env.storage_scrubber.scan_metadata()
assert result["with_warnings"] == []