Mirror of https://github.com/neondatabase/neon.git
s3_scrubber: basic support for sharding (#6119)
This doesn't make the scrubber smart enough to understand that many shards may belong to the same tenant, but it does make it understand paths well enough to scrub individual shards without treating them as malformed. This is a prerequisite for running tests with sharding enabled. Related: #5929
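For readers unfamiliar with the layout: with sharding, the per-tenant S3 prefix carries a shard suffix, so the scrubber must accept both bare tenant IDs and shard-qualified ones. Below is a minimal, hypothetical sketch of that parsing, not code from this commit: the function name and the assumption of a 2+2 hex-digit "-<shard_number><shard_count>" suffix mirror how TenantShardId renders, but the real scrubber uses pageserver_api::shard::TenantShardId directly.

// Illustrative sketch only (not part of this commit). Assumes the
// "<tenant_id>-<shard_number><shard_count>" hex-suffix layout; the name
// parse_tenant_segment is a hypothetical stand-in for TenantShardId parsing.
fn parse_tenant_segment(seg: &str) -> Result<(String, u8, u8), String> {
    match seg.split_once('-') {
        // Sharded layout: tenant id, then 2+2 hex digits of shard number/count.
        Some((tenant, shard)) if shard.len() == 4 => {
            let number = u8::from_str_radix(&shard[0..2], 16).map_err(|e| e.to_string())?;
            let count = u8::from_str_radix(&shard[2..4], 16).map_err(|e| e.to_string())?;
            Ok((tenant.to_string(), number, count))
        }
        // Legacy/unsharded layout: a bare tenant id, treated as shard 0 of 0.
        None => Ok((seg.to_string(), 0, 0)),
        Some(_) => Err(format!("malformed tenant segment: {seg}")),
    }
}

fn main() {
    // An unsharded tenant, then shard 1 of a 4-shard tenant (hypothetical ids).
    for seg in ["3aa8fcc61f6752307d8d8d0ffc04b72c", "3aa8fcc61f6752307d8d8d0ffc04b72c-0104"] {
        println!("{:?}", parse_tenant_segment(seg));
    }
}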
Cargo.lock (generated)
@@ -4292,6 +4292,7 @@ dependencies = [
  "histogram",
  "itertools",
  "pageserver",
+ "pageserver_api",
  "rand 0.8.5",
  "remote_storage",
  "reqwest",
s3_scrubber/Cargo.toml
@@ -31,6 +31,7 @@ reqwest = { workspace = true, default-features = false, features = ["rustls-tls"
 aws-config = { workspace = true, default-features = false, features = ["rustls", "sso"] }

 pageserver = { path = "../pageserver" }
+pageserver_api = { path = "../libs/pageserver_api" }
 remote_storage = { path = "../libs/remote_storage" }

 tracing.workspace = true
s3_scrubber/src/checks.rs
@@ -7,13 +7,12 @@ use utils::generation::Generation;
 use crate::cloud_admin_api::BranchData;
 use crate::metadata_stream::stream_listing;
-use crate::{download_object_with_retries, RootTarget};
+use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId};
 use futures_util::{pin_mut, StreamExt};
 use pageserver::tenant::remote_timeline_client::parse_remote_index_path;
 use pageserver::tenant::storage_layer::LayerFileName;
 use pageserver::tenant::IndexPart;
 use remote_storage::RemotePath;
-use utils::id::TenantTimelineId;

 pub(crate) struct TimelineAnalysis {
     /// Anomalies detected
@@ -39,8 +38,8 @@ impl TimelineAnalysis {
     }
 }

-pub(crate) async fn branch_cleanup_and_check_errors(
-    id: &TenantTimelineId,
+pub(crate) fn branch_cleanup_and_check_errors(
+    id: &TenantShardTimelineId,
     s3_root: &RootTarget,
     s3_active_branch: Option<&BranchData>,
     console_branch: Option<BranchData>,
@@ -238,7 +237,7 @@ fn parse_layer_object_name(name: &str) -> Result<(LayerFileName, Generation), St

 pub(crate) async fn list_timeline_blobs(
     s3_client: &Client,
-    id: TenantTimelineId,
+    id: TenantShardTimelineId,
     s3_root: &RootTarget,
 ) -> anyhow::Result<S3TimelineBlobData> {
     let mut s3_layers = HashSet::new();
s3_scrubber/src/garbage.rs
@@ -10,15 +10,16 @@ use aws_sdk_s3::{
     Client,
 };
 use futures_util::{pin_mut, TryStreamExt};
+use pageserver_api::shard::TenantShardId;
 use serde::{Deserialize, Serialize};
 use tokio_stream::StreamExt;
-use utils::id::{TenantId, TenantTimelineId};
+use utils::id::TenantId;

 use crate::{
     cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
     init_remote,
     metadata_stream::{stream_listing, stream_tenant_timelines, stream_tenants},
-    BucketConfig, ConsoleConfig, NodeKind, RootTarget, TraversingDepth,
+    BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId, TraversingDepth,
 };

 #[derive(Serialize, Deserialize, Debug)]
@@ -29,8 +30,8 @@ enum GarbageReason {

 #[derive(Serialize, Deserialize, Debug)]
 enum GarbageEntity {
-    Tenant(TenantId),
-    Timeline(TenantTimelineId),
+    Tenant(TenantShardId),
+    Timeline(TenantShardTimelineId),
 }

 #[derive(Serialize, Deserialize, Debug)]
@@ -142,6 +143,9 @@ async fn find_garbage_inner(
         console_projects.len()
     );

+    // TODO(sharding): batch calls into Console so that we only call once for each TenantId,
+    // rather than checking the same TenantId for multiple TenantShardId
+
     // Enumerate Tenants in S3, and check if each one exists in Console
     tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
     let tenants = stream_tenants(&s3_client, &target);
@@ -149,10 +153,10 @@ async fn find_garbage_inner(
         let api_client = cloud_admin_api_client.clone();
         let console_projects = &console_projects;
         async move {
-            match console_projects.get(&t) {
+            match console_projects.get(&t.tenant_id) {
                 Some(project_data) => Ok((t, Some(project_data.clone()))),
                 None => api_client
-                    .find_tenant_project(t)
+                    .find_tenant_project(t.tenant_id)
                     .await
                     .map_err(|e| anyhow::anyhow!(e))
                     .map(|r| (t, r)),
@@ -166,21 +170,21 @@ async fn find_garbage_inner(
     // checks if they are enabled by the `depth` parameter.
     pin_mut!(tenants_checked);
     let mut garbage = GarbageList::new(node_kind, bucket_config);
-    let mut active_tenants: Vec<TenantId> = vec![];
+    let mut active_tenants: Vec<TenantShardId> = vec![];
     let mut counter = 0;
     while let Some(result) = tenants_checked.next().await {
-        let (tenant_id, console_result) = result?;
+        let (tenant_shard_id, console_result) = result?;

         // Paranoia check
         if let Some(project) = &console_result {
-            assert!(project.tenant == tenant_id);
+            assert!(project.tenant == tenant_shard_id.tenant_id);
         }

-        if garbage.maybe_append(GarbageEntity::Tenant(tenant_id), console_result) {
-            tracing::debug!("Tenant {tenant_id} is garbage");
+        if garbage.maybe_append(GarbageEntity::Tenant(tenant_shard_id), console_result) {
+            tracing::debug!("Tenant {tenant_shard_id} is garbage");
         } else {
-            tracing::debug!("Tenant {tenant_id} is active");
-            active_tenants.push(tenant_id);
+            tracing::debug!("Tenant {tenant_shard_id} is active");
+            active_tenants.push(tenant_shard_id);
         }

         counter += 1;
@@ -266,13 +270,13 @@ impl std::fmt::Display for PurgeMode {
 pub async fn get_tenant_objects(
     s3_client: &Arc<Client>,
     target: RootTarget,
-    tenant_id: TenantId,
+    tenant_shard_id: TenantShardId,
 ) -> anyhow::Result<Vec<ObjectIdentifier>> {
-    tracing::debug!("Listing objects in tenant {tenant_id}");
+    tracing::debug!("Listing objects in tenant {tenant_shard_id}");
     // TODO: apply extra validation based on object modification time. Don't purge
     // tenants where any timeline's index_part.json has been touched recently.

-    let mut tenant_root = target.tenant_root(&tenant_id);
+    let mut tenant_root = target.tenant_root(&tenant_shard_id);

     // Remove delimiter, so that object listing lists all keys in the prefix and not just
     // common prefixes.
@@ -285,7 +289,7 @@ pub async fn get_tenant_objects(
 pub async fn get_timeline_objects(
     s3_client: &Arc<Client>,
     target: RootTarget,
-    ttid: TenantTimelineId,
+    ttid: TenantShardTimelineId,
 ) -> anyhow::Result<Vec<ObjectIdentifier>> {
     tracing::debug!("Listing objects in timeline {ttid}");
     let mut timeline_root = target.timeline_root(&ttid);
s3_scrubber/src/lib.rs
@@ -22,6 +22,7 @@ use aws_sdk_s3::{Client, Config};

 use clap::ValueEnum;
 use pageserver::tenant::TENANTS_SEGMENT_NAME;
+use pageserver_api::shard::TenantShardId;
 use reqwest::Url;
 use serde::{Deserialize, Serialize};
 use std::io::IsTerminal;
@@ -29,7 +30,7 @@ use tokio::io::AsyncReadExt;
 use tracing::error;
 use tracing_appender::non_blocking::WorkerGuard;
 use tracing_subscriber::{fmt, prelude::*, EnvFilter};
-use utils::id::{TenantId, TenantTimelineId};
+use utils::id::TimelineId;

 const MAX_RETRIES: usize = 20;
 const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
@@ -44,6 +45,35 @@ pub struct S3Target {
     pub delimiter: String,
 }

+/// Convenience for referring to timelines within a particular shard: more ergonomic
+/// than using a 2-tuple.
+///
+/// This is the shard-aware equivalent of TenantTimelineId. It's defined here rather
+/// than somewhere more broadly exposed, because this kind of thing is rarely needed
+/// in the pageserver, as all timeline objects exist in the scope of a particular
+/// tenant: the scrubber is different in that it handles collections of data referring to many
+/// TenantShardTimelineIds in one place.
+#[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)]
+pub struct TenantShardTimelineId {
+    tenant_shard_id: TenantShardId,
+    timeline_id: TimelineId,
+}
+
+impl TenantShardTimelineId {
+    fn new(tenant_shard_id: TenantShardId, timeline_id: TimelineId) -> Self {
+        Self {
+            tenant_shard_id,
+            timeline_id,
+        }
+    }
+}
+
+impl Display for TenantShardTimelineId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}/{}", self.tenant_shard_id, self.timeline_id)
+    }
+}
+
 #[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
 pub enum TraversingDepth {
     Tenant,
@@ -110,19 +140,19 @@ impl RootTarget {
         }
     }

-    pub fn tenant_root(&self, tenant_id: &TenantId) -> S3Target {
+    pub fn tenant_root(&self, tenant_id: &TenantShardId) -> S3Target {
         self.tenants_root().with_sub_segment(&tenant_id.to_string())
     }

-    pub fn timelines_root(&self, tenant_id: &TenantId) -> S3Target {
+    pub fn timelines_root(&self, tenant_id: &TenantShardId) -> S3Target {
         match self {
             Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"),
             Self::Safekeeper(_) => self.tenant_root(tenant_id),
         }
     }

-    pub fn timeline_root(&self, id: &TenantTimelineId) -> S3Target {
-        self.timelines_root(&id.tenant_id)
+    pub fn timeline_root(&self, id: &TenantShardTimelineId) -> S3Target {
+        self.timelines_root(&id.tenant_shard_id)
             .with_sub_segment(&id.timeline_id.to_string())
     }
s3_scrubber/src/metadata_stream.rs
@@ -3,14 +3,15 @@ use async_stream::{stream, try_stream};
 use aws_sdk_s3::{types::ObjectIdentifier, Client};
 use tokio_stream::Stream;

-use crate::{list_objects_with_retries, RootTarget, S3Target, TenantId};
-use utils::id::{TenantTimelineId, TimelineId};
+use crate::{list_objects_with_retries, RootTarget, S3Target, TenantShardTimelineId};
+use pageserver_api::shard::TenantShardId;
+use utils::id::TimelineId;

 /// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
 pub fn stream_tenants<'a>(
     s3_client: &'a Client,
     target: &'a RootTarget,
-) -> impl Stream<Item = anyhow::Result<TenantId>> + 'a {
+) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
     try_stream! {
         let mut continuation_token = None;
         let tenants_target = target.tenants_root();
@@ -44,14 +45,14 @@ pub fn stream_tenants<'a>(
     }
 }

-/// Given a TenantId, output a stream of the timelines within that tenant, discovered
+/// Given a TenantShardId, output a stream of the timelines within that tenant, discovered
 /// using ListObjectsv2. The listing is done before the stream is built, so that this
 /// function can be used to generate concurrency on a stream using buffer_unordered.
 pub async fn stream_tenant_timelines<'a>(
     s3_client: &'a Client,
     target: &'a RootTarget,
-    tenant: TenantId,
-) -> anyhow::Result<impl Stream<Item = Result<TenantTimelineId, anyhow::Error>> + 'a> {
+    tenant: TenantShardId,
+) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> {
     let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
     let mut continuation_token = None;
     let timelines_target = target.timelines_root(&tenant);
@@ -98,7 +99,7 @@ pub async fn stream_tenant_timelines<'a>(
     Ok(stream! {
         for i in timeline_ids {
             let id = i?;
-            yield Ok(TenantTimelineId::new(tenant, id));
+            yield Ok(TenantShardTimelineId::new(tenant, id));
         }
     })
 }
s3_scrubber/src/scan_metadata.rs
@@ -5,20 +5,19 @@ use crate::checks::{
     TimelineAnalysis,
 };
 use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
-use crate::{init_remote, BucketConfig, NodeKind, RootTarget};
+use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
 use aws_sdk_s3::Client;
 use futures_util::{pin_mut, StreamExt, TryStreamExt};
 use histogram::Histogram;
 use pageserver::tenant::IndexPart;
 use serde::Serialize;
-use utils::id::TenantTimelineId;

 #[derive(Serialize)]
 pub struct MetadataSummary {
     count: usize,
-    with_errors: HashSet<TenantTimelineId>,
-    with_warnings: HashSet<TenantTimelineId>,
-    with_garbage: HashSet<TenantTimelineId>,
+    with_errors: HashSet<TenantShardTimelineId>,
+    with_warnings: HashSet<TenantShardTimelineId>,
+    with_garbage: HashSet<TenantShardTimelineId>,
     indices_by_version: HashMap<usize, usize>,

     layer_count: MinMaxHisto,
@@ -132,7 +131,7 @@ impl MetadataSummary {
         }
     }

-    fn update_analysis(&mut self, id: &TenantTimelineId, analysis: &TimelineAnalysis) {
+    fn update_analysis(&mut self, id: &TenantShardTimelineId, analysis: &TimelineAnalysis) {
         if !analysis.errors.is_empty() {
             self.with_errors.insert(*id);
         }
@@ -199,8 +198,8 @@ pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<Metada
     async fn report_on_timeline(
         s3_client: &Client,
         target: &RootTarget,
-        ttid: TenantTimelineId,
-    ) -> anyhow::Result<(TenantTimelineId, S3TimelineBlobData)> {
+        ttid: TenantShardTimelineId,
+    ) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> {
         let data = list_timeline_blobs(s3_client, ttid, target).await?;
         Ok((ttid, data))
     }
@@ -213,8 +212,7 @@ pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<Metada
         let (ttid, data) = i?;
         summary.update_data(&data);

-        let analysis =
-            branch_cleanup_and_check_errors(&ttid, &target, None, None, Some(data)).await;
+        let analysis = branch_cleanup_and_check_errors(&ttid, &target, None, None, Some(data));

         summary.update_analysis(&ttid, &analysis);
     }
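As the TODO(sharding) comment in find_garbage_inner notes, a follow-up is to batch Console lookups so each TenantId is checked once even when it has many shards. A rough sketch of that grouping follows; the types are simplified stand-ins for the real TenantShardId (an assumption for illustration, not code from this commit):

use std::collections::HashMap;

// Simplified stand-ins for the real ID types (hypothetical, for illustration).
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct TenantId(u128);

#[derive(Clone, Copy, Debug)]
struct TenantShardId {
    tenant_id: TenantId,
    shard_number: u8,
}

// Group shards by tenant so the Console is queried once per TenantId,
// with the answer reused for every shard of that tenant.
fn group_shards_by_tenant(shards: &[TenantShardId]) -> HashMap<TenantId, Vec<TenantShardId>> {
    let mut by_tenant: HashMap<TenantId, Vec<TenantShardId>> = HashMap::new();
    for shard in shards {
        by_tenant.entry(shard.tenant_id).or_default().push(*shard);
    }
    by_tenant
}

fn main() {
    let shards = [
        TenantShardId { tenant_id: TenantId(1), shard_number: 0 },
        TenantShardId { tenant_id: TenantId(1), shard_number: 1 },
        TenantShardId { tenant_id: TenantId(2), shard_number: 0 },
    ];
    // Two tenants -> two Console calls instead of three.
    for (tenant, shards) in group_shards_by_tenant(&shards) {
        println!("{tenant:?}: {} shard(s)", shards.len());
    }
}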