diff --git a/Cargo.lock b/Cargo.lock
index 7bf9209395..08e6961408 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4292,6 +4292,7 @@ dependencies = [
  "histogram",
  "itertools",
  "pageserver",
+ "pageserver_api",
  "rand 0.8.5",
  "remote_storage",
  "reqwest",
diff --git a/s3_scrubber/Cargo.toml b/s3_scrubber/Cargo.toml
index e26f2c6d6b..fdae378d55 100644
--- a/s3_scrubber/Cargo.toml
+++ b/s3_scrubber/Cargo.toml
@@ -31,6 +31,7 @@ reqwest = { workspace = true, default-features = false, features = ["rustls-tls"
 aws-config = { workspace = true, default-features = false, features = ["rustls", "sso"] }
 
 pageserver = { path = "../pageserver" }
+pageserver_api = { path = "../libs/pageserver_api" }
 remote_storage = { path = "../libs/remote_storage" }
 tracing.workspace = true
diff --git a/s3_scrubber/src/checks.rs b/s3_scrubber/src/checks.rs
index a15a908212..2acbb2352b 100644
--- a/s3_scrubber/src/checks.rs
+++ b/s3_scrubber/src/checks.rs
@@ -7,13 +7,12 @@ use utils::generation::Generation;
 
 use crate::cloud_admin_api::BranchData;
 use crate::metadata_stream::stream_listing;
-use crate::{download_object_with_retries, RootTarget};
+use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId};
 use futures_util::{pin_mut, StreamExt};
 use pageserver::tenant::remote_timeline_client::parse_remote_index_path;
 use pageserver::tenant::storage_layer::LayerFileName;
 use pageserver::tenant::IndexPart;
 use remote_storage::RemotePath;
-use utils::id::TenantTimelineId;
 
 pub(crate) struct TimelineAnalysis {
     /// Anomalies detected
@@ -39,8 +38,8 @@ impl TimelineAnalysis {
     }
 }
 
-pub(crate) async fn branch_cleanup_and_check_errors(
-    id: &TenantTimelineId,
+pub(crate) fn branch_cleanup_and_check_errors(
+    id: &TenantShardTimelineId,
     s3_root: &RootTarget,
     s3_active_branch: Option<&BranchData>,
     console_branch: Option<BranchData>,
@@ -238,7 +237,7 @@ fn parse_layer_object_name(name: &str) -> Result<(LayerFileName, Generation), St
 
 pub(crate) async fn list_timeline_blobs(
     s3_client: &Client,
-    id: TenantTimelineId,
+    id: TenantShardTimelineId,
     s3_root: &RootTarget,
 ) -> anyhow::Result<S3TimelineBlobData> {
     let mut s3_layers = HashSet::new();
diff --git a/s3_scrubber/src/garbage.rs b/s3_scrubber/src/garbage.rs
index f27e1d7f65..7192afb91b 100644
--- a/s3_scrubber/src/garbage.rs
+++ b/s3_scrubber/src/garbage.rs
@@ -10,15 +10,16 @@ use aws_sdk_s3::{
     Client,
 };
 use futures_util::{pin_mut, TryStreamExt};
+use pageserver_api::shard::TenantShardId;
 use serde::{Deserialize, Serialize};
 use tokio_stream::StreamExt;
-use utils::id::{TenantId, TenantTimelineId};
+use utils::id::TenantId;
 
 use crate::{
     cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
     init_remote,
     metadata_stream::{stream_listing, stream_tenant_timelines, stream_tenants},
-    BucketConfig, ConsoleConfig, NodeKind, RootTarget, TraversingDepth,
+    BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId, TraversingDepth,
 };
 
 #[derive(Serialize, Deserialize, Debug)]
@@ -29,8 +30,8 @@ enum GarbageReason {
 
 #[derive(Serialize, Deserialize, Debug)]
 enum GarbageEntity {
-    Tenant(TenantId),
-    Timeline(TenantTimelineId),
+    Tenant(TenantShardId),
+    Timeline(TenantShardTimelineId),
 }
 
 #[derive(Serialize, Deserialize, Debug)]
@@ -142,6 +143,9 @@ async fn find_garbage_inner(
         console_projects.len()
     );
 
+    // TODO(sharding): batch calls into Console so that we only call once for each TenantId,
+    // rather than checking the same TenantId for multiple TenantShardIds
+
     // Enumerate Tenants in S3, and check if each one exists in Console
     tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
     let tenants = stream_tenants(&s3_client, &target);
@@ -149,10 +153,10 @@ async fn find_garbage_inner(
         let api_client = cloud_admin_api_client.clone();
         let console_projects = &console_projects;
         async move {
-            match console_projects.get(&t) {
+            match console_projects.get(&t.tenant_id) {
                 Some(project_data) => Ok((t, Some(project_data.clone()))),
                 None => api_client
-                    .find_tenant_project(t)
+                    .find_tenant_project(t.tenant_id)
                     .await
                     .map_err(|e| anyhow::anyhow!(e))
                     .map(|r| (t, r)),
@@ -166,21 +170,21 @@ async fn find_garbage_inner(
     // checks if they are enabled by the `depth` parameter.
     pin_mut!(tenants_checked);
     let mut garbage = GarbageList::new(node_kind, bucket_config);
-    let mut active_tenants: Vec<TenantId> = vec![];
+    let mut active_tenants: Vec<TenantShardId> = vec![];
     let mut counter = 0;
     while let Some(result) = tenants_checked.next().await {
-        let (tenant_id, console_result) = result?;
+        let (tenant_shard_id, console_result) = result?;
 
         // Paranoia check
         if let Some(project) = &console_result {
-            assert!(project.tenant == tenant_id);
+            assert!(project.tenant == tenant_shard_id.tenant_id);
         }
 
-        if garbage.maybe_append(GarbageEntity::Tenant(tenant_id), console_result) {
-            tracing::debug!("Tenant {tenant_id} is garbage");
+        if garbage.maybe_append(GarbageEntity::Tenant(tenant_shard_id), console_result) {
+            tracing::debug!("Tenant {tenant_shard_id} is garbage");
         } else {
-            tracing::debug!("Tenant {tenant_id} is active");
-            active_tenants.push(tenant_id);
+            tracing::debug!("Tenant {tenant_shard_id} is active");
+            active_tenants.push(tenant_shard_id);
         }
 
         counter += 1;
@@ -266,13 +270,13 @@ impl std::fmt::Display for PurgeMode {
 
 pub async fn get_tenant_objects(
     s3_client: &Arc<Client>,
     target: RootTarget,
-    tenant_id: TenantId,
+    tenant_shard_id: TenantShardId,
 ) -> anyhow::Result<Vec<ObjectIdentifier>> {
-    tracing::debug!("Listing objects in tenant {tenant_id}");
+    tracing::debug!("Listing objects in tenant {tenant_shard_id}");
     // TODO: apply extra validation based on object modification time. Don't purge
     // tenants where any timeline's index_part.json has been touched recently.
-    let mut tenant_root = target.tenant_root(&tenant_id);
+    let mut tenant_root = target.tenant_root(&tenant_shard_id);
 
     // Remove delimiter, so that object listing lists all keys in the prefix and not just
     // common prefixes.
@@ -285,7 +289,7 @@ pub async fn get_tenant_objects(
 pub async fn get_timeline_objects(
     s3_client: &Arc<Client>,
     target: RootTarget,
-    ttid: TenantTimelineId,
+    ttid: TenantShardTimelineId,
 ) -> anyhow::Result<Vec<ObjectIdentifier>> {
     tracing::debug!("Listing objects in timeline {ttid}");
     let mut timeline_root = target.timeline_root(&ttid);
diff --git a/s3_scrubber/src/lib.rs b/s3_scrubber/src/lib.rs
index 6607db21e6..d2338c21e5 100644
--- a/s3_scrubber/src/lib.rs
+++ b/s3_scrubber/src/lib.rs
@@ -22,6 +22,7 @@ use aws_sdk_s3::{Client, Config};
 use clap::ValueEnum;
 use pageserver::tenant::TENANTS_SEGMENT_NAME;
+use pageserver_api::shard::TenantShardId;
 use reqwest::Url;
 use serde::{Deserialize, Serialize};
 use std::io::IsTerminal;
@@ -29,7 +30,7 @@ use tokio::io::AsyncReadExt;
 use tracing::error;
 use tracing_appender::non_blocking::WorkerGuard;
 use tracing_subscriber::{fmt, prelude::*, EnvFilter};
-use utils::id::{TenantId, TenantTimelineId};
+use utils::id::TimelineId;
 
 const MAX_RETRIES: usize = 20;
 const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
@@ -44,6 +45,35 @@ pub struct S3Target {
     pub delimiter: String,
 }
 
+/// Convenience for referring to timelines within a particular shard: more ergonomic
+/// than using a 2-tuple.
+///
+/// This is the shard-aware equivalent of TenantTimelineId. It's defined here rather
+/// than somewhere more broadly exposed, because this kind of thing is rarely needed
+/// in the pageserver, as all timeline objects exist in the scope of a particular
+/// tenant: the scrubber is different in that it handles collections of data referring to many
+/// TenantShardTimelineIds in one place.
+#[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)]
+pub struct TenantShardTimelineId {
+    tenant_shard_id: TenantShardId,
+    timeline_id: TimelineId,
+}
+
+impl TenantShardTimelineId {
+    fn new(tenant_shard_id: TenantShardId, timeline_id: TimelineId) -> Self {
+        Self {
+            tenant_shard_id,
+            timeline_id,
+        }
+    }
+}
+
+impl Display for TenantShardTimelineId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}/{}", self.tenant_shard_id, self.timeline_id)
+    }
+}
+
 #[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
 pub enum TraversingDepth {
     Tenant,
@@ -110,19 +140,19 @@ impl RootTarget {
         }
     }
 
-    pub fn tenant_root(&self, tenant_id: &TenantId) -> S3Target {
+    pub fn tenant_root(&self, tenant_id: &TenantShardId) -> S3Target {
         self.tenants_root().with_sub_segment(&tenant_id.to_string())
     }
 
-    pub fn timelines_root(&self, tenant_id: &TenantId) -> S3Target {
+    pub fn timelines_root(&self, tenant_id: &TenantShardId) -> S3Target {
         match self {
             Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"),
             Self::Safekeeper(_) => self.tenant_root(tenant_id),
         }
     }
 
-    pub fn timeline_root(&self, id: &TenantTimelineId) -> S3Target {
-        self.timelines_root(&id.tenant_id)
+    pub fn timeline_root(&self, id: &TenantShardTimelineId) -> S3Target {
+        self.timelines_root(&id.tenant_shard_id)
             .with_sub_segment(&id.timeline_id.to_string())
     }
diff --git a/s3_scrubber/src/metadata_stream.rs b/s3_scrubber/src/metadata_stream.rs
index 4cfa77cfc1..073f37f319 100644
--- a/s3_scrubber/src/metadata_stream.rs
+++ b/s3_scrubber/src/metadata_stream.rs
@@ -3,14 +3,15 @@ use async_stream::{stream, try_stream};
 use aws_sdk_s3::{types::ObjectIdentifier, Client};
 use tokio_stream::Stream;
 
-use crate::{list_objects_with_retries, RootTarget, S3Target, TenantId};
-use utils::id::{TenantTimelineId, TimelineId};
+use crate::{list_objects_with_retries, RootTarget, S3Target, TenantShardTimelineId};
+use pageserver_api::shard::TenantShardId;
+use utils::id::TimelineId;
 
 /// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
 pub fn stream_tenants<'a>(
     s3_client: &'a Client,
     target: &'a RootTarget,
-) -> impl Stream<Item = anyhow::Result<TenantId>> + 'a {
+) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
     try_stream! {
         let mut continuation_token = None;
         let tenants_target = target.tenants_root();
@@ -44,14 +45,14 @@ pub fn stream_tenants<'a>(
     }
 }
 
-/// Given a TenantId, output a stream of the timelines within that tenant, discovered
+/// Given a TenantShardId, output a stream of the timelines within that tenant, discovered
 /// using ListObjectsv2. The listing is done before the stream is built, so that this
 /// function can be used to generate concurrency on a stream using buffer_unordered.
 pub async fn stream_tenant_timelines<'a>(
     s3_client: &'a Client,
     target: &'a RootTarget,
-    tenant: TenantId,
-) -> anyhow::Result<impl Stream<Item = anyhow::Result<TenantTimelineId>> + 'a> {
+    tenant: TenantShardId,
+) -> anyhow::Result<impl Stream<Item = anyhow::Result<TenantShardTimelineId>> + 'a> {
     let mut timeline_ids: Vec<anyhow::Result<TimelineId>> = Vec::new();
     let mut continuation_token = None;
     let timelines_target = target.timelines_root(&tenant);
@@ -98,7 +99,7 @@ pub async fn stream_tenant_timelines<'a>(
     Ok(stream! {
         for i in timeline_ids {
             let id = i?;
-            yield Ok(TenantTimelineId::new(tenant, id));
+            yield Ok(TenantShardTimelineId::new(tenant, id));
         }
     })
 }
diff --git a/s3_scrubber/src/scan_metadata.rs b/s3_scrubber/src/scan_metadata.rs
index 228f8d6763..91347ca21b 100644
--- a/s3_scrubber/src/scan_metadata.rs
+++ b/s3_scrubber/src/scan_metadata.rs
@@ -5,20 +5,19 @@ use crate::checks::{
     TimelineAnalysis,
 };
 use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
-use crate::{init_remote, BucketConfig, NodeKind, RootTarget};
+use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
 use aws_sdk_s3::Client;
 use futures_util::{pin_mut, StreamExt, TryStreamExt};
 use histogram::Histogram;
 use pageserver::tenant::IndexPart;
 use serde::Serialize;
-use utils::id::TenantTimelineId;
 
 #[derive(Serialize)]
 pub struct MetadataSummary {
     count: usize,
-    with_errors: HashSet<TenantTimelineId>,
-    with_warnings: HashSet<TenantTimelineId>,
-    with_garbage: HashSet<TenantTimelineId>,
+    with_errors: HashSet<TenantShardTimelineId>,
+    with_warnings: HashSet<TenantShardTimelineId>,
+    with_garbage: HashSet<TenantShardTimelineId>,
     indices_by_version: HashMap<usize, usize>,
 
     layer_count: MinMaxHisto,
@@ -132,7 +131,7 @@ impl MetadataSummary {
         }
     }
 
-    fn update_analysis(&mut self, id: &TenantTimelineId, analysis: &TimelineAnalysis) {
+    fn update_analysis(&mut self, id: &TenantShardTimelineId, analysis: &TimelineAnalysis) {
         if !analysis.errors.is_empty() {
             self.with_errors.insert(*id);
         }
@@ -199,8 +198,8 @@ pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<Metad
     async fn report_on_timeline(
         s3_client: &Client,
         target: &RootTarget,
-        ttid: TenantTimelineId,
-    ) -> anyhow::Result<(TenantTimelineId, S3TimelineBlobData)> {
+        ttid: TenantShardTimelineId,
+    ) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> {
         let data = list_timeline_blobs(s3_client, ttid, target).await?;
         Ok((ttid, data))
     }
@@ -213,8 +212,7 @@ pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result
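
The TODO(sharding) comment added in find_garbage_inner points at a natural follow-up: several TenantShardIds share one parent TenantId, so Console lookups can be grouped per tenant instead of issued per shard. The sketch below is illustrative only and is not part of this patch; `group_by_tenant` is a hypothetical helper, and `TenantId`/`TenantShardId` here are simplified stand-ins for utils::id::TenantId and pageserver_api::shard::TenantShardId.

```rust
// Illustrative sketch only: stand-in types, not the real crate definitions.
use std::collections::BTreeMap;

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct TenantId(u128); // stand-in for utils::id::TenantId

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct TenantShardId {
    tenant_id: TenantId,
    shard_number: u8, // stand-in for ShardNumber
    shard_count: u8,  // stand-in for ShardCount
}

/// Group shard IDs by tenant, yielding one key per TenantId to look up in Console.
fn group_by_tenant(
    shards: impl IntoIterator<Item = TenantShardId>,
) -> BTreeMap<TenantId, Vec<TenantShardId>> {
    let mut by_tenant: BTreeMap<TenantId, Vec<TenantShardId>> = BTreeMap::new();
    for shard in shards {
        by_tenant.entry(shard.tenant_id).or_default().push(shard);
    }
    by_tenant
}

fn main() {
    let tenant = TenantId(0xdead_beef);
    let shards = [
        TenantShardId { tenant_id: tenant, shard_number: 0, shard_count: 2 },
        TenantShardId { tenant_id: tenant, shard_number: 1, shard_count: 2 },
    ];
    for s in &shards {
        println!("shard {:x}-{:02}{:02}", s.tenant_id.0, s.shard_number, s.shard_count);
    }
    // Both shards collapse to one TenantId: one find_tenant_project call instead of two.
    assert_eq!(group_by_tenant(shards).len(), 1);
}
```

With such a map in hand, the Console would be queried once per key, and the result applied to every shard in the corresponding Vec.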
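For context on why RootTarget::tenant_root and timelines_root now take a TenantShardId: each shard of a tenant owns its own prefix in the bucket, so listings and purges stay per-shard. A rough sketch of the resulting key shape follows; the literal `tenants/.../timelines/...` prefix string and the example ID values are assumptions for illustration, not lifted from this patch.

```rust
// Rough sketch only: approximates how timeline_root() composes an S3 prefix
// from the shard-aware ID. The real layout comes from RootTarget above.
fn timeline_prefix(tenant_shard_id: &str, timeline_id: &str) -> String {
    format!("tenants/{tenant_shard_id}/timelines/{timeline_id}/")
}

fn main() {
    // Two shards of the same tenant: same TenantId, different shard suffix,
    // therefore disjoint listing prefixes (hypothetical example values).
    let shard0 = timeline_prefix("a1b2c3-0002", "f00dcafe");
    let shard1 = timeline_prefix("a1b2c3-0102", "f00dcafe");
    assert_ne!(shard0, shard1);
    println!("{shard0}\n{shard1}");
}
```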