scrubber: add separate find/purge garbage commands (#5409)

## Problem

The previous garbage cleanup functionality relied on doing a dry run,
inspecting logs, and then doing a deletion. This isn't ideal, because
what one actually deletes might not be the same as what one saw in the
dry run. It's also risky UX to rely on presence/absence of one CLI flag
to control deletion: ideally the deletion command should be totally
separate from the one that scans the bucket.

Related: https://github.com/neondatabase/neon/issues/5037

## Summary of changes

This is a major re-work of the code, which results in a net decrease in
line count of about 600. The old code for removing garbage was built
around the idea of doing discovery and purging together: a
"delete_batch_producer" sent batches into a deleter. The new code writes
out both procedures separately, in functions that use the async streams
introduced in https://github.com/neondatabase/neon/pull/5176 to achieve
fast concurrent access to S3 while retaining the readability of a single
function.

- Add `find-garbage`, which writes out a JSON file of tenants/timelines
to purge
- Add `purge-garbage` which consumes the garbage JSON file, applies some
extra validations, and does deletions.
- The purge command will refuse to execute if the garbage file indicates
that only garbage was found: this guards against classes of bugs where
the scrubber might incorrectly deem everything garbage.
- The purge command defaults to only deleting tenants that were found in
"deleted" state in the control plane. This guards against the risk that
using the wrong console API endpoint could cause all tenants to appear
to be missing.
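
The two purge-side guards above can be sketched roughly as follows. This is a minimal illustration, not the scrubber's actual code: `GarbageList`, `GarbageEntry`, `GarbageReason`, `PurgeMode`, `active_tenant_count` and `select_for_purge` are all hypothetical names, and the real garbage file format is not described in this message.

```rust
/// Why an entry was classified as garbage (hypothetical names).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum GarbageReason {
    /// The control plane reports the tenant in "deleted" state.
    DeletedInConsole,
    /// The control plane has no record of the tenant at all.
    MissingInConsole,
}

/// Assumed equivalent of the purge command's mode switch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PurgeMode {
    DeletedOnly,
    DeletedAndMissing,
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct GarbageEntry {
    tenant_id: String,
    reason: GarbageReason,
}

/// Assumed shape of what the find step hands to the purge step.
#[derive(Debug, Clone)]
struct GarbageList {
    /// Number of tenants the scan saw that were NOT garbage.
    active_tenant_count: usize,
    items: Vec<GarbageEntry>,
}

/// Apply both guards and return the entries that may be deleted.
fn select_for_purge(
    list: &GarbageList,
    mode: PurgeMode,
) -> Result<Vec<&GarbageEntry>, String> {
    // Guard 1: if the scan found nothing but garbage, suspect a scrubber
    // bug or a misconfiguration and refuse to delete anything.
    if list.active_tenant_count == 0 {
        return Err("refusing to purge: the scan found only garbage".to_string());
    }

    // Guard 2: by default only purge what the control plane explicitly
    // marked deleted; "missing" entries require the wider mode.
    Ok(list
        .items
        .iter()
        .filter(|entry| match (mode, entry.reason) {
            (_, GarbageReason::DeletedInConsole) => true,
            (PurgeMode::DeletedAndMissing, GarbageReason::MissingInConsole) => true,
            (PurgeMode::DeletedOnly, GarbageReason::MissingInConsole) => false,
        })
        .collect())
}
```

Keeping the selection in one pure function means the same logic can feed either a dry-run log or the real deletion path, which fits the goal of separating the scan from the delete.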

Outstanding work for a future PR:
- Make whatever changes are needed to adapt to the Console/Control Plane
separation.
- Make purge even safer by checking S3 `Modified` times for
index_part.json files (not doing this here, because it will depend on
the generation-aware changes for finding index_part.json files)

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
Co-authored-by: Shany Pozin <shany@neon.tech>
John Spray
2023-10-26 20:36:28 +01:00
committed by GitHub
parent 39b148b74e
commit 7c16b5215e
12 changed files with 727 additions and 1433 deletions


@@ -25,57 +25,64 @@ _This section is only relevant if using a command that requires access to Neon's
### Commands
#### `tidy`
#### `find-garbage`
Iterate over S3 buckets for storage nodes, checking their contents and removing the data not present in the console. Node S3 data that's not removed is then further checked for discrepancies and, sometimes, validated.
Unless the global `--delete` argument is provided, this command only dry-runs and logs
what it would have deleted.
```
tidy --node-kind=<safekeeper|pageserver> [--depth=<tenant|timeline>] [--skip-validation]
```
Walk an S3 bucket and cross-reference the contents with the Console API to identify data for
tenants or timelines that should no longer exist.
- `--node-kind`: whether to inspect safekeeper or pageserver bucket prefix
- `--depth`: whether to only search for deletable tenants, or also search for
deletable timelines within active tenants. Default: `tenant`
- `--skip-validation`: skip additional post-deletion checks. Default: `false`
- `--output-path`: filename to write garbage list to. Default `garbage.json`
For a selected S3 path, the tool lists the S3 bucket given for either tenants or both tenants and timelines — for every found entry, console API is queried: any deleted or missing in the API entity is scheduled for deletion from S3.
This command outputs a JSON file describing tenants and timelines to remove, for subsequent
processing by the `purge-garbage` subcommand.
If validation is enabled, only the non-deleted tenants' ones are checked.
For pageserver, timelines' index_part.json on S3 is also checked for various discrepancies: no files are removed, even if there are "extra" S3 files not present in index_part.json: due to the way pageserver updates the remote storage, it's better to do such removals manually, stopping the corresponding tenant first.
**Note that the garbage list format is not stable. The output of `find-garbage` is only
intended for use by the exact same version of the tool running `purge-garbage`**
Command examples:
Example:
`env SSO_ACCOUNT_ID=369495373322 REGION=eu-west-1 BUCKET=neon-dev-storage-eu-west-1 CLOUD_ADMIN_API_TOKEN=${NEON_CLOUD_ADMIN_API_STAGING_KEY} CLOUD_ADMIN_API_URL=[url] cargo run --release -- tidy --node-kind=safekeeper`
`env SSO_ACCOUNT_ID=123456 REGION=eu-west-1 BUCKET=my-dev-bucket CLOUD_ADMIN_API_TOKEN=${NEON_CLOUD_ADMIN_API_STAGING_KEY} CLOUD_ADMIN_API_URL=[url] cargo run --release -- find-garbage --node-kind=pageserver --depth=tenant --output-path=eu-west-1-garbage.json`
`env SSO_ACCOUNT_ID=369495373322 REGION=us-east-2 BUCKET=neon-staging-storage-us-east-2 CLOUD_ADMIN_API_TOKEN=${NEON_CLOUD_ADMIN_API_STAGING_KEY} CLOUD_ADMIN_API_URL=[url] cargo run --release -- tidy --node-kind=pageserver --depth=timeline`
#### `purge-garbage`
When dry run stats look satisfying, use `-- --delete` before the `tidy` command to
disable dry run and run the binary with deletion enabled.
Consume a garbage list from `find-garbage`, and delete the related objects in the S3 bucket.
See these lines (and lines around) in the logs for the final stats:
- `--input-path`: filename to read garbage list from. Default `garbage.json`.
- `--mode`: controls whether to purge only garbage that was specifically marked
deleted in the control plane (`deletedonly`), or also to purge tenants/timelines
that were not present in the control plane at all (`deletedandmissing`)
- `Finished listing the bucket for tenants`
- `Finished active tenant and timeline validation`
- `Total tenant deletion stats`
- `Total timeline deletion stats`
This command learns region/bucket details from the garbage file, so it is not necessary
to pass them on the command line
## Current implementation details
Example:
- The tool does not have any persistent state currently: instead, it creates very verbose logs, with every S3 delete request logged, every tenant/timeline id check, etc.
Worse, any panic or early errored tasks might force the tool to exit without printing the final summary — all affected ids will still be in the logs though. The tool has retries inside it, so it's error-resistant up to some extent, and recent runs showed no traces of errors/panics.
`env SSO_ACCOUNT_ID=123456 cargo run --release -- purge-garbage --node-kind=pageserver --depth=tenant --input-path=eu-west-1-garbage.json`
- Instead of checking non-deleted tenants' timelines instantly, the tool attempts to create separate tasks (futures) for that,
complicating the logic and slowing down the process, this should be fixed and done in one "task".
Add the `--delete` argument before `purge-garbage` to enable deletion. This is intentionally
not provided inline in the example above to avoid accidents. Without the `--delete` flag
the purge command will log all the keys that it would have deleted.
- The tool uses only publicly available remote resources (S3, console) and does not access pageserver/safekeeper nodes themselves.
Yet, its S3 set up should be prepared for running on any pageserver/safekeeper node, using node's S3 credentials, so the node API access logic could be implemented relatively simply on top.
#### `scan-metadata`
## Cleanup procedure:
Walk objects in a pageserver S3 bucket, and report statistics on the contents.
### Pageserver preparations
```
env SSO_ACCOUNT_ID=123456 REGION=eu-west-1 BUCKET=my-dev-bucket CLOUD_ADMIN_API_TOKEN=${NEON_CLOUD_ADMIN_API_STAGING_KEY} CLOUD_ADMIN_API_URL=[url] cargo run --release -- scan-metadata
Timelines: 31106
With errors: 3
With warnings: 13942
With garbage: 0
Index versions: 2: 13942, 4: 17162
Timeline size bytes: min 22413312, 1% 52133887, 10% 56459263, 50% 101711871, 90% 191561727, 99% 280887295, max 167535558656
Layer size bytes: min 24576, 1% 36879, 10% 36879, 50% 61471, 90% 44695551, 99% 201457663, max 275324928
Timeline layer count: min 1, 1% 3, 10% 6, 50% 16, 90% 25, 99% 39, max 1053
```
## Cleaning up running pageservers
If S3 state is altered first manually, pageserver in-memory state will contain wrong data about S3 state, and tenants/timelines may get recreated on S3 (due to any layer upload due to compaction, pageserver restart, etc.). So before proceeding, for tenants/timelines which are already deleted in the console, we must remove these from pageservers.


@@ -1,178 +1,27 @@
use std::collections::{hash_map, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use std::collections::HashSet;
use anyhow::Context;
use aws_sdk_s3::Client;
use tokio::task::JoinSet;
use tracing::{error, info, info_span, warn, Instrument};
use tracing::{error, info, warn};
use crate::cloud_admin_api::{BranchData, CloudAdminApiClient, ProjectId};
use crate::delete_batch_producer::DeleteProducerStats;
use crate::{download_object_with_retries, list_objects_with_retries, RootTarget, MAX_RETRIES};
use crate::cloud_admin_api::BranchData;
use crate::{download_object_with_retries, list_objects_with_retries, RootTarget};
use pageserver::tenant::storage_layer::LayerFileName;
use pageserver::tenant::IndexPart;
use utils::id::TenantTimelineId;
pub async fn validate_pageserver_active_tenant_and_timelines(
s3_client: Arc<Client>,
s3_root: RootTarget,
admin_client: Arc<CloudAdminApiClient>,
batch_producer_stats: DeleteProducerStats,
) -> anyhow::Result<BranchCheckStats> {
let Some(timeline_stats) = batch_producer_stats.timeline_stats else {
info!("No tenant-only checks, exiting");
return Ok(BranchCheckStats::default());
};
let s3_active_projects = batch_producer_stats
.tenant_stats
.active_entries
.into_iter()
.map(|project| (project.id.clone(), project))
.collect::<HashMap<_, _>>();
info!("Validating {} active tenants", s3_active_projects.len());
let mut s3_active_branches_per_project = HashMap::<ProjectId, Vec<BranchData>>::new();
let mut s3_blob_data = HashMap::<TenantTimelineId, S3TimelineBlobData>::new();
for active_branch in timeline_stats.active_entries {
let active_project_id = active_branch.project_id.clone();
let active_branch_id = active_branch.id.clone();
let active_timeline_id = active_branch.timeline_id;
s3_active_branches_per_project
.entry(active_project_id.clone())
.or_default()
.push(active_branch);
let Some(active_project) = s3_active_projects.get(&active_project_id) else {
error!(
"Branch {:?} for project {:?} has no such project in the active projects",
active_branch_id, active_project_id
);
continue;
};
let id = TenantTimelineId::new(active_project.tenant, active_timeline_id);
s3_blob_data.insert(
id,
list_timeline_blobs(&s3_client, id, &s3_root)
.await
.with_context(|| format!("List timeline {id} blobs"))?,
);
}
let mut branch_checks = JoinSet::new();
for (_, s3_active_project) in s3_active_projects {
let project_id = &s3_active_project.id;
let tenant_id = s3_active_project.tenant;
let mut console_active_branches =
branches_for_project_with_retries(&admin_client, project_id)
.await
.with_context(|| {
format!("Client API branches for project {project_id:?} retrieval")
})?
.into_iter()
.map(|branch| (branch.id.clone(), branch))
.collect::<HashMap<_, _>>();
let active_branches = s3_active_branches_per_project
.remove(project_id)
.unwrap_or_default();
info!(
"Spawning tasks for {} tenant {} active timelines",
active_branches.len(),
tenant_id
);
for s3_active_branch in active_branches {
let console_branch = console_active_branches.remove(&s3_active_branch.id);
let timeline_id = s3_active_branch.timeline_id;
let id = TenantTimelineId::new(tenant_id, timeline_id);
let s3_data = s3_blob_data.remove(&id);
let s3_root = s3_root.clone();
branch_checks.spawn(
async move {
let check_errors = branch_cleanup_and_check_errors(
&id,
&s3_root,
Some(&s3_active_branch),
console_branch,
s3_data,
)
.await;
(id, check_errors)
}
.instrument(info_span!("check_timeline", id = %id)),
);
}
}
let mut total_stats = BranchCheckStats::default();
while let Some((id, analysis)) = branch_checks
.join_next()
.await
.transpose()
.context("branch check task join")?
{
total_stats.add(id, analysis.errors);
}
Ok(total_stats)
}
async fn branches_for_project_with_retries(
admin_client: &CloudAdminApiClient,
project_id: &ProjectId,
) -> anyhow::Result<Vec<BranchData>> {
for _ in 0..MAX_RETRIES {
match admin_client.branches_for_project(project_id, false).await {
Ok(branches) => return Ok(branches),
Err(e) => {
error!("admin list branches for project {project_id:?} query failed: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
anyhow::bail!("Failed to list branches for project {project_id:?} {MAX_RETRIES} times")
}
#[derive(Debug, Default)]
pub struct BranchCheckStats {
pub timelines_with_errors: HashMap<TenantTimelineId, Vec<String>>,
pub normal_timelines: HashSet<TenantTimelineId>,
}
impl BranchCheckStats {
pub fn add(&mut self, id: TenantTimelineId, check_errors: Vec<String>) {
if check_errors.is_empty() {
if !self.normal_timelines.insert(id) {
panic!("Checking branch with timeline {id} more than once")
}
} else {
match self.timelines_with_errors.entry(id) {
hash_map::Entry::Occupied(_) => {
panic!("Checking branch with timeline {id} more than once")
}
hash_map::Entry::Vacant(v) => {
v.insert(check_errors);
}
}
}
}
}
pub struct TimelineAnalysis {
pub(crate) struct TimelineAnalysis {
/// Anomalies detected
pub errors: Vec<String>,
pub(crate) errors: Vec<String>,
/// Healthy-but-noteworthy, like old-versioned structures that are readable but
/// worth reporting for awareness that we must not remove that old version decoding
/// yet.
pub warnings: Vec<String>,
pub(crate) warnings: Vec<String>,
/// Keys not referenced in metadata: candidates for removal
pub garbage_keys: Vec<String>,
/// Keys not referenced in metadata: candidates for removal, but NOT NECESSARILY: beware
/// of races between reading the metadata and reading the objects.
pub(crate) garbage_keys: Vec<String>,
}
impl TimelineAnalysis {
@@ -185,7 +34,7 @@ impl TimelineAnalysis {
}
}
pub async fn branch_cleanup_and_check_errors(
pub(crate) async fn branch_cleanup_and_check_errors(
id: &TenantTimelineId,
s3_root: &RootTarget,
s3_active_branch: Option<&BranchData>,
@@ -320,13 +169,13 @@ pub async fn branch_cleanup_and_check_errors(
}
#[derive(Debug)]
pub struct S3TimelineBlobData {
pub blob_data: BlobDataParseResult,
pub keys_to_remove: Vec<String>,
pub(crate) struct S3TimelineBlobData {
pub(crate) blob_data: BlobDataParseResult,
pub(crate) keys_to_remove: Vec<String>,
}
#[derive(Debug)]
pub enum BlobDataParseResult {
pub(crate) enum BlobDataParseResult {
Parsed {
index_part: IndexPart,
s3_layers: HashSet<LayerFileName>,
@@ -334,7 +183,7 @@ pub enum BlobDataParseResult {
Incorrect(Vec<String>),
}
pub async fn list_timeline_blobs(
pub(crate) async fn list_timeline_blobs(
s3_client: &Client,
id: TenantTimelineId,
s3_root: &RootTarget,


@@ -1,12 +1,19 @@
#![allow(unused)]
use std::str::FromStr;
use std::time::Duration;
use chrono::{DateTime, Utc};
use reqwest::{header, Client, Url};
use hex::FromHex;
use reqwest::{header, Client, StatusCode, Url};
use serde::Deserialize;
use tokio::sync::Semaphore;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use crate::ConsoleConfig;
#[derive(Debug)]
pub struct Error {
context: String,
@@ -34,6 +41,9 @@ impl std::fmt::Display for Error {
self.context, e
)
}
ErrorKind::ResponseStatus(status) => {
write!(f, "Bad response status {}: {}", status, self.context)
}
ErrorKind::UnexpectedState => write!(f, "Unexpected state: {}", self.context),
}
}
@@ -53,6 +63,7 @@ impl std::error::Error for Error {}
pub enum ErrorKind {
RequestSend(reqwest::Error),
BodyRead(reqwest::Error),
ResponseStatus(StatusCode),
UnexpectedState,
}
@@ -100,7 +111,23 @@ pub struct SafekeeperData {
pub availability_zone_id: String,
}
#[serde_with::serde_as]
/// For ID fields, the Console API does not always return a value or null. It will
/// sometimes return an empty string. Our native Id type does not consider this acceptable
/// (nor should it), so we use a wrapper for talking to the Console API.
fn from_nullable_id<'de, D>(deserializer: D) -> Result<TenantId, D::Error>
where
D: serde::de::Deserializer<'de>,
{
let id_str = String::deserialize(deserializer)?;
if id_str.is_empty() {
// This is a bogus value, but for the purposes of the scrubber all that
// matters is that it doesn't collide with any real IDs.
Ok(TenantId::from([0u8; 16]))
} else {
TenantId::from_hex(&id_str).map_err(|e| serde::de::Error::custom(format!("{e}")))
}
}
#[derive(Debug, Clone, serde::Deserialize)]
pub struct ProjectData {
pub id: ProjectId,
@@ -109,7 +136,7 @@ pub struct ProjectData {
pub platform_id: String,
pub user_id: String,
pub pageserver_id: u64,
#[serde_as(as = "serde_with::DisplayFromStr")]
#[serde(deserialize_with = "from_nullable_id")]
pub tenant: TenantId,
pub safekeepers: Vec<SafekeeperData>,
pub deleted: bool,
@@ -148,11 +175,27 @@ pub struct BranchData {
pub written_size: Option<u64>,
}
pub trait MaybeDeleted {
fn is_deleted(&self) -> bool;
}
impl MaybeDeleted for ProjectData {
fn is_deleted(&self) -> bool {
self.deleted
}
}
impl MaybeDeleted for BranchData {
fn is_deleted(&self) -> bool {
self.deleted
}
}
impl CloudAdminApiClient {
pub fn new(token: String, base_url: Url) -> Self {
pub fn new(config: ConsoleConfig) -> Self {
Self {
token,
base_url,
token: config.token,
base_url: config.base_url,
request_limiter: Semaphore::new(200),
http_client: Client::new(), // TODO timeout configs at least
}
@@ -208,6 +251,81 @@ impl CloudAdminApiClient {
}
}
pub async fn list_projects(&self, region_id: String) -> Result<Vec<ProjectData>, Error> {
let _permit = self
.request_limiter
.acquire()
.await
.expect("Semaphore is not closed");
let mut pagination_offset = 0;
const PAGINATION_LIMIT: usize = 512;
let mut result: Vec<ProjectData> = Vec::with_capacity(PAGINATION_LIMIT);
loop {
let response = self
.http_client
.get(self.append_url("/projects"))
.query(&[
("show_deleted", "false".to_string()),
("limit", format!("{PAGINATION_LIMIT}")),
("offset", format!("{pagination_offset}")),
])
.header(header::ACCEPT, "application/json")
.bearer_auth(&self.token)
.send()
.await
.map_err(|e| {
Error::new(
"List active projects".to_string(),
ErrorKind::RequestSend(e),
)
})?;
match response.status() {
StatusCode::OK => {}
StatusCode::SERVICE_UNAVAILABLE | StatusCode::TOO_MANY_REQUESTS => {
tokio::time::sleep(Duration::from_millis(500)).await;
continue;
}
status => {
return Err(Error::new(
"List active projects".to_string(),
ErrorKind::ResponseStatus(response.status()),
))
}
}
let response_bytes = response.bytes().await.map_err(|e| {
Error::new("List active projects".to_string(), ErrorKind::BodyRead(e))
})?;
let decode_result =
serde_json::from_slice::<AdminApiResponse<Vec<ProjectData>>>(&response_bytes);
let mut response = match decode_result {
Ok(r) => r,
Err(decode) => {
tracing::error!(
"Failed to decode response body: {}\n{}",
decode,
String::from_utf8(response_bytes.to_vec()).unwrap()
);
panic!("we out");
}
};
pagination_offset += response.data.len();
result.extend(response.data.drain(..).filter(|t| t.region_id == region_id));
if pagination_offset >= response.total.unwrap_or(0) {
break;
}
}
Ok(result)
}
pub async fn find_timeline_branch(
&self,
timeline_id: TimelineId,


@@ -1,354 +0,0 @@
mod tenant_batch;
mod timeline_batch;
use std::future::Future;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use aws_sdk_s3::Client;
use either::Either;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::Mutex;
use tokio::task::{JoinHandle, JoinSet};
use tracing::{error, info, info_span, Instrument};
use crate::cloud_admin_api::{BranchData, CloudAdminApiClient, ProjectData};
use crate::{list_objects_with_retries, RootTarget, S3Target, TraversingDepth, MAX_RETRIES};
use utils::id::{TenantId, TenantTimelineId};
/// Typical tenant to remove contains 1 layer and 1 index_part.json blobs
/// Also, there are some non-standard tenants to remove, having more layers.
/// delete_objects request allows up to 1000 keys, so be on a safe side and allow most
/// batch processing tasks to do 1 delete objects request only.
///
/// Every batch item will be additionally S3 LS'ed later, so keep the batch size
/// even lower to allow multiple concurrent tasks do the LS requests.
const BATCH_SIZE: usize = 100;
pub struct DeleteBatchProducer {
delete_tenants_sender_task: JoinHandle<anyhow::Result<ProcessedS3List<TenantId, ProjectData>>>,
delete_timelines_sender_task:
JoinHandle<anyhow::Result<ProcessedS3List<TenantTimelineId, BranchData>>>,
delete_batch_creator_task: JoinHandle<()>,
delete_batch_receiver: Arc<Mutex<UnboundedReceiver<DeleteBatch>>>,
}
pub struct DeleteProducerStats {
pub tenant_stats: ProcessedS3List<TenantId, ProjectData>,
pub timeline_stats: Option<ProcessedS3List<TenantTimelineId, BranchData>>,
}
impl DeleteProducerStats {
pub fn tenants_checked(&self) -> usize {
self.tenant_stats.entries_total
}
pub fn active_tenants(&self) -> usize {
self.tenant_stats.active_entries.len()
}
pub fn timelines_checked(&self) -> usize {
self.timeline_stats
.as_ref()
.map(|stats| stats.entries_total)
.unwrap_or(0)
}
}
#[derive(Debug, Default, Clone)]
pub struct DeleteBatch {
pub tenants: Vec<TenantId>,
pub timelines: Vec<TenantTimelineId>,
}
impl DeleteBatch {
pub fn merge(&mut self, other: Self) {
self.tenants.extend(other.tenants);
self.timelines.extend(other.timelines);
}
pub fn len(&self) -> usize {
self.tenants.len() + self.timelines.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
impl DeleteBatchProducer {
pub fn start(
admin_client: Arc<CloudAdminApiClient>,
s3_client: Arc<Client>,
s3_root_target: RootTarget,
traversing_depth: TraversingDepth,
) -> Self {
let (delete_elements_sender, mut delete_elements_receiver) =
tokio::sync::mpsc::unbounded_channel();
let delete_elements_sender = Arc::new(delete_elements_sender);
let admin_client = Arc::new(admin_client);
let (projects_to_check_sender, mut projects_to_check_receiver) =
tokio::sync::mpsc::unbounded_channel();
let delete_tenants_root_target = s3_root_target.clone();
let delete_tenants_client = Arc::clone(&s3_client);
let delete_tenants_admin_client = Arc::clone(&admin_client);
let delete_sender = Arc::clone(&delete_elements_sender);
let delete_tenants_sender_task = tokio::spawn(
async move {
tenant_batch::schedule_cleanup_deleted_tenants(
&delete_tenants_root_target,
&delete_tenants_client,
&delete_tenants_admin_client,
projects_to_check_sender,
delete_sender,
traversing_depth,
)
.await
}
.instrument(info_span!("delete_tenants_sender")),
);
let delete_timelines_sender_task = tokio::spawn(async move {
timeline_batch::schedule_cleanup_deleted_timelines(
&s3_root_target,
&s3_client,
&admin_client,
&mut projects_to_check_receiver,
delete_elements_sender,
)
.in_current_span()
.await
});
let (delete_batch_sender, delete_batch_receiver) = tokio::sync::mpsc::unbounded_channel();
let delete_batch_creator_task = tokio::spawn(
async move {
'outer: loop {
let mut delete_batch = DeleteBatch::default();
while delete_batch.len() < BATCH_SIZE {
match delete_elements_receiver.recv().await {
Some(new_task) => match new_task {
Either::Left(tenant_id) => delete_batch.tenants.push(tenant_id),
Either::Right(timeline_id) => {
delete_batch.timelines.push(timeline_id)
}
},
None => {
info!("Task finished: sender dropped");
delete_batch_sender.send(delete_batch).ok();
break 'outer;
}
}
}
if !delete_batch.is_empty() {
delete_batch_sender.send(delete_batch).ok();
}
}
}
.instrument(info_span!("delete batch creator")),
);
Self {
delete_tenants_sender_task,
delete_timelines_sender_task,
delete_batch_creator_task,
delete_batch_receiver: Arc::new(Mutex::new(delete_batch_receiver)),
}
}
pub fn subscribe(&self) -> Arc<Mutex<UnboundedReceiver<DeleteBatch>>> {
self.delete_batch_receiver.clone()
}
pub async fn join(self) -> anyhow::Result<DeleteProducerStats> {
let (delete_tenants_task_result, delete_timelines_task_result, batch_task_result) = tokio::join!(
self.delete_tenants_sender_task,
self.delete_timelines_sender_task,
self.delete_batch_creator_task,
);
let tenant_stats = match delete_tenants_task_result {
Ok(Ok(stats)) => stats,
Ok(Err(tenant_deletion_error)) => return Err(tenant_deletion_error),
Err(join_error) => {
anyhow::bail!("Failed to join the delete tenant producing task: {join_error}")
}
};
let timeline_stats = match delete_timelines_task_result {
Ok(Ok(stats)) => Some(stats),
Ok(Err(timeline_deletion_error)) => return Err(timeline_deletion_error),
Err(join_error) => {
anyhow::bail!("Failed to join the delete timeline producing task: {join_error}")
}
};
match batch_task_result {
Ok(()) => (),
Err(join_error) => anyhow::bail!("Failed to join the batch forming task: {join_error}"),
};
Ok(DeleteProducerStats {
tenant_stats,
timeline_stats,
})
}
}
pub struct ProcessedS3List<I, A> {
pub entries_total: usize,
pub entries_to_delete: Vec<I>,
pub active_entries: Vec<A>,
}
impl<I, A> Default for ProcessedS3List<I, A> {
fn default() -> Self {
Self {
entries_total: 0,
entries_to_delete: Vec::new(),
active_entries: Vec::new(),
}
}
}
impl<I, A> ProcessedS3List<I, A> {
fn merge(&mut self, other: Self) {
self.entries_total += other.entries_total;
self.entries_to_delete.extend(other.entries_to_delete);
self.active_entries.extend(other.active_entries);
}
fn change_ids<NewI>(self, transform: impl Fn(I) -> NewI) -> ProcessedS3List<NewI, A> {
ProcessedS3List {
entries_total: self.entries_total,
entries_to_delete: self.entries_to_delete.into_iter().map(transform).collect(),
active_entries: self.active_entries,
}
}
}
async fn process_s3_target_recursively<F, Fut, I, E, A>(
s3_client: &Client,
target: &S3Target,
find_active_and_deleted_entries: F,
) -> anyhow::Result<ProcessedS3List<I, A>>
where
I: FromStr<Err = E> + Send + Sync,
E: Send + Sync + std::error::Error + 'static,
F: FnOnce(Vec<I>) -> Fut + Clone,
Fut: Future<Output = anyhow::Result<ProcessedS3List<I, A>>>,
{
let mut continuation_token = None;
let mut total_entries = ProcessedS3List::default();
loop {
let fetch_response =
list_objects_with_retries(s3_client, target, continuation_token.clone()).await?;
let new_entry_ids = fetch_response
.common_prefixes()
.unwrap_or_default()
.iter()
.filter_map(|prefix| prefix.prefix())
.filter_map(|prefix| -> Option<&str> {
prefix
.strip_prefix(&target.prefix_in_bucket)?
.strip_suffix('/')
})
.map(|entry_id_str| {
entry_id_str
.parse()
.with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
})
.collect::<anyhow::Result<Vec<I>>>()
.context("list and parse bucket's entry ids")?;
total_entries.merge(
(find_active_and_deleted_entries.clone())(new_entry_ids)
.await
.context("filter active and deleted entry ids")?,
);
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
Ok(total_entries)
}
enum FetchResult<A> {
Found(A),
Deleted,
Absent,
}
async fn split_to_active_and_deleted_entries<I, A, F, Fut>(
new_entry_ids: Vec<I>,
find_active_entry: F,
) -> anyhow::Result<ProcessedS3List<I, A>>
where
I: std::fmt::Display + Send + Sync + 'static + Copy,
A: Send + 'static,
F: FnOnce(I) -> Fut + Send + Sync + 'static + Clone,
Fut: Future<Output = anyhow::Result<FetchResult<A>>> + Send,
{
let entries_total = new_entry_ids.len();
let mut check_tasks = JoinSet::new();
let mut active_entries = Vec::with_capacity(entries_total);
let mut entries_to_delete = Vec::with_capacity(entries_total);
for new_entry_id in new_entry_ids {
let check_closure = find_active_entry.clone();
check_tasks.spawn(
async move {
(
new_entry_id,
async {
for _ in 0..MAX_RETRIES {
let closure_clone = check_closure.clone();
match closure_clone(new_entry_id).await {
Ok(active_entry) => return Ok(active_entry),
Err(e) => {
error!("find active entry admin API call failed: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
anyhow::bail!("Failed to check entry {new_entry_id} {MAX_RETRIES} times")
}
.await,
)
}
.instrument(info_span!("filter_active_entries")),
);
}
while let Some(task_result) = check_tasks.join_next().await {
let (entry_id, entry_data_fetch_result) = task_result.context("task join")?;
match entry_data_fetch_result.context("entry data fetch")? {
FetchResult::Found(active_entry) => {
info!("Entry {entry_id} is alive, cannot delete");
active_entries.push(active_entry);
}
FetchResult::Deleted => {
info!("Entry {entry_id} deleted in the admin data, can safely delete");
entries_to_delete.push(entry_id);
}
FetchResult::Absent => {
info!("Entry {entry_id} absent in the admin data, can safely delete");
entries_to_delete.push(entry_id);
}
}
}
Ok(ProcessedS3List {
entries_total,
entries_to_delete,
active_entries,
})
}


@@ -1,87 +0,0 @@
use std::sync::Arc;
use anyhow::Context;
use aws_sdk_s3::Client;
use either::Either;
use tokio::sync::mpsc::UnboundedSender;
use tracing::info;
use crate::cloud_admin_api::{CloudAdminApiClient, ProjectData};
use crate::delete_batch_producer::FetchResult;
use crate::{RootTarget, TraversingDepth};
use utils::id::{TenantId, TenantTimelineId};
use super::ProcessedS3List;
pub async fn schedule_cleanup_deleted_tenants(
s3_root_target: &RootTarget,
s3_client: &Arc<Client>,
admin_client: &Arc<CloudAdminApiClient>,
projects_to_check_sender: UnboundedSender<ProjectData>,
delete_sender: Arc<UnboundedSender<Either<TenantId, TenantTimelineId>>>,
traversing_depth: TraversingDepth,
) -> anyhow::Result<ProcessedS3List<TenantId, ProjectData>> {
info!(
"Starting to list the bucket from root {}",
s3_root_target.bucket_name()
);
s3_client
.head_bucket()
.bucket(s3_root_target.bucket_name())
.send()
.await
.with_context(|| format!("bucket {} was not found", s3_root_target.bucket_name()))?;
let check_client = Arc::clone(admin_client);
let tenant_stats = super::process_s3_target_recursively(
s3_client,
s3_root_target.tenants_root(),
|s3_tenants| async move {
let another_client = Arc::clone(&check_client);
super::split_to_active_and_deleted_entries(s3_tenants, move |tenant_id| async move {
let project_data = another_client
.find_tenant_project(tenant_id)
.await
.with_context(|| format!("Tenant {tenant_id} project admin check"))?;
Ok(if let Some(console_project) = project_data {
if console_project.deleted {
delete_sender.send(Either::Left(tenant_id)).ok();
FetchResult::Deleted
} else {
if traversing_depth == TraversingDepth::Timeline {
projects_to_check_sender.send(console_project.clone()).ok();
}
FetchResult::Found(console_project)
}
} else {
delete_sender.send(Either::Left(tenant_id)).ok();
FetchResult::Absent
})
})
.await
},
)
.await
.context("tenant batch processing")?;
info!(
"Among {} tenants, found {} tenants to delete and {} active ones",
tenant_stats.entries_total,
tenant_stats.entries_to_delete.len(),
tenant_stats.active_entries.len(),
);
let tenant_stats = match traversing_depth {
TraversingDepth::Tenant => {
info!("Finished listing the bucket for tenants only");
tenant_stats
}
TraversingDepth::Timeline => {
info!("Finished listing the bucket for tenants and sent {} active tenants to check for timelines", tenant_stats.active_entries.len());
tenant_stats
}
};
Ok(tenant_stats)
}


@@ -1,102 +0,0 @@
use std::sync::Arc;
use anyhow::Context;
use aws_sdk_s3::Client;
use either::Either;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tracing::{info, info_span, Instrument};
use crate::cloud_admin_api::{BranchData, CloudAdminApiClient, ProjectData};
use crate::delete_batch_producer::{FetchResult, ProcessedS3List};
use crate::RootTarget;
use utils::id::{TenantId, TenantTimelineId};
pub async fn schedule_cleanup_deleted_timelines(
s3_root_target: &RootTarget,
s3_client: &Arc<Client>,
admin_client: &Arc<CloudAdminApiClient>,
projects_to_check_receiver: &mut UnboundedReceiver<ProjectData>,
delete_elements_sender: Arc<UnboundedSender<Either<TenantId, TenantTimelineId>>>,
) -> anyhow::Result<ProcessedS3List<TenantTimelineId, BranchData>> {
info!(
"Starting to list the bucket from root {}",
s3_root_target.bucket_name()
);
s3_client
.head_bucket()
.bucket(s3_root_target.bucket_name())
.send()
.await
.with_context(|| format!("bucket {} was not found", s3_root_target.bucket_name()))?;
let mut timeline_stats = ProcessedS3List::default();
while let Some(project_to_check) = projects_to_check_receiver.recv().await {
let check_client = Arc::clone(admin_client);
let check_s3_client = Arc::clone(s3_client);
let check_delete_sender = Arc::clone(&delete_elements_sender);
let check_root = s3_root_target.clone();
let new_stats = async move {
let tenant_id_to_check = project_to_check.tenant;
let check_target = check_root.timelines_root(&tenant_id_to_check);
let stats = super::process_s3_target_recursively(
&check_s3_client,
&check_target,
|s3_timelines| async move {
let another_client = check_client.clone();
super::split_to_active_and_deleted_entries(
s3_timelines,
move |timeline_id| async move {
let console_branch = another_client
.find_timeline_branch(timeline_id)
.await
.map_err(|e| {
anyhow::anyhow!(
"Timeline {timeline_id} branch admin check: {e}"
)
})?;
let id = TenantTimelineId::new(tenant_id_to_check, timeline_id);
Ok(match console_branch {
Some(console_branch) => {
if console_branch.deleted {
check_delete_sender.send(Either::Right(id)).ok();
FetchResult::Deleted
} else {
FetchResult::Found(console_branch)
}
}
None => {
check_delete_sender.send(Either::Right(id)).ok();
FetchResult::Absent
}
})
},
)
.await
},
)
.await
.with_context(|| format!("tenant {tenant_id_to_check} timeline batch processing"))?
.change_ids(|timeline_id| TenantTimelineId::new(tenant_id_to_check, timeline_id));
Ok::<_, anyhow::Error>(stats)
}
.instrument(info_span!("delete_timelines_sender", tenant = %project_to_check.tenant))
.await?;
timeline_stats.merge(new_stats);
}
info!(
"Among {} timelines, found {} timelines to delete and {} active ones",
timeline_stats.entries_total,
timeline_stats.entries_to_delete.len(),
timeline_stats.active_entries.len(),
);
Ok(timeline_stats)
}

s3_scrubber/src/garbage.rs Normal file

@@ -0,0 +1,419 @@
//! Functionality for finding and purging garbage, as in "garbage collection". Garbage means
//! S3 objects which are either not referenced by any metadata, or are referenced by a
//! control plane tenant/timeline in a deleted state.
use std::{collections::HashMap, sync::Arc};
use anyhow::Context;
use aws_sdk_s3::{
types::{Delete, ObjectIdentifier},
Client,
};
use futures_util::{pin_mut, TryStreamExt};
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;
use utils::id::{TenantId, TenantTimelineId};
use crate::{
cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
init_remote,
metadata_stream::{stream_listing, stream_tenant_timelines, stream_tenants},
BucketConfig, ConsoleConfig, NodeKind, RootTarget, TraversingDepth,
};
#[derive(Serialize, Deserialize, Debug)]
enum GarbageReason {
DeletedInConsole,
MissingInConsole,
}
#[derive(Serialize, Deserialize, Debug)]
enum GarbageEntity {
Tenant(TenantId),
Timeline(TenantTimelineId),
}
#[derive(Serialize, Deserialize, Debug)]
struct GarbageItem {
entity: GarbageEntity,
reason: GarbageReason,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct GarbageList {
/// Remember what NodeKind we were finding garbage for, so that we can
/// purge the list without re-stating it.
node_kind: NodeKind,
/// Embed the identity of the bucket, so that we do not risk executing
/// the wrong list against the wrong bucket, and so that the user does not have
/// to re-state the bucket details when purging.
bucket_config: BucketConfig,
items: Vec<GarbageItem>,
/// Advisory information to enable consumers to do a validation that if we
/// see garbage, we saw some active tenants too. This protects against classes of bugs
/// in the scrubber that might otherwise generate a "deleted all" result.
active_tenant_count: usize,
}
impl GarbageList {
fn new(node_kind: NodeKind, bucket_config: BucketConfig) -> Self {
Self {
items: Vec::new(),
active_tenant_count: 0,
node_kind,
bucket_config,
}
}
/// Return true if appended, false if not. False means the result was not garbage.
fn maybe_append<T>(&mut self, entity: GarbageEntity, result: Option<T>) -> bool
where
T: MaybeDeleted,
{
match result {
Some(result_item) if result_item.is_deleted() => {
self.items.push(GarbageItem {
entity,
reason: GarbageReason::DeletedInConsole,
});
true
}
Some(_) => false,
None => {
self.items.push(GarbageItem {
entity,
reason: GarbageReason::MissingInConsole,
});
true
}
}
}
}
pub async fn find_garbage(
bucket_config: BucketConfig,
console_config: ConsoleConfig,
depth: TraversingDepth,
node_kind: NodeKind,
output_path: String,
) -> anyhow::Result<()> {
let garbage = find_garbage_inner(bucket_config, console_config, depth, node_kind).await?;
let serialized = serde_json::to_vec_pretty(&garbage)?;
tokio::fs::write(&output_path, &serialized).await?;
tracing::info!("Wrote garbage report to {output_path}");
Ok(())
}
// How many concurrent S3 operations to issue (approximately): this is the concurrency
// for things like listing the timelines within tenant prefixes.
const S3_CONCURRENCY: usize = 32;
// How many concurrent API requests to make to the console API.
const CONSOLE_CONCURRENCY: usize = 128;
async fn find_garbage_inner(
bucket_config: BucketConfig,
console_config: ConsoleConfig,
depth: TraversingDepth,
node_kind: NodeKind,
) -> anyhow::Result<GarbageList> {
// Construct clients for S3 and for Console API
let (s3_client, target) = init_remote(bucket_config.clone(), node_kind)?;
let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config));
// Build a set of console-known tenants, for quickly eliminating known-active tenants without having
// to issue O(N) console API requests.
let console_projects: HashMap<TenantId, ProjectData> = cloud_admin_api_client
// FIXME: we can't just assume that all console's region ids are aws-<something>. This hack
// will go away when we are talking to Control Plane APIs, which are per-region.
.list_projects(format!("aws-{}", bucket_config.region))
.await?
.into_iter()
.map(|t| (t.tenant, t))
.collect();
tracing::info!(
"Loaded {} console projects tenant IDs",
console_projects.len()
);
// Enumerate Tenants in S3, and check if each one exists in Console
tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
let tenants = stream_tenants(&s3_client, &target);
let tenants_checked = tenants.map_ok(|t| {
let api_client = cloud_admin_api_client.clone();
let console_projects = &console_projects;
async move {
match console_projects.get(&t) {
Some(project_data) => Ok((t, Some(project_data.clone()))),
None => api_client
.find_tenant_project(t)
.await
.map_err(|e| anyhow::anyhow!(e))
.map(|r| (t, r)),
}
}
});
let tenants_checked = tenants_checked.try_buffer_unordered(CONSOLE_CONCURRENCY);
// Process the results of Tenant checks. If a Tenant is garbage, it goes into
// the `GarbageList`. Else it goes into `active_tenants` for more detailed timeline
// checks if they are enabled by the `depth` parameter.
pin_mut!(tenants_checked);
let mut garbage = GarbageList::new(node_kind, bucket_config);
let mut active_tenants: Vec<TenantId> = vec![];
let mut counter = 0;
while let Some(result) = tenants_checked.next().await {
let (tenant_id, console_result) = result?;
// Paranoia check
if let Some(project) = &console_result {
assert!(project.tenant == tenant_id);
}
if garbage.maybe_append(GarbageEntity::Tenant(tenant_id), console_result) {
tracing::debug!("Tenant {tenant_id} is garbage");
} else {
tracing::debug!("Tenant {tenant_id} is active");
active_tenants.push(tenant_id);
}
counter += 1;
if counter % 1000 == 0 {
tracing::info!(
"Progress: {counter} tenants checked, {} active, {} garbage",
active_tenants.len(),
garbage.items.len()
);
}
}
tracing::info!(
"Found {}/{} garbage tenants",
garbage.items.len(),
garbage.items.len() + active_tenants.len()
);
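// Record how many active tenants we saw: `purge_garbage` refuses to act on a
// list that reports zero active tenants.
garbage.active_tenant_count = active_tenants.len();
// If we are only checking tenant-deep, we are done. Otherwise we must
// proceed to check the individual timelines of the active tenants.
if depth == TraversingDepth::Tenant {
return Ok(garbage);
}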
tracing::info!(
"Checking timelines for {} active tenants",
active_tenants.len(),
);
// Construct a stream of all timelines within active tenants
let active_tenants = tokio_stream::iter(active_tenants.iter().map(Ok));
let timelines = active_tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, *t));
let timelines = timelines.try_buffer_unordered(S3_CONCURRENCY);
let timelines = timelines.try_flatten();
// For all timelines within active tenants, call into console API to check their existence
let timelines_checked = timelines.map_ok(|ttid| {
let api_client = cloud_admin_api_client.clone();
async move {
api_client
.find_timeline_branch(ttid.timeline_id)
.await
.map_err(|e| anyhow::anyhow!(e))
.map(|r| (ttid, r))
}
});
let timelines_checked = timelines_checked.try_buffer_unordered(CONSOLE_CONCURRENCY);
// Update the GarbageList with any timelines which appear not to exist.
pin_mut!(timelines_checked);
while let Some(result) = timelines_checked.next().await {
let (ttid, console_result) = result?;
if garbage.maybe_append(GarbageEntity::Timeline(ttid), console_result) {
tracing::debug!("Timeline {ttid} is garbage");
} else {
tracing::debug!("Timeline {ttid} is active");
}
}
Ok(garbage)
}
#[derive(clap::ValueEnum, Debug, Clone)]
pub enum PurgeMode {
/// The safest mode: only delete tenants that were explicitly reported as deleted
/// by Console API.
DeletedOnly,
/// Delete all garbage tenants, including those which are only presumed to be deleted,
/// because the Console API could not find them.
DeletedAndMissing,
}
impl std::fmt::Display for PurgeMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PurgeMode::DeletedOnly => write!(f, "deleted-only"),
PurgeMode::DeletedAndMissing => write!(f, "deleted-and-missing"),
}
}
}
pub async fn get_tenant_objects(
s3_client: &Arc<Client>,
target: RootTarget,
tenant_id: TenantId,
) -> anyhow::Result<Vec<ObjectIdentifier>> {
tracing::debug!("Listing objects in tenant {tenant_id}");
// TODO: apply extra validation based on object modification time. Don't purge
// tenants where any timeline's index_part.json has been touched recently.
let mut tenant_root = target.tenant_root(&tenant_id);
// Remove delimiter, so that object listing lists all keys in the prefix and not just
// common prefixes.
tenant_root.delimiter = String::new();
let key_stream = stream_listing(s3_client, &tenant_root);
key_stream.try_collect().await
}
pub async fn get_timeline_objects(
s3_client: &Arc<Client>,
target: RootTarget,
ttid: TenantTimelineId,
) -> anyhow::Result<Vec<ObjectIdentifier>> {
tracing::debug!("Listing objects in timeline {ttid}");
let mut timeline_root = target.timeline_root(&ttid);
// TODO: apply extra validation based on object modification time. Don't purge
// timelines whose index_part.json has been touched recently.
// Remove delimiter, so that object listing lists all keys in the prefix and not just
// common prefixes.
timeline_root.delimiter = String::new();
let key_stream = stream_listing(s3_client, &timeline_root);
key_stream.try_collect().await
}
const MAX_KEYS_PER_DELETE: usize = 1000;
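/// Drain a buffer of keys into DeleteObjects requests.
///
/// Keys are sent in batches of at most `MAX_KEYS_PER_DELETE`. If `drain` is false, only
/// full batches are sent and any remainder is left in `keys`; if `drain` is true, the
/// buffer is emptied. With `dry_run` set, keys are logged instead of deleted.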
async fn do_delete(
s3_client: &Arc<Client>,
bucket_name: &str,
keys: &mut Vec<ObjectIdentifier>,
dry_run: bool,
drain: bool,
) -> anyhow::Result<()> {
while (!keys.is_empty() && drain) || (keys.len() >= MAX_KEYS_PER_DELETE) {
let request_keys =
keys.split_off(keys.len() - (std::cmp::min(MAX_KEYS_PER_DELETE, keys.len())));
if dry_run {
tracing::info!("Dry-run deletion of objects: ");
for k in request_keys {
tracing::info!(" {k:?}");
}
} else {
let delete_request = s3_client
.delete_objects()
.bucket(bucket_name)
.delete(Delete::builder().set_objects(Some(request_keys)).build());
delete_request
.send()
.await
.context("DeleteObjects request")?;
}
}
Ok(())
}
pub async fn purge_garbage(
input_path: String,
mode: PurgeMode,
dry_run: bool,
) -> anyhow::Result<()> {
let list_bytes = tokio::fs::read(&input_path).await?;
let garbage_list = serde_json::from_slice::<GarbageList>(&list_bytes)?;
tracing::info!(
"Loaded {} items in garbage list from {}",
garbage_list.items.len(),
input_path
);
let (s3_client, target) =
init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind)?;
// Sanity checks on the incoming list
if garbage_list.active_tenant_count == 0 {
anyhow::bail!("Refusing to purge a garbage list that reports 0 active tenants");
}
let filtered_items: Vec<&GarbageItem> = garbage_list
.items
.iter()
.filter(|i| match (&mode, &i.reason) {
(PurgeMode::DeletedAndMissing, _) => true,
(PurgeMode::DeletedOnly, GarbageReason::DeletedInConsole) => true,
(PurgeMode::DeletedOnly, GarbageReason::MissingInConsole) => false,
})
.collect();
// Log the size of the filtered set, not of the unfiltered input list.
tracing::info!(
"Filtered down to {} garbage items based on mode {}",
filtered_items.len(),
mode
);
let items = tokio_stream::iter(filtered_items.into_iter().map(Ok));
let get_objects_results = items.map_ok(|i| {
let s3_client = s3_client.clone();
let target = target.clone();
async move {
match i.entity {
GarbageEntity::Tenant(tenant_id) => {
get_tenant_objects(&s3_client, target, tenant_id).await
}
GarbageEntity::Timeline(ttid) => {
get_timeline_objects(&s3_client, target, ttid).await
}
}
}
});
let get_objects_results = get_objects_results.try_buffer_unordered(S3_CONCURRENCY);
pin_mut!(get_objects_results);
let mut objects_to_delete = Vec::new();
while let Some(result) = get_objects_results.next().await {
let mut object_list = result?;
objects_to_delete.append(&mut object_list);
if objects_to_delete.len() >= MAX_KEYS_PER_DELETE {
do_delete(
&s3_client,
&garbage_list.bucket_config.bucket,
&mut objects_to_delete,
dry_run,
false,
)
.await?;
}
}
do_delete(
&s3_client,
&garbage_list.bucket_config.bucket,
&mut objects_to_delete,
dry_run,
true,
)
.await?;
tracing::info!("Finished processing garbage list (dry_run={dry_run})");
Ok(())
}
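
The batching rule in `do_delete` can be looked at in isolation. The following is a minimal sketch, assuming plain `String` keys in place of `ObjectIdentifier` and a hypothetical helper `take_batches` that returns the batches instead of sending them to S3. It only illustrates how the `drain` flag changes the loop condition and is not the code in this change.

```rust
/// Upper bound on keys per request; S3 DeleteObjects accepts at most 1000.
const MAX_KEYS_PER_DELETE: usize = 1000;

/// Hypothetical helper: split `keys` into batches of at most
/// MAX_KEYS_PER_DELETE entries.
///
/// With `drain == false` only full batches are taken and the remainder stays
/// in `keys`; with `drain == true` the buffer is emptied completely.
fn take_batches(keys: &mut Vec<String>, drain: bool) -> Vec<Vec<String>> {
    let mut batches = Vec::new();
    while (!keys.is_empty() && drain) || keys.len() >= MAX_KEYS_PER_DELETE {
        let take = std::cmp::min(MAX_KEYS_PER_DELETE, keys.len());
        // `split_off` returns the tail and leaves the head in `keys`.
        let batch = keys.split_off(keys.len() - take);
        batches.push(batch);
    }
    batches
}
```

Calling the helper with `drain == false` while keys are still arriving, and once with `drain == true` at the end, mirrors the two call sites in `purge_garbage`.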


@@ -1,12 +1,12 @@
pub mod checks;
pub mod cloud_admin_api;
pub mod delete_batch_producer;
pub mod garbage;
pub mod metadata_stream;
mod s3_deletion;
pub mod scan_metadata;
use std::env;
use std::fmt::Display;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
@@ -17,8 +17,10 @@ use aws_config::sso::SsoCredentialsProvider;
use aws_sdk_s3::config::Region;
use aws_sdk_s3::{Client, Config};
use clap::ValueEnum;
use pageserver::tenant::TENANTS_SEGMENT_NAME;
use reqwest::Url;
pub use s3_deletion::S3Deleter;
use serde::{Deserialize, Serialize};
use std::io::IsTerminal;
use tokio::io::AsyncReadExt;
use tracing::error;
@@ -29,8 +31,6 @@ use utils::id::{TenantId, TenantTimelineId};
const MAX_RETRIES: usize = 20;
const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
pub const CLI_NAME: &str = "s3-scrubber";
#[derive(Debug, Clone)]
pub struct S3Target {
pub bucket_name: String,
@@ -53,6 +53,27 @@ impl Display for TraversingDepth {
}
}
#[derive(ValueEnum, Clone, Copy, Eq, PartialEq, Debug, Serialize, Deserialize)]
pub enum NodeKind {
Safekeeper,
Pageserver,
}
impl NodeKind {
fn as_str(&self) -> &'static str {
match self {
Self::Safekeeper => "safekeeper",
Self::Pageserver => "pageserver",
}
}
}
impl Display for NodeKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
impl S3Target {
pub fn with_sub_segment(&self, new_segment: &str) -> Self {
let mut new_self = self.clone();
@@ -108,6 +129,7 @@ impl RootTarget {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BucketConfig {
pub region: String,
pub bucket: String,
@@ -143,31 +165,21 @@ impl BucketConfig {
}
pub struct ConsoleConfig {
pub admin_api_url: Url,
pub token: String,
pub base_url: Url,
}
impl ConsoleConfig {
pub fn from_env() -> anyhow::Result<Self> {
let admin_api_url: Url = env::var("CLOUD_ADMIN_API_URL")
let base_url: Url = env::var("CLOUD_ADMIN_API_URL")
.context("'CLOUD_ADMIN_API_URL' param retrieval")?
.parse()
.context("'CLOUD_ADMIN_API_URL' param parsing")?;
Ok(Self { admin_api_url })
}
}
let token = env::var(CLOUD_ADMIN_API_TOKEN_ENV_VAR)
.context("'CLOUD_ADMIN_API_TOKEN' environment variable fetch")?;
pub fn get_cloud_admin_api_token_or_exit() -> String {
match env::var(CLOUD_ADMIN_API_TOKEN_ENV_VAR) {
Ok(token) => token,
Err(env::VarError::NotPresent) => {
error!("{CLOUD_ADMIN_API_TOKEN_ENV_VAR} env variable is not present");
std::process::exit(1);
}
Err(env::VarError::NotUnicode(not_unicode_string)) => {
error!("{CLOUD_ADMIN_API_TOKEN_ENV_VAR} env variable's value is not a valid unicode string: {not_unicode_string:?}");
std::process::exit(1);
}
Ok(Self { base_url, token })
}
}
@@ -231,6 +243,29 @@ pub fn init_s3_client(account_id: Option<String>, bucket_region: Region) -> Clie
Client::from_conf(builder.build())
}
fn init_remote(
bucket_config: BucketConfig,
node_kind: NodeKind,
) -> anyhow::Result<(Arc<Client>, RootTarget)> {
let bucket_region = Region::new(bucket_config.region);
let delimiter = "/".to_string();
let s3_client = Arc::new(init_s3_client(bucket_config.sso_account_id, bucket_region));
let s3_root = match node_kind {
NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
bucket_name: bucket_config.bucket,
prefix_in_bucket: ["pageserver", "v1", TENANTS_SEGMENT_NAME, ""].join(&delimiter),
delimiter,
}),
NodeKind::Safekeeper => RootTarget::Safekeeper(S3Target {
bucket_name: bucket_config.bucket,
prefix_in_bucket: ["safekeeper", "v1", "wal", ""].join(&delimiter),
delimiter,
}),
};
Ok((s3_client, s3_root))
}
async fn list_objects_with_retries(
s3_client: &Client,
s3_target: &S3Target,


@@ -1,20 +1,8 @@
use std::collections::HashMap;
use std::fmt::Display;
use std::num::NonZeroUsize;
use std::sync::Arc;
use anyhow::Context;
use aws_sdk_s3::config::Region;
use s3_scrubber::cloud_admin_api::CloudAdminApiClient;
use s3_scrubber::delete_batch_producer::DeleteBatchProducer;
use s3_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
use s3_scrubber::scan_metadata::scan_metadata;
use s3_scrubber::{
checks, get_cloud_admin_api_token_or_exit, init_logging, init_s3_client, BucketConfig,
ConsoleConfig, RootTarget, S3Deleter, S3Target, TraversingDepth, CLI_NAME,
};
use tracing::{info, warn};
use s3_scrubber::{init_logging, BucketConfig, ConsoleConfig, NodeKind, TraversingDepth};
use clap::{Parser, Subcommand, ValueEnum};
use clap::{Parser, Subcommand};
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
@@ -27,212 +15,45 @@ struct Cli {
delete: bool,
}
#[derive(ValueEnum, Clone, Copy, Eq, PartialEq)]
enum NodeKind {
Safekeeper,
Pageserver,
}
impl NodeKind {
fn as_str(&self) -> &'static str {
match self {
Self::Safekeeper => "safekeeper",
Self::Pageserver => "pageserver",
}
}
}
impl Display for NodeKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
#[derive(Subcommand)]
#[derive(Subcommand, Debug)]
enum Command {
Tidy {
FindGarbage {
#[arg(short, long)]
node_kind: NodeKind,
#[arg(short, long, default_value_t=TraversingDepth::Tenant)]
depth: TraversingDepth,
#[arg(short, long, default_value_t = false)]
skip_validation: bool,
#[arg(short, long, default_value_t = String::from("garbage.json"))]
output_path: String,
},
PurgeGarbage {
#[arg(short, long)]
input_path: String,
#[arg(short, long, default_value_t = PurgeMode::DeletedOnly)]
mode: PurgeMode,
},
ScanMetadata {},
}
async fn tidy(
cli: &Cli,
bucket_config: BucketConfig,
console_config: ConsoleConfig,
node_kind: NodeKind,
depth: TraversingDepth,
skip_validation: bool,
) -> anyhow::Result<()> {
let dry_run = !cli.delete;
let file_name = if dry_run {
format!(
"{}_{}_{}__dry.log",
CLI_NAME,
node_kind,
chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
)
} else {
format!(
"{}_{}_{}.log",
CLI_NAME,
node_kind,
chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
)
};
let _guard = init_logging(&file_name);
if dry_run {
info!("Dry run, not removing items for real");
} else {
warn!("Dry run disabled, removing bucket items for real");
}
info!("skip_validation={skip_validation}");
info!("Starting extra S3 removal in {bucket_config} for node kind '{node_kind}', traversing depth: {depth:?}");
info!("Starting extra tenant S3 removal in {bucket_config} for node kind '{node_kind}'");
let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(
get_cloud_admin_api_token_or_exit(),
console_config.admin_api_url,
));
let bucket_region = Region::new(bucket_config.region);
let delimiter = "/".to_string();
let s3_client = Arc::new(init_s3_client(bucket_config.sso_account_id, bucket_region));
let s3_root = match node_kind {
NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
bucket_name: bucket_config.bucket,
prefix_in_bucket: ["pageserver", "v1", "tenants", ""].join(&delimiter),
delimiter,
}),
NodeKind::Safekeeper => RootTarget::Safekeeper(S3Target {
bucket_name: bucket_config.bucket,
prefix_in_bucket: ["safekeeper", "v1", "wal", ""].join(&delimiter),
delimiter,
}),
};
let delete_batch_producer = DeleteBatchProducer::start(
Arc::clone(&cloud_admin_api_client),
Arc::clone(&s3_client),
s3_root.clone(),
depth,
);
let s3_deleter = S3Deleter::new(
dry_run,
NonZeroUsize::new(15).unwrap(),
Arc::clone(&s3_client),
delete_batch_producer.subscribe(),
s3_root.clone(),
);
let (deleter_task_result, batch_producer_task_result) =
tokio::join!(s3_deleter.remove_all(), delete_batch_producer.join());
let deletion_stats = deleter_task_result.context("s3 deletion")?;
info!(
"Deleted {} tenants ({} keys) and {} timelines ({} keys) total. Dry run: {}",
deletion_stats.deleted_tenant_keys.len(),
deletion_stats.deleted_tenant_keys.values().sum::<usize>(),
deletion_stats.deleted_timeline_keys.len(),
deletion_stats.deleted_timeline_keys.values().sum::<usize>(),
dry_run,
);
info!(
"Total tenant deletion stats: {:?}",
deletion_stats
.deleted_tenant_keys
.into_iter()
.map(|(id, key)| (id.to_string(), key))
.collect::<HashMap<_, _>>()
);
info!(
"Total timeline deletion stats: {:?}",
deletion_stats
.deleted_timeline_keys
.into_iter()
.map(|(id, key)| (id.to_string(), key))
.collect::<HashMap<_, _>>()
);
let batch_producer_stats = batch_producer_task_result.context("delete batch producer join")?;
info!(
"Total bucket tenants listed: {}; for {} active tenants, timelines checked: {}",
batch_producer_stats.tenants_checked(),
batch_producer_stats.active_tenants(),
batch_producer_stats.timelines_checked()
);
if node_kind != NodeKind::Pageserver {
info!("node_kind != pageserver, finish without performing validation step");
return Ok(());
}
if skip_validation {
info!("--skip-validation is set, exiting");
return Ok(());
}
info!("validating active tenants and timelines for pageserver S3 data");
// TODO kb real stats for validation + better stats for every place: add and print `min`, `max`, `mean` values at least
let validation_stats = checks::validate_pageserver_active_tenant_and_timelines(
s3_client,
s3_root,
cloud_admin_api_client,
batch_producer_stats,
)
.await
.context("active tenant and timeline validation")?;
info!("Finished active tenant and timeline validation, correct timelines: {}, timeline validation errors: {}",
validation_stats.normal_timelines.len(), validation_stats.timelines_with_errors.len());
if !validation_stats.timelines_with_errors.is_empty() {
warn!(
"Validation errors: {:#?}",
validation_stats
.timelines_with_errors
.into_iter()
.map(|(id, errors)| (id.to_string(), format!("{errors:?}")))
.collect::<HashMap<_, _>>()
);
}
info!("Done");
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
let bucket_config = BucketConfig::from_env()?;
let command_log_name = match &cli.command {
Command::ScanMetadata { .. } => "scan",
Command::FindGarbage { .. } => "find-garbage",
Command::PurgeGarbage { .. } => "purge-garbage",
};
let _guard = init_logging(&format!(
"{}_{}_{}_{}.log",
std::env::args().next().unwrap(),
command_log_name,
bucket_config.bucket,
chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
));
match cli.command {
Command::Tidy {
node_kind,
depth,
skip_validation,
} => {
let console_config = ConsoleConfig::from_env()?;
tidy(
&cli,
bucket_config,
console_config,
node_kind,
depth,
skip_validation,
)
.await
}
Command::ScanMetadata {} => match scan_metadata(bucket_config).await {
Err(e) => {
tracing::error!("Failed: {e}");
@@ -247,5 +68,16 @@ async fn main() -> anyhow::Result<()> {
}
}
},
Command::FindGarbage {
node_kind,
depth,
output_path,
} => {
let console_config = ConsoleConfig::from_env()?;
find_garbage(bucket_config, console_config, depth, node_kind, output_path).await
}
Command::PurgeGarbage { input_path, mode } => {
purge_garbage(input_path, mode, !cli.delete).await
}
}
}
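
How the `--mode` argument of `purge-garbage` interacts with the reason recorded by `find-garbage` can be sketched without any S3 or console dependencies. The names below (`Reason`, `Mode`, `classify`, `should_purge`) are simplified stand-ins chosen for illustration, and the console lookup is assumed to be reducible to an `Option<bool>` holding the record's deleted flag.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Reason {
    DeletedInConsole,
    MissingInConsole,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    DeletedOnly,
    DeletedAndMissing,
}

/// Classify a console lookup. `None` means the console has no record;
/// `Some(deleted)` carries the record's deleted flag.
fn classify(lookup: Option<bool>) -> Option<Reason> {
    match lookup {
        Some(true) => Some(Reason::DeletedInConsole),
        Some(false) => None, // live entity: not garbage
        None => Some(Reason::MissingInConsole),
    }
}

/// Decide whether an item with the given reason is purged under `mode`.
fn should_purge(mode: Mode, reason: Reason) -> bool {
    match (mode, reason) {
        (Mode::DeletedAndMissing, _) => true,
        (Mode::DeletedOnly, Reason::DeletedInConsole) => true,
        (Mode::DeletedOnly, Reason::MissingInConsole) => false,
    }
}
```

Under the default mode, an entity that the console simply does not know about is reported as garbage but never deleted, which is the safer behaviour if the wrong API endpoint was queried.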


@@ -1,9 +1,9 @@
use anyhow::Context;
use async_stream::{stream, try_stream};
use aws_sdk_s3::Client;
use aws_sdk_s3::{types::ObjectIdentifier, Client};
use tokio_stream::Stream;
use crate::{list_objects_with_retries, RootTarget, TenantId};
use crate::{list_objects_with_retries, RootTarget, S3Target, TenantId};
use utils::id::{TenantTimelineId, TimelineId};
/// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
@@ -104,3 +104,34 @@ pub async fn stream_tenant_timelines<'a>(
}
})
}
pub(crate) fn stream_listing<'a>(
s3_client: &'a Client,
target: &'a S3Target,
) -> impl Stream<Item = anyhow::Result<ObjectIdentifier>> + 'a {
try_stream! {
let mut continuation_token = None;
loop {
let fetch_response =
list_objects_with_retries(s3_client, target, continuation_token.clone()).await?;
if target.delimiter.is_empty() {
for object_id in fetch_response.contents().unwrap_or_default().iter().filter_map(|object| object.key()).map(|i|
ObjectIdentifier::builder().key(i).build()
) {
yield object_id;
}
} else {
for prefix in fetch_response.common_prefixes().unwrap_or_default()
.iter().filter_map(|p| p.prefix().map(|k| ObjectIdentifier::builder().key(k).build())) {
yield prefix;
}
}
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
}
}
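
The continuation-token loop in `stream_listing` follows a common pagination pattern. The sketch below reproduces that pattern synchronously against an in-memory list. `Page`, `list_page` and `list_all` are hypothetical names, and the token is assumed to be a plain offset rather than the opaque string that S3 returns.

```rust
/// One page of a hypothetical listing API: some keys plus an optional
/// token for fetching the next page.
struct Page {
    keys: Vec<String>,
    next_token: Option<usize>,
}

/// Hypothetical stand-in for a ListObjectsV2 call: serves `all_keys` in
/// pages of `page_size`, using the start offset as the continuation token.
fn list_page(all_keys: &[String], token: Option<usize>, page_size: usize) -> Page {
    assert!(page_size > 0, "page_size must be positive");
    let start = token.unwrap_or(0);
    let end = std::cmp::min(start + page_size, all_keys.len());
    Page {
        keys: all_keys[start..end].to_vec(),
        next_token: if end < all_keys.len() { Some(end) } else { None },
    }
}

/// Follow continuation tokens until the listing is exhausted.
fn list_all(all_keys: &[String], page_size: usize) -> Vec<String> {
    let mut collected = Vec::new();
    let mut token = None;
    loop {
        let page = list_page(all_keys, token, page_size);
        collected.extend(page.keys);
        match page.next_token {
            Some(next) => token = Some(next),
            None => break,
        }
    }
    collected
}
```

The real function yields items lazily through `try_stream!` instead of collecting them, so callers can start work before the listing finishes.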


@@ -1,434 +0,0 @@
use std::collections::BTreeMap;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use aws_sdk_s3::types::{Delete, ObjectIdentifier};
use aws_sdk_s3::Client;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use tracing::{debug, error, info, info_span, Instrument};
use crate::delete_batch_producer::DeleteBatch;
use crate::{list_objects_with_retries, RootTarget, S3Target, TenantId, MAX_RETRIES};
use utils::id::TenantTimelineId;
pub struct S3Deleter {
dry_run: bool,
concurrent_tasks_count: NonZeroUsize,
delete_batch_receiver: Arc<Mutex<UnboundedReceiver<DeleteBatch>>>,
s3_client: Arc<Client>,
s3_target: RootTarget,
}
impl S3Deleter {
pub fn new(
dry_run: bool,
concurrent_tasks_count: NonZeroUsize,
s3_client: Arc<Client>,
delete_batch_receiver: Arc<Mutex<UnboundedReceiver<DeleteBatch>>>,
s3_target: RootTarget,
) -> Self {
Self {
dry_run,
concurrent_tasks_count,
delete_batch_receiver,
s3_client,
s3_target,
}
}
pub async fn remove_all(self) -> anyhow::Result<DeletionStats> {
let mut deletion_tasks = JoinSet::new();
for id in 0..self.concurrent_tasks_count.get() {
let closure_client = Arc::clone(&self.s3_client);
let closure_s3_target = self.s3_target.clone();
let closure_batch_receiver = Arc::clone(&self.delete_batch_receiver);
let dry_run = self.dry_run;
deletion_tasks.spawn(
async move {
info!("Task started");
(
id,
async move {
let mut task_stats = DeletionStats::default();
loop {
let mut guard = closure_batch_receiver.lock().await;
let receiver_result = guard.try_recv();
drop(guard);
match receiver_result {
Ok(batch) => {
let stats = delete_batch(
&closure_client,
&closure_s3_target,
batch,
dry_run,
)
.await
.context("batch deletion")?;
debug!(
"Batch processed, number of objects deleted per tenant in the batch is: {}, per timeline — {}",
stats.deleted_tenant_keys.len(),
stats.deleted_timeline_keys.len(),
);
task_stats.merge(stats);
}
Err(TryRecvError::Empty) => {
debug!("No tasks yet, waiting");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
Err(TryRecvError::Disconnected) => {
info!("Task finished: sender dropped");
return Ok(task_stats);
}
}
}
}
.in_current_span()
.await,
)
}
.instrument(info_span!("deletion_task", %id)),
);
}
let mut total_stats = DeletionStats::default();
while let Some(task_result) = deletion_tasks.join_next().await {
match task_result {
Ok((id, Ok(task_stats))) => {
info!("Task {id} completed");
total_stats.merge(task_stats);
}
Ok((id, Err(e))) => {
error!("Task {id} failed: {e:#}");
return Err(e);
}
Err(join_error) => anyhow::bail!("Failed to join on a task: {join_error:?}"),
}
}
Ok(total_stats)
}
}
/// S3 delete_objects allows up to 1000 keys to be passed in a single request.
/// Yet if you pass too many key requests, apparently S3 could return with OK and
/// actually delete nothing, so keep the number lower.
const MAX_ITEMS_TO_DELETE: usize = 200;
#[derive(Debug, Default)]
pub struct DeletionStats {
pub deleted_tenant_keys: BTreeMap<TenantId, usize>,
pub deleted_timeline_keys: BTreeMap<TenantTimelineId, usize>,
}
impl DeletionStats {
fn merge(&mut self, other: Self) {
self.deleted_tenant_keys.extend(other.deleted_tenant_keys);
self.deleted_timeline_keys
.extend(other.deleted_timeline_keys);
}
}
async fn delete_batch(
s3_client: &Client,
s3_target: &RootTarget,
batch: DeleteBatch,
dry_run: bool,
) -> anyhow::Result<DeletionStats> {
let (deleted_tenant_keys, deleted_timeline_keys) = tokio::join!(
delete_tenants_batch(batch.tenants, s3_target, s3_client, dry_run),
delete_timelines_batch(batch.timelines, s3_target, s3_client, dry_run),
);
Ok(DeletionStats {
deleted_tenant_keys: deleted_tenant_keys.context("tenant batch deletion")?,
deleted_timeline_keys: deleted_timeline_keys.context("timeline batch deletion")?,
})
}
async fn delete_tenants_batch(
batched_tenants: Vec<TenantId>,
s3_target: &RootTarget,
s3_client: &Client,
dry_run: bool,
) -> Result<BTreeMap<TenantId, usize>, anyhow::Error> {
info!("Deleting tenants batch of size {}", batched_tenants.len());
info!("Tenant ids to remove: {batched_tenants:?}");
let deleted_keys = delete_elements(
&batched_tenants,
s3_target,
s3_client,
dry_run,
|root_target, tenant_to_delete| root_target.tenant_root(&tenant_to_delete),
)
.await?;
if !dry_run {
let mut last_err = None;
for _ in 0..MAX_RETRIES {
match ensure_tenant_batch_deleted(s3_client, s3_target, &batched_tenants).await {
Ok(()) => {
last_err = None;
break;
}
Err(e) => {
error!("Failed to ensure the tenant batch is deleted: {e}");
last_err = Some(e);
}
}
}
if let Some(e) = last_err {
anyhow::bail!(
"Failed to ensure that tenant batch is deleted {MAX_RETRIES} times: {e:?}"
);
}
}
Ok(deleted_keys)
}
async fn delete_timelines_batch(
batched_timelines: Vec<TenantTimelineId>,
s3_target: &RootTarget,
s3_client: &Client,
dry_run: bool,
) -> Result<BTreeMap<TenantTimelineId, usize>, anyhow::Error> {
info!(
"Deleting timelines batch of size {}",
batched_timelines.len()
);
info!(
"Timeline ids to remove: {:?}",
batched_timelines
.iter()
.map(|id| id.to_string())
.collect::<Vec<_>>()
);
let deleted_keys = delete_elements(
&batched_timelines,
s3_target,
s3_client,
dry_run,
|root_target, timeline_to_delete| root_target.timeline_root(&timeline_to_delete),
)
.await?;
if !dry_run {
let mut last_err = None;
for _ in 0..MAX_RETRIES {
match ensure_timeline_batch_deleted(s3_client, s3_target, &batched_timelines).await {
Ok(()) => {
last_err = None;
break;
}
Err(e) => {
error!("Failed to ensure the timelines batch is deleted: {e}");
last_err = Some(e);
}
}
}
if let Some(e) = last_err {
anyhow::bail!(
"Failed to ensure that timeline batch is deleted {MAX_RETRIES} times: {e:?}"
);
}
}
Ok(deleted_keys)
}
async fn delete_elements<I>(
batched_ids: &Vec<I>,
s3_target: &RootTarget,
s3_client: &Client,
dry_run: bool,
target_producer: impl Fn(&RootTarget, I) -> S3Target,
) -> Result<BTreeMap<I, usize>, anyhow::Error>
where
I: Ord + PartialOrd + Copy,
{
let mut deleted_keys = BTreeMap::new();
let mut object_ids_to_delete = Vec::with_capacity(MAX_ITEMS_TO_DELETE);
for &id_to_delete in batched_ids {
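let mut subtargets = vec![target_producer(s3_target, id_to_delete)];
while let Some(current_target) = subtargets.pop() {
// Start each target's listing from its first page: a continuation token
// left over from a previously listed prefix must not be reused here.
let mut continuation_token = None;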
loop {
let fetch_response = list_objects_with_retries(
s3_client,
&current_target,
continuation_token.clone(),
)
.await?;
for object_id in fetch_response
.contents()
.unwrap_or_default()
.iter()
.filter_map(|object| object.key())
.map(|key| ObjectIdentifier::builder().key(key).build())
{
if object_ids_to_delete.len() >= MAX_ITEMS_TO_DELETE {
let object_ids_for_request = std::mem::replace(
&mut object_ids_to_delete,
Vec::with_capacity(MAX_ITEMS_TO_DELETE),
);
send_delete_request(
s3_client,
s3_target.bucket_name(),
object_ids_for_request,
dry_run,
)
.await
.context("object ids deletion")?;
}
object_ids_to_delete.push(object_id);
*deleted_keys.entry(id_to_delete).or_default() += 1;
}
subtargets.extend(
fetch_response
.common_prefixes()
.unwrap_or_default()
.iter()
.filter_map(|common_prefix| common_prefix.prefix())
.map(|prefix| {
let mut new_target = current_target.clone();
new_target.prefix_in_bucket = prefix.to_string();
new_target
}),
);
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
}
}
if !object_ids_to_delete.is_empty() {
info!("Removing last objects of the batch");
send_delete_request(
s3_client,
s3_target.bucket_name(),
object_ids_to_delete,
dry_run,
)
.await
.context("Last object ids deletion")?;
}
Ok(deleted_keys)
}
pub async fn send_delete_request(
s3_client: &Client,
bucket_name: &str,
ids: Vec<ObjectIdentifier>,
dry_run: bool,
) -> anyhow::Result<()> {
info!("Removing {} object ids from S3", ids.len());
info!("Object ids to remove: {ids:?}");
let delete_request = s3_client
.delete_objects()
.bucket(bucket_name)
.delete(Delete::builder().set_objects(Some(ids)).build());
if dry_run {
info!("Dry run, skipping the actual removal");
Ok(())
} else {
let original_request = delete_request.clone();
for _ in 0..MAX_RETRIES {
match delete_request
.clone()
.send()
.await
.context("delete request processing")
{
Ok(delete_response) => {
info!("Delete response: {delete_response:?}");
match delete_response.errors() {
Some(delete_errors) => {
error!("Delete request returned errors: {delete_errors:?}");
tokio::time::sleep(Duration::from_secs(1)).await;
}
None => {
info!("Successfully removed an object batch from S3");
return Ok(());
}
}
}
Err(e) => {
error!("Failed to send a delete request: {e:#}");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
error!("Failed to do deletion, request: {original_request:?}");
anyhow::bail!("Failed to run deletion request {MAX_RETRIES} times");
}
}
async fn ensure_tenant_batch_deleted(
s3_client: &Client,
s3_target: &RootTarget,
batch: &[TenantId],
) -> anyhow::Result<()> {
let mut not_deleted_tenants = Vec::with_capacity(batch.len());
for &tenant_id in batch {
let fetch_response =
list_objects_with_retries(s3_client, &s3_target.tenant_root(&tenant_id), None).await?;
if fetch_response.is_truncated()
|| fetch_response.contents().is_some()
|| fetch_response.common_prefixes().is_some()
{
error!(
"Tenant {tenant_id} should be deleted, but its list response is {fetch_response:?}"
);
not_deleted_tenants.push(tenant_id);
}
}
anyhow::ensure!(
not_deleted_tenants.is_empty(),
"Failed to delete all tenants in a batch. Tenants {not_deleted_tenants:?} should be deleted."
);
Ok(())
}
async fn ensure_timeline_batch_deleted(
s3_client: &Client,
s3_target: &RootTarget,
batch: &[TenantTimelineId],
) -> anyhow::Result<()> {
let mut not_deleted_timelines = Vec::with_capacity(batch.len());
for &id in batch {
let fetch_response =
list_objects_with_retries(s3_client, &s3_target.timeline_root(&id), None).await?;
if fetch_response.is_truncated()
|| fetch_response.contents().is_some()
|| fetch_response.common_prefixes().is_some()
{
error!("Timeline {id} should be deleted, but its list response is {fetch_response:?}");
not_deleted_timelines.push(id);
}
}
anyhow::ensure!(
not_deleted_timelines.is_empty(),
"Failed to delete all timelines in a batch"
);
Ok(())
}


@@ -1,17 +1,15 @@
 use std::collections::{HashMap, HashSet};
-use std::sync::Arc;
 use crate::checks::{
 branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData,
 TimelineAnalysis,
 };
 use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
-use crate::{init_logging, init_s3_client, BucketConfig, RootTarget, S3Target, CLI_NAME};
+use crate::{init_remote, BucketConfig, NodeKind, RootTarget};
 use aws_sdk_s3::Client;
-use aws_types::region::Region;
 use futures_util::{pin_mut, StreamExt, TryStreamExt};
 use histogram::Histogram;
-use pageserver::tenant::{IndexPart, TENANTS_SEGMENT_NAME};
+use pageserver::tenant::IndexPart;
 use utils::id::TenantTimelineId;
 pub struct MetadataSummary {
@@ -175,25 +173,7 @@ Timeline layer count: {6}
 /// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
 pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<MetadataSummary> {
-let file_name = format!(
-"{}_scan_metadata_{}_{}.log",
-CLI_NAME,
-bucket_config.bucket,
-chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
-);
-let _guard = init_logging(&file_name);
-let s3_client = Arc::new(init_s3_client(
-bucket_config.sso_account_id,
-Region::new(bucket_config.region),
-));
-let delimiter = "/";
-let target = RootTarget::Pageserver(S3Target {
-bucket_name: bucket_config.bucket.to_string(),
-prefix_in_bucket: ["pageserver", "v1", TENANTS_SEGMENT_NAME, ""].join(delimiter),
-delimiter: delimiter.to_string(),
-});
+let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver)?;
 let tenants = stream_tenants(&s3_client, &target);