s3_scrubber: import into the main neon repository (#5141)

## Problem

The S3 scrubber currently lives at
https://github.com/neondatabase/s3-scrubber

We don't have tests that use it, and it has copies of some data
structures that can get stale.

## Summary of changes

- Import the s3-scrubber as `s3_scrubber/`
- Replace copied_definitions/ in the scrubber with direct access to the
`utils` and `pageserver` crates
- Modify visibility of a few definitions in `pageserver` to allow the
scrubber to use them
- Update scrubber code for recent changes to `IndexPart`
- Update `KNOWN_VERSIONS` for `IndexPart` and move the definition into
`index.rs` so that it is easier to keep up to date

As a future refinement, it would be good to pull the remote persistence
types (like IndexPart) out of `pageserver` into a separate library so
that the scrubber doesn't have to link against the whole pageserver, and
so that it's clearer which types need to be public.
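
For context, the scrubber's validation pass uses the newly-public `IndexPart` items roughly like this (a minimal sketch of the check that lives in `s3_scrubber/src/checks.rs` below):

```rust
use pageserver::tenant::IndexPart;

// Sketch: flag index files written with a version the scrubber does not know about.
fn check_index_version(index_part: &IndexPart, errors: &mut Vec<String>) {
    if !IndexPart::KNOWN_VERSIONS.contains(&index_part.get_version()) {
        errors.push(format!("index_part.json version: {}", index_part.get_version()));
    }
}
```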

Co-authored-by: Kirill Bulatov <kirill@neon.tech>
Co-authored-by: Dmitry Rodionov <dmitry@neon.tech>
Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
Author: John Spray
Date: 2023-08-31 19:01:39 +01:00 (committed via GitHub)
Commit: 616e7046c7 (parent 1b916a105a)
17 changed files with 2562 additions and 5 deletions


@@ -14,6 +14,7 @@
!pgxn/
!proxy/
!safekeeper/
!s3_scrubber/
!storage_broker/
!trace/
!vendor/postgres-v14/

Cargo.lock (generated)

@@ -213,6 +213,17 @@ dependencies = [
"critical-section",
]
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi 0.1.19",
"libc",
"winapi",
]
[[package]]
name = "autocfg"
version = "1.1.0"
@@ -227,6 +238,7 @@ checksum = "de3d533e0263bf453cc80af4c8bcc4d64e2aca293bd16f81633a36f1bf4a97cb"
dependencies = [
"aws-credential-types",
"aws-http",
"aws-sdk-sso",
"aws-sdk-sts",
"aws-smithy-async",
"aws-smithy-client",
@@ -237,12 +249,15 @@ dependencies = [
"aws-types",
"bytes",
"fastrand 2.0.0",
"hex",
"http",
"hyper",
"ring",
"time",
"tokio",
"tower",
"tracing",
"zeroize",
]
[[package]]
@@ -332,6 +347,30 @@ dependencies = [
"url",
]
[[package]]
name = "aws-sdk-sso"
version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f41bf2c28d32dbb9894a8fcfcb148265d034d3f4a170552a47553a09de890895"
dependencies = [
"aws-credential-types",
"aws-http",
"aws-runtime",
"aws-smithy-async",
"aws-smithy-client",
"aws-smithy-http",
"aws-smithy-json",
"aws-smithy-runtime",
"aws-smithy-runtime-api",
"aws-smithy-types",
"aws-types",
"bytes",
"http",
"regex",
"tokio-stream",
"tracing",
]
[[package]]
name = "aws-sdk-sts"
version = "0.29.0"
@@ -1747,6 +1786,15 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
[[package]]
name = "hermit-abi"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
dependencies = [
"libc",
]
[[package]]
name = "hermit-abi"
version = "0.2.6"
@@ -3685,6 +3733,39 @@ version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041"
[[package]]
name = "s3_scrubber"
version = "0.1.0"
dependencies = [
"anyhow",
"atty",
"aws-config",
"aws-sdk-s3",
"aws-smithy-http",
"aws-types",
"bincode",
"bytes",
"chrono",
"clap",
"crc32c",
"either",
"hex",
"pageserver",
"rand",
"reqwest",
"serde",
"serde_json",
"serde_with",
"thiserror",
"tokio",
"tokio-rustls",
"tracing",
"tracing-appender",
"tracing-subscriber",
"utils",
"workspace_hack",
]
[[package]]
name = "safekeeper"
version = "0.1.0"
@@ -4768,6 +4849,17 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-appender"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e"
dependencies = [
"crossbeam-channel",
"time",
"tracing-subscriber",
]
[[package]]
name = "tracing-attributes"
version = "0.1.24"
@@ -5506,6 +5598,10 @@ name = "workspace_hack"
version = "0.1.0"
dependencies = [
"anyhow",
"aws-config",
"aws-runtime",
"aws-sigv4",
"aws-smithy-http",
"axum",
"base64 0.21.1",
"bytes",
@@ -5514,7 +5610,6 @@ dependencies = [
"clap",
"clap_builder",
"crossbeam-utils",
"digest",
"either",
"fail",
"futures",
@@ -5523,6 +5618,7 @@ dependencies = [
"futures-executor",
"futures-sink",
"futures-util",
"hex",
"hyper",
"itertools",
"libc",
@@ -5546,6 +5642,7 @@ dependencies = [
"socket2 0.4.9",
"syn 1.0.109",
"syn 2.0.28",
"time",
"tokio",
"tokio-rustls",
"tokio-util",
@@ -5555,6 +5652,7 @@ dependencies = [
"tracing",
"tracing-core",
"url",
"uuid",
]
[[package]]


@@ -7,6 +7,7 @@ members = [
"proxy",
"safekeeper",
"storage_broker",
"s3_scrubber",
"workspace_hack",
"trace",
"libs/compute_api",


@@ -68,7 +68,7 @@ use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::metadata::load_metadata;
use crate::tenant::remote_timeline_client::index::IndexPart;
pub use crate::tenant::remote_timeline_client::index::IndexPart;
use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
use crate::tenant::storage_layer::DeltaLayer;
use crate::tenant::storage_layer::ImageLayer;


@@ -96,6 +96,10 @@ impl IndexPart {
/// is always generated from the keys of `layer_metadata`)
/// - 4: timeline_layers is fully removed.
const LATEST_VERSION: usize = 4;
// Versions we may see when reading from a bucket.
pub const KNOWN_VERSIONS: &[usize] = &[1, 2, 3, 4];
pub const FILE_NAME: &'static str = "index_part.json";
pub fn new(
@@ -117,6 +121,16 @@ impl IndexPart {
deleted_at: None,
}
}
pub fn get_version(&self) -> usize {
self.version
}
/// If you want this under normal operations, read it from self.metadata:
/// this method is just for the scrubber to use when validating an index.
pub fn get_disk_consistent_lsn(&self) -> Lsn {
self.disk_consistent_lsn
}
}
impl TryFrom<&UploadQueueInitialized> for IndexPart {
@@ -137,7 +151,7 @@ impl TryFrom<&UploadQueueInitialized> for IndexPart {
/// Serialized form of [`LayerFileMetadata`].
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct IndexLayerMetadata {
pub(super) file_size: u64,
pub file_size: u64,
#[serde(default = "Generation::none")]
#[serde(skip_serializing_if = "Generation::is_none")]


@@ -212,7 +212,7 @@ pub enum LayerFileName {
}
impl LayerFileName {
pub(crate) fn file_name(&self) -> String {
pub fn file_name(&self) -> String {
self.to_string()
}

s3_scrubber/Cargo.toml (new file)

@@ -0,0 +1,39 @@
[package]
name = "s3_scrubber"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
aws-sdk-s3.workspace = true
aws-smithy-http.workspace = true
aws-types.workspace = true
either.workspace = true
tokio-rustls.workspace = true
anyhow.workspace = true
hex.workspace = true
thiserror.workspace = true
rand.workspace = true
bytes.workspace = true
bincode.workspace = true
crc32c.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
workspace_hack.workspace = true
utils.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
chrono = { workspace = true, default-features = false, features = ["clock", "serde"] }
reqwest = { workspace = true, default-features = false, features = ["rustls-tls", "json"] }
aws-config = { workspace = true, default-features = false, features = ["rustls", "credentials-sso"] }
pageserver = {path="../pageserver"}
tracing.workspace = true
tracing-subscriber.workspace = true
clap.workspace = true
atty = "0.2"
tracing-appender = "0.2"

s3_scrubber/README.md (new file)

@@ -0,0 +1,93 @@
# Neon S3 scrubber
This tool directly accesses the S3 buckets used by the Neon `pageserver`
and `safekeeper`, and does housekeeping such as cleaning up objects for tenants & timelines that no longer exist.
## Usage
### Generic Parameters
#### S3
Run `aws sso login --profile dev` to get SSO access to the bucket you want to clean, and look up the SSO account id for your profile (`cat ~/.aws/config` may help).
- `SSO_ACCOUNT_ID`: Credentials id to use for accessing S3 buckets
- `REGION`: The region where the bucket is located
- `BUCKET`: The bucket name
#### Console API
_This section is only relevant if using a command that requires access to Neon's internal control plane_
- `CLOUD_ADMIN_API_URL`: The base URL to use when checking tenant/timeline existence via the Cloud API, e.g. `https://<admin host>/admin`
- `CLOUD_ADMIN_API_TOKEN`: The token to provide when querying the admin API. Get one on the corresponding console page, e.g. `https://<admin host>/app/settings/api-keys`
### Commands
#### `tidy`
Iterates over the S3 bucket for a storage node kind, checking its contents and removing data that is not present in the console. S3 data that is not removed is then further checked for discrepancies and, in some cases, validated.
Unless the global `--delete` argument is provided, this command performs a dry run and only logs
what it would have deleted.
```
tidy --node-kind=<safekeeper|pageserver> [--depth=<tenant|timeline>] [--skip-validation]
```
- `--node-kind`: whether to inspect safekeeper or pageserver bucket prefix
- `--depth`: whether to only search for deletable tenants, or also search for
deletable timelines within active tenants. Default: `tenant`
- `--skip-validation`: skip additional post-deletion checks. Default: `false`
For the selected S3 path, the tool lists the bucket for either tenants only, or for both tenants and timelines. For every entry found, the console API is queried: any entity that is deleted or missing in the API is scheduled for deletion from S3.
If validation is enabled, only the timelines of non-deleted tenants are checked.
For the pageserver, each timeline's index_part.json on S3 is also checked for various discrepancies. No files are removed, even if there are "extra" S3 files not present in index_part.json: due to the way the pageserver updates remote storage, it is better to do such removals manually, stopping the corresponding tenant first.
Command examples:
`env SSO_ACCOUNT_ID=369495373322 REGION=eu-west-1 BUCKET=neon-dev-storage-eu-west-1 CLOUD_ADMIN_API_TOKEN=${NEON_CLOUD_ADMIN_API_STAGING_KEY} CLOUD_ADMIN_API_URL=[url] cargo run --release -- tidy --node-kind=safekeeper`
`env SSO_ACCOUNT_ID=369495373322 REGION=us-east-2 BUCKET=neon-staging-storage-us-east-2 CLOUD_ADMIN_API_TOKEN=${NEON_CLOUD_ADMIN_API_STAGING_KEY} CLOUD_ADMIN_API_URL=[url] cargo run --release -- tidy --node-kind=pageserver --depth=timeline`
When the dry-run stats look satisfactory, pass `--delete` before the `tidy` subcommand (after the `--` separator) to
disable the dry run and run the binary with deletion enabled.
See these lines (and the lines around them) in the logs for the final stats:
- `Finished listing the bucket for tenants`
- `Finished active tenant and timeline validation`
- `Total tenant deletion stats`
- `Total timeline deletion stats`
## Current implementation details
- The tool currently has no persistent state: instead, it produces very verbose logs, with every S3 delete request, every tenant/timeline id check, etc. logged.
Worse, a panic or an early task error might force the tool to exit without printing the final summary, though all affected ids will still be in the logs. The tool retries failed operations internally (see the sketch after this list), so it is error-resistant to some extent, and recent runs showed no traces of errors/panics.
- Instead of checking non-deleted tenants' timelines immediately, the tool spawns separate tasks (futures) for that, which complicates the logic and slows down the process; this should be fixed and done in one "task".
- The tool uses only publicly available remote resources (S3, console) and does not access the pageserver/safekeeper nodes themselves.
Yet, its S3 setup should be prepared to run on any pageserver/safekeeper node, using the node's S3 credentials, so node API access logic could be implemented relatively simply on top.
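
The retry pattern mentioned above is inlined in functions such as `list_objects_with_retries` and `download_object_with_retries`; a minimal generic sketch of it (the `with_retries` helper here is illustrative, not part of the crate):

```rust
use std::future::Future;
use std::time::Duration;

const MAX_RETRIES: usize = 20;

// Illustrative helper: retry a fallible async operation up to MAX_RETRIES times,
// logging each failure and sleeping one second between attempts.
async fn with_retries<T, E, F, Fut>(mut op: F, what: &str) -> anyhow::Result<T>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
    E: std::fmt::Display,
{
    for _ in 0..MAX_RETRIES {
        match op().await {
            Ok(value) => return Ok(value),
            Err(e) => {
                tracing::error!("{what} failed: {e}");
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    }
    anyhow::bail!("Failed to {what} {MAX_RETRIES} times")
}
```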
## Cleanup procedure:
### Pageserver preparations
If the S3 state is altered manually first, the pageserver's in-memory state will contain stale data about S3, and tenants/timelines may get recreated on S3 (by any layer upload triggered by compaction, a pageserver restart, etc.). So before proceeding, tenants/timelines that are already deleted in the console must be removed from the pageservers.
First, group pageservers by bucket: `https://<admin host>/admin/pageservers` lists all nodes for an environment, and `cat /storage/pageserver/data/pageserver.toml` on every node shows the bucket names and regions needed.
Per bucket, for every related pageserver id, find deleted tenants:
`curl -X POST "https://<admin_host>/admin/check_pageserver/{id}" -H "Accept: application/json" -H "Authorization: Bearer ${NEON_CLOUD_ADMIN_API_STAGING_KEY}" | jq`
Use `?check_timelines=true` to also find deleted timelines, but that check runs a separate query for every live tenant, so it can take long and time out for big pageservers.
Note that some tenants/timelines could be marked as deleted in the console while the console may still query the node later to fully remove them: wait for some time to make sure the "extra" tenant/timeline is not going away by itself.
When all IDs are collected, manually go to every pageserver and detach/delete the tenant/timeline.
In the future, the cleanup tool may access pageservers directly, but for now it only has access to the console and S3.

s3_scrubber/src/checks.rs (new file)

@@ -0,0 +1,438 @@
use std::collections::{hash_map, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use aws_sdk_s3::Client;
use tokio::io::AsyncReadExt;
use tokio::task::JoinSet;
use tracing::{error, info, info_span, warn, Instrument};
use crate::cloud_admin_api::{BranchData, CloudAdminApiClient, ProjectId};
use crate::delete_batch_producer::DeleteProducerStats;
use crate::{list_objects_with_retries, RootTarget, MAX_RETRIES};
use pageserver::tenant::storage_layer::LayerFileName;
use pageserver::tenant::IndexPart;
use utils::id::TenantTimelineId;
pub async fn validate_pageserver_active_tenant_and_timelines(
s3_client: Arc<Client>,
s3_root: RootTarget,
admin_client: Arc<CloudAdminApiClient>,
batch_producer_stats: DeleteProducerStats,
) -> anyhow::Result<BranchCheckStats> {
let Some(timeline_stats) = batch_producer_stats.timeline_stats else {
info!("No tenant-only checks, exiting");
return Ok(BranchCheckStats::default());
};
let s3_active_projects = batch_producer_stats
.tenant_stats
.active_entries
.into_iter()
.map(|project| (project.id.clone(), project))
.collect::<HashMap<_, _>>();
info!("Validating {} active tenants", s3_active_projects.len());
let mut s3_active_branches_per_project = HashMap::<ProjectId, Vec<BranchData>>::new();
let mut s3_blob_data = HashMap::<TenantTimelineId, S3TimelineBlobData>::new();
for active_branch in timeline_stats.active_entries {
let active_project_id = active_branch.project_id.clone();
let active_branch_id = active_branch.id.clone();
let active_timeline_id = active_branch.timeline_id;
s3_active_branches_per_project
.entry(active_project_id.clone())
.or_default()
.push(active_branch);
let Some(active_project) = s3_active_projects.get(&active_project_id) else {
error!("Branch {:?} for project {:?} has no such project in the active projects", active_branch_id, active_project_id);
continue;
};
let id = TenantTimelineId::new(active_project.tenant, active_timeline_id);
s3_blob_data.insert(
id,
list_timeline_blobs(&s3_client, id, &s3_root)
.await
.with_context(|| format!("List timeline {id} blobs"))?,
);
}
let mut branch_checks = JoinSet::new();
for (_, s3_active_project) in s3_active_projects {
let project_id = &s3_active_project.id;
let tenant_id = s3_active_project.tenant;
let mut console_active_branches =
branches_for_project_with_retries(&admin_client, project_id)
.await
.with_context(|| {
format!("Client API branches for project {project_id:?} retrieval")
})?
.into_iter()
.map(|branch| (branch.id.clone(), branch))
.collect::<HashMap<_, _>>();
let active_branches = s3_active_branches_per_project
.remove(project_id)
.unwrap_or_default();
info!(
"Spawning tasks for {} tenant {} active timelines",
active_branches.len(),
tenant_id
);
for s3_active_branch in active_branches {
let console_branch = console_active_branches.remove(&s3_active_branch.id);
let timeline_id = s3_active_branch.timeline_id;
let id = TenantTimelineId::new(tenant_id, timeline_id);
let s3_data = s3_blob_data.remove(&id);
let s3_root = s3_root.clone();
branch_checks.spawn(
async move {
let check_errors = branch_cleanup_and_check_errors(
id,
&s3_root,
&s3_active_branch,
console_branch,
s3_data,
)
.await;
(id, check_errors)
}
.instrument(info_span!("check_timeline", id = %id)),
);
}
}
let mut total_stats = BranchCheckStats::default();
while let Some((id, branch_check_errors)) = branch_checks
.join_next()
.await
.transpose()
.context("branch check task join")?
{
total_stats.add(id, branch_check_errors);
}
Ok(total_stats)
}
async fn branches_for_project_with_retries(
admin_client: &CloudAdminApiClient,
project_id: &ProjectId,
) -> anyhow::Result<Vec<BranchData>> {
for _ in 0..MAX_RETRIES {
match admin_client.branches_for_project(project_id, false).await {
Ok(branches) => return Ok(branches),
Err(e) => {
error!("admin list branches for project {project_id:?} query failed: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
anyhow::bail!("Failed to list branches for project {project_id:?} {MAX_RETRIES} times")
}
#[derive(Debug, Default)]
pub struct BranchCheckStats {
pub timelines_with_errors: HashMap<TenantTimelineId, Vec<String>>,
pub normal_timelines: HashSet<TenantTimelineId>,
}
impl BranchCheckStats {
pub fn add(&mut self, id: TenantTimelineId, check_errors: Vec<String>) {
if check_errors.is_empty() {
if !self.normal_timelines.insert(id) {
panic!("Checking branch with timeline {id} more than once")
}
} else {
match self.timelines_with_errors.entry(id) {
hash_map::Entry::Occupied(_) => {
panic!("Checking branch with timeline {id} more than once")
}
hash_map::Entry::Vacant(v) => {
v.insert(check_errors);
}
}
}
}
}
async fn branch_cleanup_and_check_errors(
id: TenantTimelineId,
s3_root: &RootTarget,
s3_active_branch: &BranchData,
console_branch: Option<BranchData>,
s3_data: Option<S3TimelineBlobData>,
) -> Vec<String> {
info!(
"Checking timeline for branch branch {:?}/{:?}",
s3_active_branch.project_id, s3_active_branch.id
);
let mut branch_check_errors = Vec::new();
match console_branch {
Some(console_active_branch) => {
if console_active_branch.deleted {
branch_check_errors.push(format!("Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether if it got removed during the check",
s3_active_branch.id, s3_active_branch.project_id))
}
},
None => branch_check_errors.push(format!("Timeline has no branch data in the console (id = {:?}, project_id = {:?}), recheck whether if it got removed during the check",
s3_active_branch.id, s3_active_branch.project_id))
}
let mut keys_to_remove = Vec::new();
match s3_data {
Some(s3_data) => {
keys_to_remove.extend(s3_data.keys_to_remove);
match s3_data.blob_data {
BlobDataParseResult::Parsed {
index_part,
mut s3_layers,
} => {
if !IndexPart::KNOWN_VERSIONS.contains(&index_part.get_version()) {
branch_check_errors.push(format!(
"index_part.json version: {}",
index_part.get_version()
))
}
if index_part.metadata.disk_consistent_lsn()
!= index_part.get_disk_consistent_lsn()
{
branch_check_errors.push(format!(
"Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
index_part.metadata.disk_consistent_lsn(),
index_part.get_disk_consistent_lsn(),
))
}
if index_part.layer_metadata.is_empty() {
// not an error, can happen for branches with zero writes, but notice that
info!("index_part.json has no layers");
}
for (layer, metadata) in index_part.layer_metadata {
if metadata.file_size == 0 {
branch_check_errors.push(format!(
"index_part.json contains a layer {} that has 0 size in its layer metadata", layer.file_name(),
))
}
if !s3_layers.remove(&layer) {
branch_check_errors.push(format!(
"index_part.json contains a layer {} that is not present in S3",
layer.file_name(),
))
}
}
if !s3_layers.is_empty() {
branch_check_errors.push(format!(
"index_part.json does not contain layers from S3: {:?}",
s3_layers
.iter()
.map(|layer_name| layer_name.file_name())
.collect::<Vec<_>>(),
));
keys_to_remove.extend(s3_layers.iter().map(|layer_name| {
let mut key = s3_root.timeline_root(id).prefix_in_bucket;
let delimiter = s3_root.delimiter();
if !key.ends_with(delimiter) {
key.push_str(delimiter);
}
key.push_str(&layer_name.file_name());
key
}));
}
}
BlobDataParseResult::Incorrect(parse_errors) => branch_check_errors.extend(
parse_errors
.into_iter()
.map(|error| format!("parse error: {error}")),
),
}
}
None => branch_check_errors.push("Timeline has no data on S3 at all".to_string()),
}
if branch_check_errors.is_empty() {
info!("No check errors found");
} else {
warn!("Found check errors: {branch_check_errors:?}");
}
if !keys_to_remove.is_empty() {
error!("The following keys should be removed from S3: {keys_to_remove:?}")
}
branch_check_errors
}
#[derive(Debug)]
struct S3TimelineBlobData {
blob_data: BlobDataParseResult,
keys_to_remove: Vec<String>,
}
#[derive(Debug)]
enum BlobDataParseResult {
Parsed {
index_part: IndexPart,
s3_layers: HashSet<LayerFileName>,
},
Incorrect(Vec<String>),
}
async fn list_timeline_blobs(
s3_client: &Client,
id: TenantTimelineId,
s3_root: &RootTarget,
) -> anyhow::Result<S3TimelineBlobData> {
let mut s3_layers = HashSet::new();
let mut index_part_object = None;
let timeline_dir_target = s3_root.timeline_root(id);
let mut continuation_token = None;
let mut errors = Vec::new();
let mut keys_to_remove = Vec::new();
loop {
let fetch_response =
list_objects_with_retries(s3_client, &timeline_dir_target, continuation_token.clone())
.await?;
let subdirectories = fetch_response.common_prefixes().unwrap_or_default();
if !subdirectories.is_empty() {
errors.push(format!(
"S3 list response should not contain any subdirectories, but got {subdirectories:?}"
));
}
for (object, key) in fetch_response
.contents()
.unwrap_or_default()
.iter()
.filter_map(|object| Some((object, object.key()?)))
{
let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket);
match blob_name {
Some("index_part.json") => index_part_object = Some(object.clone()),
Some(maybe_layer_name) => match maybe_layer_name.parse::<LayerFileName>() {
Ok(new_layer) => {
s3_layers.insert(new_layer);
}
Err(e) => {
errors.push(
format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
);
keys_to_remove.push(key.to_string());
}
},
None => {
errors.push(format!("S3 list response got an object with odd key {key}"));
keys_to_remove.push(key.to_string());
}
}
}
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
if index_part_object.is_none() {
errors.push("S3 list response got no index_part.json file".to_string());
}
if let Some(index_part_object_key) = index_part_object.as_ref().and_then(|object| object.key())
{
let index_part_bytes = download_object_with_retries(
s3_client,
&timeline_dir_target.bucket_name,
index_part_object_key,
)
.await
.context("index_part.json download")?;
match serde_json::from_slice(&index_part_bytes) {
Ok(index_part) => {
return Ok(S3TimelineBlobData {
blob_data: BlobDataParseResult::Parsed {
index_part,
s3_layers,
},
keys_to_remove,
})
}
Err(index_parse_error) => errors.push(format!(
"index_part.json body parsing error: {index_parse_error}"
)),
}
} else {
errors.push(format!(
"Index part object {index_part_object:?} has no key"
));
}
if errors.is_empty() {
errors.push(
"Unexpected: no errors did not lead to a successfully parsed blob return".to_string(),
);
}
Ok(S3TimelineBlobData {
blob_data: BlobDataParseResult::Incorrect(errors),
keys_to_remove,
})
}
async fn download_object_with_retries(
s3_client: &Client,
bucket_name: &str,
key: &str,
) -> anyhow::Result<Vec<u8>> {
for _ in 0..MAX_RETRIES {
let mut body_buf = Vec::new();
let response_stream = match s3_client
.get_object()
.bucket(bucket_name)
.key(key)
.send()
.await
{
Ok(response) => response,
Err(e) => {
error!("Failed to download object for key {key}: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
match response_stream
.body
.into_async_read()
.read_to_end(&mut body_buf)
.await
{
Ok(bytes_read) => {
info!("Downloaded {bytes_read} bytes for object object with key {key}");
return Ok(body_buf);
}
Err(e) => {
error!("Failed to stream object body for key {key}: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
}


@@ -0,0 +1,418 @@
#![allow(unused)]
use chrono::{DateTime, Utc};
use reqwest::{header, Client, Url};
use tokio::sync::Semaphore;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
#[derive(Debug)]
pub struct Error {
context: String,
kind: ErrorKind,
}
impl Error {
fn new(context: String, kind: ErrorKind) -> Self {
Self { context, kind }
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.kind {
ErrorKind::RequestSend(e) => write!(
f,
"Failed to send a request. Context: {}, error: {}",
self.context, e
),
ErrorKind::BodyRead(e) => {
write!(
f,
"Failed to read a request body. Context: {}, error: {}",
self.context, e
)
}
ErrorKind::UnexpectedState => write!(f, "Unexpected state: {}", self.context),
}
}
}
#[derive(Debug, Clone, serde::Deserialize, Hash, PartialEq, Eq)]
#[serde(transparent)]
pub struct ProjectId(pub String);
#[derive(Clone, Debug, serde::Deserialize, Hash, PartialEq, Eq)]
#[serde(transparent)]
pub struct BranchId(pub String);
impl std::error::Error for Error {}
#[derive(Debug)]
pub enum ErrorKind {
RequestSend(reqwest::Error),
BodyRead(reqwest::Error),
UnexpectedState,
}
pub struct CloudAdminApiClient {
request_limiter: Semaphore,
token: String,
base_url: Url,
http_client: Client,
}
#[derive(Debug, serde::Deserialize)]
struct AdminApiResponse<T> {
data: T,
total: Option<usize>,
}
#[derive(Debug, serde::Deserialize)]
pub struct PageserverData {
pub id: u64,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub region_id: String,
pub version: i64,
pub instance_id: String,
pub port: u16,
pub http_host: String,
pub http_port: u16,
pub active: bool,
pub projects_count: usize,
pub availability_zone_id: String,
}
#[derive(Debug, Clone, serde::Deserialize)]
pub struct SafekeeperData {
pub id: u64,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub region_id: String,
pub version: i64,
pub instance_id: String,
pub active: bool,
pub host: String,
pub port: u16,
pub projects_count: usize,
pub availability_zone_id: String,
}
#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub struct ProjectData {
pub id: ProjectId,
pub name: String,
pub region_id: String,
pub platform_id: String,
pub user_id: String,
pub pageserver_id: u64,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub tenant: TenantId,
pub safekeepers: Vec<SafekeeperData>,
pub deleted: bool,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub pg_version: u32,
pub max_project_size: u64,
pub remote_storage_size: u64,
pub resident_size: u64,
pub synthetic_storage_size: u64,
pub compute_time: u64,
pub data_transfer: u64,
pub data_storage: u64,
pub maintenance_set: Option<String>,
}
#[serde_with::serde_as]
#[derive(Debug, serde::Deserialize)]
pub struct BranchData {
pub id: BranchId,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub name: String,
pub project_id: ProjectId,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub timeline_id: TimelineId,
#[serde(default)]
pub parent_id: Option<BranchId>,
#[serde(default)]
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
pub parent_lsn: Option<Lsn>,
pub default: bool,
pub deleted: bool,
pub logical_size: Option<u64>,
pub physical_size: Option<u64>,
pub written_size: Option<u64>,
}
impl CloudAdminApiClient {
pub fn new(token: String, base_url: Url) -> Self {
Self {
token,
base_url,
request_limiter: Semaphore::new(200),
http_client: Client::new(), // TODO timeout configs at least
}
}
pub async fn find_tenant_project(
&self,
tenant_id: TenantId,
) -> Result<Option<ProjectData>, Error> {
let _permit = self
.request_limiter
.acquire()
.await
.expect("Semaphore is not closed");
let response = self
.http_client
.get(self.append_url("/projects"))
.query(&[
("tenant_id", tenant_id.to_string()),
("show_deleted", "true".to_string()),
])
.header(header::ACCEPT, "application/json")
.bearer_auth(&self.token)
.send()
.await
.map_err(|e| {
Error::new(
"Find project for tenant".to_string(),
ErrorKind::RequestSend(e),
)
})?;
let response: AdminApiResponse<Vec<ProjectData>> = response.json().await.map_err(|e| {
Error::new(
"Find project for tenant".to_string(),
ErrorKind::BodyRead(e),
)
})?;
match response.data.len() {
0 => Ok(None),
1 => Ok(Some(
response
.data
.into_iter()
.next()
.expect("Should have exactly one element"),
)),
too_many => Err(Error::new(
format!("Find project for tenant returned {too_many} projects instead of 0 or 1"),
ErrorKind::UnexpectedState,
)),
}
}
pub async fn find_timeline_branch(
&self,
timeline_id: TimelineId,
) -> Result<Option<BranchData>, Error> {
let _permit = self
.request_limiter
.acquire()
.await
.expect("Semaphore is not closed");
let response = self
.http_client
.get(self.append_url("/branches"))
.query(&[
("timeline_id", timeline_id.to_string()),
("show_deleted", "true".to_string()),
])
.header(header::ACCEPT, "application/json")
.bearer_auth(&self.token)
.send()
.await
.map_err(|e| {
Error::new(
"Find branch for timeline".to_string(),
ErrorKind::RequestSend(e),
)
})?;
let response: AdminApiResponse<Vec<BranchData>> = response.json().await.map_err(|e| {
Error::new(
"Find branch for timeline".to_string(),
ErrorKind::BodyRead(e),
)
})?;
match response.data.len() {
0 => Ok(None),
1 => Ok(Some(
response
.data
.into_iter()
.next()
.expect("Should have exactly one element"),
)),
too_many => Err(Error::new(
format!("Find branch for timeline returned {too_many} branches instead of 0 or 1"),
ErrorKind::UnexpectedState,
)),
}
}
pub async fn list_pageservers(&self) -> Result<Vec<PageserverData>, Error> {
let _permit = self
.request_limiter
.acquire()
.await
.expect("Semaphore is not closed");
let response = self
.http_client
.get(self.append_url("/pageservers"))
.header(header::ACCEPT, "application/json")
.bearer_auth(&self.token)
.send()
.await
.map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::RequestSend(e)))?;
let response: AdminApiResponse<Vec<PageserverData>> = response
.json()
.await
.map_err(|e| Error::new("List pageservers".to_string(), ErrorKind::BodyRead(e)))?;
Ok(response.data)
}
pub async fn list_safekeepers(&self) -> Result<Vec<SafekeeperData>, Error> {
let _permit = self
.request_limiter
.acquire()
.await
.expect("Semaphore is not closed");
let response = self
.http_client
.get(self.append_url("/safekeepers"))
.header(header::ACCEPT, "application/json")
.bearer_auth(&self.token)
.send()
.await
.map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::RequestSend(e)))?;
let response: AdminApiResponse<Vec<SafekeeperData>> = response
.json()
.await
.map_err(|e| Error::new("List safekeepers".to_string(), ErrorKind::BodyRead(e)))?;
Ok(response.data)
}
pub async fn projects_for_pageserver(
&self,
pageserver_id: u64,
show_deleted: bool,
) -> Result<Vec<ProjectData>, Error> {
let _permit = self
.request_limiter
.acquire()
.await
.expect("Semaphore is not closed");
let response = self
.http_client
.get(self.append_url("/projects"))
.query(&[
("pageserver_id", &pageserver_id.to_string()),
("show_deleted", &show_deleted.to_string()),
])
.header(header::ACCEPT, "application/json")
.bearer_auth(&self.token)
.send()
.await
.map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
let response: AdminApiResponse<Vec<ProjectData>> = response
.json()
.await
.map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
Ok(response.data)
}
pub async fn project_for_tenant(
&self,
tenant_id: TenantId,
show_deleted: bool,
) -> Result<Option<ProjectData>, Error> {
let _permit = self
.request_limiter
.acquire()
.await
.expect("Semaphore is not closed");
let response = self
.http_client
.get(self.append_url("/projects"))
.query(&[
("search", &tenant_id.to_string()),
("show_deleted", &show_deleted.to_string()),
])
.header(header::ACCEPT, "application/json")
.bearer_auth(&self.token)
.send()
.await
.map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
let response: AdminApiResponse<Vec<ProjectData>> = response
.json()
.await
.map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
match response.data.as_slice() {
[] => Ok(None),
[_single] => Ok(Some(response.data.into_iter().next().unwrap())),
multiple => Err(Error::new(
format!("Got more than one project for tenant {tenant_id} : {multiple:?}"),
ErrorKind::UnexpectedState,
)),
}
}
pub async fn branches_for_project(
&self,
project_id: &ProjectId,
show_deleted: bool,
) -> Result<Vec<BranchData>, Error> {
let _permit = self
.request_limiter
.acquire()
.await
.expect("Semaphore is not closed");
let response = self
.http_client
.get(self.append_url("/branches"))
.query(&[
("project_id", &project_id.0),
("show_deleted", &show_deleted.to_string()),
])
.header(header::ACCEPT, "application/json")
.bearer_auth(&self.token)
.send()
.await
.map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::RequestSend(e)))?;
let response: AdminApiResponse<Vec<BranchData>> = response
.json()
.await
.map_err(|e| Error::new("Project for tenant".to_string(), ErrorKind::BodyRead(e)))?;
Ok(response.data)
}
fn append_url(&self, subpath: &str) -> Url {
// TODO fugly, but `.join` does not work when called
(self.base_url.to_string() + subpath)
.parse()
.unwrap_or_else(|e| panic!("Could not append {subpath} to base url: {e}"))
}
}


@@ -0,0 +1,354 @@
mod tenant_batch;
mod timeline_batch;
use std::future::Future;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use aws_sdk_s3::Client;
use either::Either;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::Mutex;
use tokio::task::{JoinHandle, JoinSet};
use tracing::{error, info, info_span, Instrument};
use crate::cloud_admin_api::{BranchData, CloudAdminApiClient, ProjectData};
use crate::{list_objects_with_retries, RootTarget, S3Target, TraversingDepth, MAX_RETRIES};
use utils::id::{TenantId, TenantTimelineId};
/// A typical tenant to remove contains 1 layer and 1 index_part.json blob.
/// Also, there are some non-standard tenants to remove that have more layers.
/// A delete_objects request allows up to 1000 keys, so to be on the safe side, allow most
/// batch processing tasks to do only 1 delete objects request.
///
/// Every batch item will be additionally S3 LS'ed later, so keep the batch size
/// even lower to allow multiple concurrent tasks do the LS requests.
const BATCH_SIZE: usize = 100;
pub struct DeleteBatchProducer {
delete_tenants_sender_task: JoinHandle<anyhow::Result<ProcessedS3List<TenantId, ProjectData>>>,
delete_timelines_sender_task:
JoinHandle<anyhow::Result<ProcessedS3List<TenantTimelineId, BranchData>>>,
delete_batch_creator_task: JoinHandle<()>,
delete_batch_receiver: Arc<Mutex<UnboundedReceiver<DeleteBatch>>>,
}
pub struct DeleteProducerStats {
pub tenant_stats: ProcessedS3List<TenantId, ProjectData>,
pub timeline_stats: Option<ProcessedS3List<TenantTimelineId, BranchData>>,
}
impl DeleteProducerStats {
pub fn tenants_checked(&self) -> usize {
self.tenant_stats.entries_total
}
pub fn active_tenants(&self) -> usize {
self.tenant_stats.active_entries.len()
}
pub fn timelines_checked(&self) -> usize {
self.timeline_stats
.as_ref()
.map(|stats| stats.entries_total)
.unwrap_or(0)
}
}
#[derive(Debug, Default, Clone)]
pub struct DeleteBatch {
pub tenants: Vec<TenantId>,
pub timelines: Vec<TenantTimelineId>,
}
impl DeleteBatch {
pub fn merge(&mut self, other: Self) {
self.tenants.extend(other.tenants);
self.timelines.extend(other.timelines);
}
pub fn len(&self) -> usize {
self.tenants.len() + self.timelines.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
impl DeleteBatchProducer {
pub fn start(
admin_client: Arc<CloudAdminApiClient>,
s3_client: Arc<Client>,
s3_root_target: RootTarget,
traversing_depth: TraversingDepth,
) -> Self {
let (delete_elements_sender, mut delete_elements_receiver) =
tokio::sync::mpsc::unbounded_channel();
let delete_elements_sender = Arc::new(delete_elements_sender);
let admin_client = Arc::new(admin_client);
let (projects_to_check_sender, mut projects_to_check_receiver) =
tokio::sync::mpsc::unbounded_channel();
let delete_tenants_root_target = s3_root_target.clone();
let delete_tenants_client = Arc::clone(&s3_client);
let delete_tenants_admin_client = Arc::clone(&admin_client);
let delete_sender = Arc::clone(&delete_elements_sender);
let delete_tenants_sender_task = tokio::spawn(
async move {
tenant_batch::schedule_cleanup_deleted_tenants(
&delete_tenants_root_target,
&delete_tenants_client,
&delete_tenants_admin_client,
projects_to_check_sender,
delete_sender,
traversing_depth,
)
.await
}
.instrument(info_span!("delete_tenants_sender")),
);
let delete_timelines_sender_task = tokio::spawn(async move {
timeline_batch::schedule_cleanup_deleted_timelines(
&s3_root_target,
&s3_client,
&admin_client,
&mut projects_to_check_receiver,
delete_elements_sender,
)
.in_current_span()
.await
});
let (delete_batch_sender, delete_batch_receiver) = tokio::sync::mpsc::unbounded_channel();
let delete_batch_creator_task = tokio::spawn(
async move {
'outer: loop {
let mut delete_batch = DeleteBatch::default();
while delete_batch.len() < BATCH_SIZE {
match delete_elements_receiver.recv().await {
Some(new_task) => match new_task {
Either::Left(tenant_id) => delete_batch.tenants.push(tenant_id),
Either::Right(timeline_id) => {
delete_batch.timelines.push(timeline_id)
}
},
None => {
info!("Task finished: sender dropped");
delete_batch_sender.send(delete_batch).ok();
break 'outer;
}
}
}
if !delete_batch.is_empty() {
delete_batch_sender.send(delete_batch).ok();
}
}
}
.instrument(info_span!("delete batch creator")),
);
Self {
delete_tenants_sender_task,
delete_timelines_sender_task,
delete_batch_creator_task,
delete_batch_receiver: Arc::new(Mutex::new(delete_batch_receiver)),
}
}
pub fn subscribe(&self) -> Arc<Mutex<UnboundedReceiver<DeleteBatch>>> {
self.delete_batch_receiver.clone()
}
pub async fn join(self) -> anyhow::Result<DeleteProducerStats> {
let (delete_tenants_task_result, delete_timelines_task_result, batch_task_result) = tokio::join!(
self.delete_tenants_sender_task,
self.delete_timelines_sender_task,
self.delete_batch_creator_task,
);
let tenant_stats = match delete_tenants_task_result {
Ok(Ok(stats)) => stats,
Ok(Err(tenant_deletion_error)) => return Err(tenant_deletion_error),
Err(join_error) => {
anyhow::bail!("Failed to join the delete tenant producing task: {join_error}")
}
};
let timeline_stats = match delete_timelines_task_result {
Ok(Ok(stats)) => Some(stats),
Ok(Err(timeline_deletion_error)) => return Err(timeline_deletion_error),
Err(join_error) => {
anyhow::bail!("Failed to join the delete timeline producing task: {join_error}")
}
};
match batch_task_result {
Ok(()) => (),
Err(join_error) => anyhow::bail!("Failed to join the batch forming task: {join_error}"),
};
Ok(DeleteProducerStats {
tenant_stats,
timeline_stats,
})
}
}
pub struct ProcessedS3List<I, A> {
pub entries_total: usize,
pub entries_to_delete: Vec<I>,
pub active_entries: Vec<A>,
}
impl<I, A> Default for ProcessedS3List<I, A> {
fn default() -> Self {
Self {
entries_total: 0,
entries_to_delete: Vec::new(),
active_entries: Vec::new(),
}
}
}
impl<I, A> ProcessedS3List<I, A> {
fn merge(&mut self, other: Self) {
self.entries_total += other.entries_total;
self.entries_to_delete.extend(other.entries_to_delete);
self.active_entries.extend(other.active_entries);
}
fn change_ids<NewI>(self, transform: impl Fn(I) -> NewI) -> ProcessedS3List<NewI, A> {
ProcessedS3List {
entries_total: self.entries_total,
entries_to_delete: self.entries_to_delete.into_iter().map(transform).collect(),
active_entries: self.active_entries,
}
}
}
async fn process_s3_target_recursively<F, Fut, I, E, A>(
s3_client: &Client,
target: &S3Target,
find_active_and_deleted_entries: F,
) -> anyhow::Result<ProcessedS3List<I, A>>
where
I: FromStr<Err = E> + Send + Sync,
E: Send + Sync + std::error::Error + 'static,
F: FnOnce(Vec<I>) -> Fut + Clone,
Fut: Future<Output = anyhow::Result<ProcessedS3List<I, A>>>,
{
let mut continuation_token = None;
let mut total_entries = ProcessedS3List::default();
loop {
let fetch_response =
list_objects_with_retries(s3_client, target, continuation_token.clone()).await?;
let new_entry_ids = fetch_response
.common_prefixes()
.unwrap_or_default()
.iter()
.filter_map(|prefix| prefix.prefix())
.filter_map(|prefix| -> Option<&str> {
prefix
.strip_prefix(&target.prefix_in_bucket)?
.strip_suffix('/')
})
.map(|entry_id_str| {
entry_id_str
.parse()
.with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
})
.collect::<anyhow::Result<Vec<I>>>()
.context("list and parse bucket's entry ids")?;
total_entries.merge(
(find_active_and_deleted_entries.clone())(new_entry_ids)
.await
.context("filter active and deleted entry ids")?,
);
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
Ok(total_entries)
}
enum FetchResult<A> {
Found(A),
Deleted,
Absent,
}
async fn split_to_active_and_deleted_entries<I, A, F, Fut>(
new_entry_ids: Vec<I>,
find_active_entry: F,
) -> anyhow::Result<ProcessedS3List<I, A>>
where
I: std::fmt::Display + Send + Sync + 'static + Copy,
A: Send + 'static,
F: FnOnce(I) -> Fut + Send + Sync + 'static + Clone,
Fut: Future<Output = anyhow::Result<FetchResult<A>>> + Send,
{
let entries_total = new_entry_ids.len();
let mut check_tasks = JoinSet::new();
let mut active_entries = Vec::with_capacity(entries_total);
let mut entries_to_delete = Vec::with_capacity(entries_total);
for new_entry_id in new_entry_ids {
let check_closure = find_active_entry.clone();
check_tasks.spawn(
async move {
(
new_entry_id,
async {
for _ in 0..MAX_RETRIES {
let closure_clone = check_closure.clone();
match closure_clone(new_entry_id).await {
Ok(active_entry) => return Ok(active_entry),
Err(e) => {
error!("find active entry admin API call failed: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
anyhow::bail!("Failed to check entry {new_entry_id} {MAX_RETRIES} times")
}
.await,
)
}
.instrument(info_span!("filter_active_entries")),
);
}
while let Some(task_result) = check_tasks.join_next().await {
let (entry_id, entry_data_fetch_result) = task_result.context("task join")?;
match entry_data_fetch_result.context("entry data fetch")? {
FetchResult::Found(active_entry) => {
info!("Entry {entry_id} is alive, cannot delete");
active_entries.push(active_entry);
}
FetchResult::Deleted => {
info!("Entry {entry_id} deleted in the admin data, can safely delete");
entries_to_delete.push(entry_id);
}
FetchResult::Absent => {
info!("Entry {entry_id} absent in the admin data, can safely delete");
entries_to_delete.push(entry_id);
}
}
}
Ok(ProcessedS3List {
entries_total,
entries_to_delete,
active_entries,
})
}


@@ -0,0 +1,87 @@
use std::sync::Arc;
use anyhow::Context;
use aws_sdk_s3::Client;
use either::Either;
use tokio::sync::mpsc::UnboundedSender;
use tracing::info;
use crate::cloud_admin_api::{CloudAdminApiClient, ProjectData};
use crate::delete_batch_producer::FetchResult;
use crate::{RootTarget, TraversingDepth};
use utils::id::{TenantId, TenantTimelineId};
use super::ProcessedS3List;
pub async fn schedule_cleanup_deleted_tenants(
s3_root_target: &RootTarget,
s3_client: &Arc<Client>,
admin_client: &Arc<CloudAdminApiClient>,
projects_to_check_sender: UnboundedSender<ProjectData>,
delete_sender: Arc<UnboundedSender<Either<TenantId, TenantTimelineId>>>,
traversing_depth: TraversingDepth,
) -> anyhow::Result<ProcessedS3List<TenantId, ProjectData>> {
info!(
"Starting to list the bucket from root {}",
s3_root_target.bucket_name()
);
s3_client
.head_bucket()
.bucket(s3_root_target.bucket_name())
.send()
.await
.with_context(|| format!("bucket {} was not found", s3_root_target.bucket_name()))?;
let check_client = Arc::clone(admin_client);
let tenant_stats = super::process_s3_target_recursively(
s3_client,
s3_root_target.tenants_root(),
|s3_tenants| async move {
let another_client = Arc::clone(&check_client);
super::split_to_active_and_deleted_entries(s3_tenants, move |tenant_id| async move {
let project_data = another_client
.find_tenant_project(tenant_id)
.await
.with_context(|| format!("Tenant {tenant_id} project admin check"))?;
Ok(if let Some(console_project) = project_data {
if console_project.deleted {
delete_sender.send(Either::Left(tenant_id)).ok();
FetchResult::Deleted
} else {
if traversing_depth == TraversingDepth::Timeline {
projects_to_check_sender.send(console_project.clone()).ok();
}
FetchResult::Found(console_project)
}
} else {
delete_sender.send(Either::Left(tenant_id)).ok();
FetchResult::Absent
})
})
.await
},
)
.await
.context("tenant batch processing")?;
info!(
"Among {} tenants, found {} tenants to delete and {} active ones",
tenant_stats.entries_total,
tenant_stats.entries_to_delete.len(),
tenant_stats.active_entries.len(),
);
let tenant_stats = match traversing_depth {
TraversingDepth::Tenant => {
info!("Finished listing the bucket for tenants only");
tenant_stats
}
TraversingDepth::Timeline => {
info!("Finished listing the bucket for tenants and sent {} active tenants to check for timelines", tenant_stats.active_entries.len());
tenant_stats
}
};
Ok(tenant_stats)
}


@@ -0,0 +1,102 @@
use std::sync::Arc;
use anyhow::Context;
use aws_sdk_s3::Client;
use either::Either;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tracing::{info, info_span, Instrument};
use crate::cloud_admin_api::{BranchData, CloudAdminApiClient, ProjectData};
use crate::delete_batch_producer::{FetchResult, ProcessedS3List};
use crate::RootTarget;
use utils::id::{TenantId, TenantTimelineId};
pub async fn schedule_cleanup_deleted_timelines(
s3_root_target: &RootTarget,
s3_client: &Arc<Client>,
admin_client: &Arc<CloudAdminApiClient>,
projects_to_check_receiver: &mut UnboundedReceiver<ProjectData>,
delete_elements_sender: Arc<UnboundedSender<Either<TenantId, TenantTimelineId>>>,
) -> anyhow::Result<ProcessedS3List<TenantTimelineId, BranchData>> {
info!(
"Starting to list the bucket from root {}",
s3_root_target.bucket_name()
);
s3_client
.head_bucket()
.bucket(s3_root_target.bucket_name())
.send()
.await
.with_context(|| format!("bucket {} was not found", s3_root_target.bucket_name()))?;
let mut timeline_stats = ProcessedS3List::default();
while let Some(project_to_check) = projects_to_check_receiver.recv().await {
let check_client = Arc::clone(admin_client);
let check_s3_client = Arc::clone(s3_client);
let check_delete_sender = Arc::clone(&delete_elements_sender);
let check_root = s3_root_target.clone();
let new_stats = async move {
let tenant_id_to_check = project_to_check.tenant;
let check_target = check_root.timelines_root(tenant_id_to_check);
let stats = super::process_s3_target_recursively(
&check_s3_client,
&check_target,
|s3_timelines| async move {
let another_client = check_client.clone();
super::split_to_active_and_deleted_entries(
s3_timelines,
move |timeline_id| async move {
let console_branch = another_client
.find_timeline_branch(timeline_id)
.await
.map_err(|e| {
anyhow::anyhow!(
"Timeline {timeline_id} branch admin check: {e}"
)
})?;
let id = TenantTimelineId::new(tenant_id_to_check, timeline_id);
Ok(match console_branch {
Some(console_branch) => {
if console_branch.deleted {
check_delete_sender.send(Either::Right(id)).ok();
FetchResult::Deleted
} else {
FetchResult::Found(console_branch)
}
}
None => {
check_delete_sender.send(Either::Right(id)).ok();
FetchResult::Absent
}
})
},
)
.await
},
)
.await
.with_context(|| format!("tenant {tenant_id_to_check} timeline batch processing"))?
.change_ids(|timeline_id| TenantTimelineId::new(tenant_id_to_check, timeline_id));
Ok::<_, anyhow::Error>(stats)
}
.instrument(info_span!("delete_timelines_sender", tenant = %project_to_check.tenant))
.await?;
timeline_stats.merge(new_stats);
}
info!(
"Among {} timelines, found {} timelines to delete and {} active ones",
timeline_stats.entries_total,
timeline_stats.entries_to_delete.len(),
timeline_stats.active_entries.len(),
);
Ok(timeline_stats)
}

s3_scrubber/src/lib.rs (new file)

@@ -0,0 +1,204 @@
pub mod checks;
pub mod cloud_admin_api;
pub mod delete_batch_producer;
mod s3_deletion;
use std::env;
use std::fmt::Display;
use std::time::Duration;
use aws_config::environment::EnvironmentVariableCredentialsProvider;
use aws_config::imds::credentials::ImdsCredentialsProvider;
use aws_config::meta::credentials::CredentialsProviderChain;
use aws_config::sso::SsoCredentialsProvider;
use aws_sdk_s3::config::Region;
use aws_sdk_s3::{Client, Config};
pub use s3_deletion::S3Deleter;
use tracing::error;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use utils::id::{TenantId, TenantTimelineId};
const MAX_RETRIES: usize = 20;
const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
#[derive(Debug, Clone)]
pub struct S3Target {
pub bucket_name: String,
pub prefix_in_bucket: String,
pub delimiter: String,
}
#[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraversingDepth {
Tenant,
Timeline,
}
impl Display for TraversingDepth {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
Self::Tenant => "tenant",
Self::Timeline => "timeline",
})
}
}
impl S3Target {
pub fn with_sub_segment(&self, new_segment: &str) -> Self {
let mut new_self = self.clone();
let _ = new_self.prefix_in_bucket.pop();
new_self.prefix_in_bucket =
[&new_self.prefix_in_bucket, new_segment, ""].join(&new_self.delimiter);
new_self
}
}
#[derive(Clone)]
pub enum RootTarget {
Pageserver(S3Target),
Safekeeper(S3Target),
}
impl RootTarget {
pub fn tenants_root(&self) -> &S3Target {
match self {
Self::Pageserver(root) => root,
Self::Safekeeper(root) => root,
}
}
pub fn tenant_root(&self, tenant_id: TenantId) -> S3Target {
self.tenants_root().with_sub_segment(&tenant_id.to_string())
}
pub fn timelines_root(&self, tenant_id: TenantId) -> S3Target {
match self {
Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"),
Self::Safekeeper(_) => self.tenant_root(tenant_id),
}
}
pub fn timeline_root(&self, id: TenantTimelineId) -> S3Target {
self.timelines_root(id.tenant_id)
.with_sub_segment(&id.timeline_id.to_string())
}
pub fn bucket_name(&self) -> &str {
match self {
Self::Pageserver(root) => &root.bucket_name,
Self::Safekeeper(root) => &root.bucket_name,
}
}
pub fn delimiter(&self) -> &str {
match self {
Self::Pageserver(root) => &root.delimiter,
Self::Safekeeper(root) => &root.delimiter,
}
}
}
pub fn get_cloud_admin_api_token_or_exit() -> String {
match env::var(CLOUD_ADMIN_API_TOKEN_ENV_VAR) {
Ok(token) => token,
Err(env::VarError::NotPresent) => {
error!("{CLOUD_ADMIN_API_TOKEN_ENV_VAR} env variable is not present");
std::process::exit(1);
}
Err(env::VarError::NotUnicode(not_unicode_string)) => {
error!("{CLOUD_ADMIN_API_TOKEN_ENV_VAR} env variable's value is not a valid unicode string: {not_unicode_string:?}");
std::process::exit(1);
}
}
}
pub fn init_logging(binary_name: &str, dry_run: bool, node_kind: &str) -> WorkerGuard {
let file_name = if dry_run {
format!(
"{}_{}_{}__dry.log",
binary_name,
node_kind,
chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
)
} else {
format!(
"{}_{}_{}.log",
binary_name,
node_kind,
chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
)
};
let (file_writer, guard) =
tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name));
let file_logs = fmt::Layer::new()
.with_target(false)
.with_ansi(false)
.with_writer(file_writer);
let stdout_logs = fmt::Layer::new()
.with_target(false)
.with_ansi(atty::is(atty::Stream::Stdout))
.with_writer(std::io::stdout);
tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
.with(file_logs)
.with(stdout_logs)
.init();
guard
}
pub fn init_s3_client(account_id: String, bucket_region: Region) -> Client {
let credentials_provider = {
// uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
CredentialsProviderChain::first_try("env", EnvironmentVariableCredentialsProvider::new())
// uses sso
.or_else(
"sso",
SsoCredentialsProvider::builder()
.account_id(account_id)
.role_name("PowerUserAccess")
.start_url("https://neondb.awsapps.com/start")
.region(Region::from_static("eu-central-1"))
.build(),
)
// uses imds v2
.or_else("imds", ImdsCredentialsProvider::builder().build())
};
let config = Config::builder()
.region(bucket_region)
.credentials_provider(credentials_provider)
.build();
Client::from_conf(config)
}
async fn list_objects_with_retries(
s3_client: &Client,
s3_target: &S3Target,
continuation_token: Option<String>,
) -> anyhow::Result<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output> {
for _ in 0..MAX_RETRIES {
match s3_client
.list_objects_v2()
.bucket(&s3_target.bucket_name)
.prefix(&s3_target.prefix_in_bucket)
.delimiter(&s3_target.delimiter)
.set_continuation_token(continuation_token.clone())
.send()
.await
{
Ok(response) => return Ok(response),
Err(e) => {
error!("list_objects_v2 query failed: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
anyhow::bail!("Failed to list objects {MAX_RETRIES} times")
}

s3_scrubber/src/main.rs (new file)

@@ -0,0 +1,268 @@
use std::collections::HashMap;
use std::env;
use std::fmt::Display;
use std::num::NonZeroUsize;
use std::sync::Arc;
use anyhow::Context;
use aws_sdk_s3::config::Region;
use reqwest::Url;
use s3_scrubber::cloud_admin_api::CloudAdminApiClient;
use s3_scrubber::delete_batch_producer::DeleteBatchProducer;
use s3_scrubber::{
checks, get_cloud_admin_api_token_or_exit, init_logging, init_s3_client, RootTarget, S3Deleter,
S3Target, TraversingDepth,
};
use tracing::{info, info_span, warn};
use clap::{Parser, Subcommand, ValueEnum};
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
#[command(arg_required_else_help(true))]
struct Cli {
#[command(subcommand)]
command: Command,
#[arg(short, long, default_value_t = false)]
delete: bool,
}
#[derive(ValueEnum, Clone, Copy, Eq, PartialEq)]
enum NodeKind {
Safekeeper,
Pageserver,
}
impl NodeKind {
fn as_str(&self) -> &'static str {
match self {
Self::Safekeeper => "safekeeper",
Self::Pageserver => "pageserver",
}
}
}
impl Display for NodeKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
#[derive(Subcommand)]
enum Command {
Tidy {
#[arg(short, long)]
node_kind: NodeKind,
#[arg(short, long, default_value_t=TraversingDepth::Tenant)]
depth: TraversingDepth,
#[arg(short, long, default_value_t = false)]
skip_validation: bool,
},
}
struct BucketConfig {
region: String,
bucket: String,
sso_account_id: String,
}
impl Display for BucketConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}/{}", self.sso_account_id, self.region, self.bucket)
}
}
impl BucketConfig {
fn from_env() -> anyhow::Result<Self> {
let sso_account_id =
env::var("SSO_ACCOUNT_ID").context("'SSO_ACCOUNT_ID' param retrieval")?;
let region = env::var("REGION").context("'REGION' param retrieval")?;
let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?;
Ok(Self {
region,
bucket,
sso_account_id,
})
}
}
struct ConsoleConfig {
admin_api_url: Url,
}
impl ConsoleConfig {
fn from_env() -> anyhow::Result<Self> {
let admin_api_url: Url = env::var("CLOUD_ADMIN_API_URL")
.context("'CLOUD_ADMIN_API_URL' param retrieval")?
.parse()
.context("'CLOUD_ADMIN_API_URL' param parsing")?;
Ok(Self { admin_api_url })
}
}
async fn tidy(
cli: &Cli,
bucket_config: BucketConfig,
console_config: ConsoleConfig,
node_kind: NodeKind,
depth: TraversingDepth,
skip_validation: bool,
) -> anyhow::Result<()> {
let binary_name = env::args()
.next()
.context("binary name in not the first argument")?;
let dry_run = !cli.delete;
let _guard = init_logging(&binary_name, dry_run, node_kind.as_str());
let _main_span = info_span!("tidy", binary = %binary_name, %dry_run).entered();
if dry_run {
info!("Dry run, not removing items for real");
} else {
warn!("Dry run disabled, removing bucket items for real");
}
info!("skip_validation={skip_validation}");
info!("Starting extra S3 removal in {bucket_config} for node kind '{node_kind}', traversing depth: {depth:?}");
info!("Starting extra tenant S3 removal in {bucket_config} for node kind '{node_kind}'");
let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(
get_cloud_admin_api_token_or_exit(),
console_config.admin_api_url,
));
let bucket_region = Region::new(bucket_config.region);
let delimiter = "/".to_string();
let s3_client = Arc::new(init_s3_client(bucket_config.sso_account_id, bucket_region));
let s3_root = match node_kind {
NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
bucket_name: bucket_config.bucket,
prefix_in_bucket: ["pageserver", "v1", "tenants", ""].join(&delimiter),
delimiter,
}),
NodeKind::Safekeeper => RootTarget::Safekeeper(S3Target {
bucket_name: bucket_config.bucket,
prefix_in_bucket: ["safekeeper", "v1", "wal", ""].join(&delimiter),
delimiter,
}),
};
let delete_batch_producer = DeleteBatchProducer::start(
Arc::clone(&cloud_admin_api_client),
Arc::clone(&s3_client),
s3_root.clone(),
depth,
);
let s3_deleter = S3Deleter::new(
dry_run,
NonZeroUsize::new(15).unwrap(),
Arc::clone(&s3_client),
delete_batch_producer.subscribe(),
s3_root.clone(),
);
let (deleter_task_result, batch_producer_task_result) =
tokio::join!(s3_deleter.remove_all(), delete_batch_producer.join());
let deletion_stats = deleter_task_result.context("s3 deletion")?;
info!(
"Deleted {} tenants ({} keys) and {} timelines ({} keys) total. Dry run: {}",
deletion_stats.deleted_tenant_keys.len(),
deletion_stats.deleted_tenant_keys.values().sum::<usize>(),
deletion_stats.deleted_timeline_keys.len(),
deletion_stats.deleted_timeline_keys.values().sum::<usize>(),
dry_run,
);
info!(
"Total tenant deletion stats: {:?}",
deletion_stats
.deleted_tenant_keys
.into_iter()
.map(|(id, key)| (id.to_string(), key))
.collect::<HashMap<_, _>>()
);
info!(
"Total timeline deletion stats: {:?}",
deletion_stats
.deleted_timeline_keys
.into_iter()
.map(|(id, key)| (id.to_string(), key))
.collect::<HashMap<_, _>>()
);
let batch_producer_stats = batch_producer_task_result.context("delete batch producer join")?;
info!(
"Total bucket tenants listed: {}; for {} active tenants, timelines checked: {}",
batch_producer_stats.tenants_checked(),
batch_producer_stats.active_tenants(),
batch_producer_stats.timelines_checked()
);
if node_kind != NodeKind::Pageserver {
info!("node_kind != pageserver, finish without performing validation step");
return Ok(());
}
if skip_validation {
info!("--skip-validation is set, exiting");
return Ok(());
}
info!("validating active tenants and timelines for pageserver S3 data");
// TODO kb real stats for validation + better stats for every place: add and print `min`, `max`, `mean` values at least
let validation_stats = checks::validate_pageserver_active_tenant_and_timelines(
s3_client,
s3_root,
cloud_admin_api_client,
batch_producer_stats,
)
.await
.context("active tenant and timeline validation")?;
info!("Finished active tenant and timeline validation, correct timelines: {}, timeline validation errors: {}",
validation_stats.normal_timelines.len(), validation_stats.timelines_with_errors.len());
if !validation_stats.timelines_with_errors.is_empty() {
warn!(
"Validation errors: {:#?}",
validation_stats
.timelines_with_errors
.into_iter()
.map(|(id, errors)| (id.to_string(), format!("{errors:?}")))
.collect::<HashMap<_, _>>()
);
}
info!("Done");
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
let bucket_config = BucketConfig::from_env()?;
match &cli.command {
Command::Tidy {
node_kind,
depth,
skip_validation,
} => {
let console_config = ConsoleConfig::from_env()?;
tidy(
&cli,
bucket_config,
console_config,
*node_kind,
*depth,
*skip_validation,
)
.await
}
}
}


@@ -0,0 +1,434 @@
use std::collections::BTreeMap;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use aws_sdk_s3::types::{Delete, ObjectIdentifier};
use aws_sdk_s3::Client;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use tracing::{debug, error, info, info_span, Instrument};
use crate::delete_batch_producer::DeleteBatch;
use crate::{list_objects_with_retries, RootTarget, S3Target, TenantId, MAX_RETRIES};
use utils::id::TenantTimelineId;
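/// Removes batches of tenant/timeline objects from S3, pulling `DeleteBatch`es from the
/// producer's channel with a pool of concurrent worker tasks.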
pub struct S3Deleter {
dry_run: bool,
concurrent_tasks_count: NonZeroUsize,
delete_batch_receiver: Arc<Mutex<UnboundedReceiver<DeleteBatch>>>,
s3_client: Arc<Client>,
s3_target: RootTarget,
}
impl S3Deleter {
pub fn new(
dry_run: bool,
concurrent_tasks_count: NonZeroUsize,
s3_client: Arc<Client>,
delete_batch_receiver: Arc<Mutex<UnboundedReceiver<DeleteBatch>>>,
s3_target: RootTarget,
) -> Self {
Self {
dry_run,
concurrent_tasks_count,
delete_batch_receiver,
s3_client,
s3_target,
}
}
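/// Spawns `concurrent_tasks_count` worker tasks that drain the shared batch receiver
/// until the sender side is dropped, then merges the per-task deletion statistics.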
pub async fn remove_all(self) -> anyhow::Result<DeletionStats> {
let mut deletion_tasks = JoinSet::new();
for id in 0..self.concurrent_tasks_count.get() {
let closure_client = Arc::clone(&self.s3_client);
let closure_s3_target = self.s3_target.clone();
let closure_batch_receiver = Arc::clone(&self.delete_batch_receiver);
let dry_run = self.dry_run;
deletion_tasks.spawn(
async move {
info!("Task started");
(
id,
async move {
let mut task_stats = DeletionStats::default();
loop {
let mut guard = closure_batch_receiver.lock().await;
let receiver_result = guard.try_recv();
drop(guard);
match receiver_result {
Ok(batch) => {
let stats = delete_batch(
&closure_client,
&closure_s3_target,
batch,
dry_run,
)
.await
.context("batch deletion")?;
debug!(
"Batch processed: keys deleted for {} tenants and {} timelines",
stats.deleted_tenant_keys.len(),
stats.deleted_timeline_keys.len(),
);
task_stats.merge(stats);
}
Err(TryRecvError::Empty) => {
debug!("No tasks yet, waiting");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
Err(TryRecvError::Disconnected) => {
info!("Task finished: sender dropped");
return Ok(task_stats);
}
}
}
}
.in_current_span()
.await,
)
}
.instrument(info_span!("deletion_task", %id)),
);
}
let mut total_stats = DeletionStats::default();
while let Some(task_result) = deletion_tasks.join_next().await {
match task_result {
Ok((id, Ok(task_stats))) => {
info!("Task {id} completed");
total_stats.merge(task_stats);
}
Ok((id, Err(e))) => {
error!("Task {id} failed: {e:#}");
return Err(e);
}
Err(join_error) => anyhow::bail!("Failed to join on a task: {join_error:?}"),
}
}
Ok(total_stats)
}
}
/// S3 delete_objects allows up to 1000 keys in a single request.
/// However, if too many keys are sent at once, S3 can apparently return OK
/// while actually deleting nothing, so keep the batch size well below that limit.
const MAX_ITEMS_TO_DELETE: usize = 200;
#[derive(Debug, Default)]
pub struct DeletionStats {
pub deleted_tenant_keys: BTreeMap<TenantId, usize>,
pub deleted_timeline_keys: BTreeMap<TenantTimelineId, usize>,
}
impl DeletionStats {
fn merge(&mut self, other: Self) {
self.deleted_tenant_keys.extend(other.deleted_tenant_keys);
self.deleted_timeline_keys
.extend(other.deleted_timeline_keys);
}
}
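/// Deletes the tenant and timeline parts of a batch concurrently and merges their stats.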
async fn delete_batch(
s3_client: &Client,
s3_target: &RootTarget,
batch: DeleteBatch,
dry_run: bool,
) -> anyhow::Result<DeletionStats> {
let (deleted_tenant_keys, deleted_timeline_keys) = tokio::join!(
delete_tenants_batch(batch.tenants, s3_target, s3_client, dry_run),
delete_timelines_batch(batch.timelines, s3_target, s3_client, dry_run),
);
Ok(DeletionStats {
deleted_tenant_keys: deleted_tenant_keys.context("tenant batch deletion")?,
deleted_timeline_keys: deleted_timeline_keys.context("timeline batch deletion")?,
})
}
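/// Deletes every object under each tenant root in the batch; when not a dry run,
/// re-lists the prefixes afterwards (with retries) to confirm nothing is left.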
async fn delete_tenants_batch(
batched_tenants: Vec<TenantId>,
s3_target: &RootTarget,
s3_client: &Client,
dry_run: bool,
) -> Result<BTreeMap<TenantId, usize>, anyhow::Error> {
info!("Deleting tenants batch of size {}", batched_tenants.len());
info!("Tenant ids to remove: {batched_tenants:?}");
let deleted_keys = delete_elements(
&batched_tenants,
s3_target,
s3_client,
dry_run,
|root_target, tenant_to_delete| root_target.tenant_root(tenant_to_delete),
)
.await?;
if !dry_run {
let mut last_err = None;
for _ in 0..MAX_RETRIES {
match ensure_tenant_batch_deleted(s3_client, s3_target, &batched_tenants).await {
Ok(()) => {
last_err = None;
break;
}
Err(e) => {
error!("Failed to ensure the tenant batch is deleted: {e}");
last_err = Some(e);
}
}
}
if let Some(e) = last_err {
anyhow::bail!(
"Failed to ensure that tenant batch is deleted {MAX_RETRIES} times: {e:?}"
);
}
}
Ok(deleted_keys)
}
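/// Timeline counterpart of `delete_tenants_batch`.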
async fn delete_timelines_batch(
batched_timelines: Vec<TenantTimelineId>,
s3_target: &RootTarget,
s3_client: &Client,
dry_run: bool,
) -> Result<BTreeMap<TenantTimelineId, usize>, anyhow::Error> {
info!(
"Deleting timelines batch of size {}",
batched_timelines.len()
);
info!(
"Timeline ids to remove: {:?}",
batched_timelines
.iter()
.map(|id| id.to_string())
.collect::<Vec<_>>()
);
let deleted_keys = delete_elements(
&batched_timelines,
s3_target,
s3_client,
dry_run,
|root_target, timeline_to_delete| root_target.timeline_root(timeline_to_delete),
)
.await?;
if !dry_run {
let mut last_err = None;
for _ in 0..MAX_RETRIES {
match ensure_timeline_batch_deleted(s3_client, s3_target, &batched_timelines).await {
Ok(()) => {
last_err = None;
break;
}
Err(e) => {
error!("Failed to ensure the timelines batch is deleted: {e}");
last_err = Some(e);
}
}
}
if let Some(e) = last_err {
anyhow::bail!(
"Failed to ensure that timeline batch is deleted {MAX_RETRIES} times: {e:?}"
);
}
}
Ok(deleted_keys)
}
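/// Walks each id's S3 target (descending into common prefixes), accumulates object keys,
/// and flushes delete requests in chunks of `MAX_ITEMS_TO_DELETE`. Returns how many keys
/// were queued for deletion per id.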
async fn delete_elements<I>(
batched_ids: &Vec<I>,
s3_target: &RootTarget,
s3_client: &Client,
dry_run: bool,
target_producer: impl Fn(&RootTarget, I) -> S3Target,
) -> Result<BTreeMap<I, usize>, anyhow::Error>
where
I: Ord + PartialOrd + Copy,
{
let mut deleted_keys = BTreeMap::new();
let mut object_ids_to_delete = Vec::with_capacity(MAX_ITEMS_TO_DELETE);
for &id_to_delete in batched_ids {
let mut continuation_token = None;
let mut subtargets = vec![target_producer(s3_target, id_to_delete)];
while let Some(current_target) = subtargets.pop() {
loop {
let fetch_response = list_objects_with_retries(
s3_client,
&current_target,
continuation_token.clone(),
)
.await?;
for object_id in fetch_response
.contents()
.unwrap_or_default()
.iter()
.filter_map(|object| object.key())
.map(|key| ObjectIdentifier::builder().key(key).build())
{
if object_ids_to_delete.len() >= MAX_ITEMS_TO_DELETE {
let object_ids_for_request = std::mem::replace(
&mut object_ids_to_delete,
Vec::with_capacity(MAX_ITEMS_TO_DELETE),
);
send_delete_request(
s3_client,
s3_target.bucket_name(),
object_ids_for_request,
dry_run,
)
.await
.context("object ids deletion")?;
}
object_ids_to_delete.push(object_id);
*deleted_keys.entry(id_to_delete).or_default() += 1;
}
subtargets.extend(
fetch_response
.common_prefixes()
.unwrap_or_default()
.iter()
.filter_map(|common_prefix| common_prefix.prefix())
.map(|prefix| {
let mut new_target = current_target.clone();
new_target.prefix_in_bucket = prefix.to_string();
new_target
}),
);
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
}
}
if !object_ids_to_delete.is_empty() {
info!("Removing last objects of the batch");
send_delete_request(
s3_client,
s3_target.bucket_name(),
object_ids_to_delete,
dry_run,
)
.await
.context("Last object ids deletion")?;
}
Ok(deleted_keys)
}
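/// Sends one `DeleteObjects` request for the given keys, retrying up to `MAX_RETRIES`
/// times on request failures or per-key errors; in dry-run mode the request is only logged.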
pub async fn send_delete_request(
s3_client: &Client,
bucket_name: &str,
ids: Vec<ObjectIdentifier>,
dry_run: bool,
) -> anyhow::Result<()> {
info!("Removing {} object ids from S3", ids.len());
info!("Object ids to remove: {ids:?}");
let delete_request = s3_client
.delete_objects()
.bucket(bucket_name)
.delete(Delete::builder().set_objects(Some(ids)).build());
if dry_run {
info!("Dry run, skipping the actual removal");
Ok(())
} else {
let original_request = delete_request.clone();
for _ in 0..MAX_RETRIES {
match delete_request
.clone()
.send()
.await
.context("delete request processing")
{
Ok(delete_response) => {
info!("Delete response: {delete_response:?}");
match delete_response.errors() {
Some(delete_errors) => {
error!("Delete request returned errors: {delete_errors:?}");
tokio::time::sleep(Duration::from_secs(1)).await;
}
None => {
info!("Successfully removed an object batch from S3");
return Ok(());
}
}
}
Err(e) => {
error!("Failed to send a delete request: {e:#}");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
error!("Failed to do deletion, request: {original_request:?}");
anyhow::bail!("Failed to run deletion request {MAX_RETRIES} times");
}
}
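/// Lists each tenant root in the batch and fails if any of them still returns
/// objects or common prefixes.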
async fn ensure_tenant_batch_deleted(
s3_client: &Client,
s3_target: &RootTarget,
batch: &[TenantId],
) -> anyhow::Result<()> {
let mut not_deleted_tenants = Vec::with_capacity(batch.len());
for &tenant_id in batch {
let fetch_response =
list_objects_with_retries(s3_client, &s3_target.tenant_root(tenant_id), None).await?;
if fetch_response.is_truncated()
|| fetch_response.contents().is_some()
|| fetch_response.common_prefixes().is_some()
{
error!(
"Tenant {tenant_id} should be deleted, but its list response is {fetch_response:?}"
);
not_deleted_tenants.push(tenant_id);
}
}
anyhow::ensure!(
not_deleted_tenants.is_empty(),
"Failed to delete all tenants in a batch. Tenants {not_deleted_tenants:?} should be deleted."
);
Ok(())
}
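/// Same check as `ensure_tenant_batch_deleted`, but for timeline roots.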
async fn ensure_timeline_batch_deleted(
s3_client: &Client,
s3_target: &RootTarget,
batch: &[TenantTimelineId],
) -> anyhow::Result<()> {
let mut not_deleted_timelines = Vec::with_capacity(batch.len());
for &id in batch {
let fetch_response =
list_objects_with_retries(s3_client, &s3_target.timeline_root(id), None).await?;
if fetch_response.is_truncated()
|| fetch_response.contents().is_some()
|| fetch_response.common_prefixes().is_some()
{
error!("Timeline {id} should be deleted, but its list response is {fetch_response:?}");
not_deleted_timelines.push(id);
}
}
anyhow::ensure!(
not_deleted_timelines.is_empty(),
"Failed to delete all timelines in a batch"
);
Ok(())
}


@@ -14,6 +14,10 @@ publish = false
### BEGIN HAKARI SECTION
[dependencies]
anyhow = { version = "1", features = ["backtrace"] }
aws-config = { version = "0.56", default-features = false, features = ["credentials-sso", "rustls"] }
aws-runtime = { version = "0.56", default-features = false, features = ["event-stream"] }
aws-sigv4 = { version = "0.56", features = ["sign-eventstream"] }
aws-smithy-http = { version = "0.56", default-features = false, features = ["event-stream", "rt-tokio"] }
axum = { version = "0.6", features = ["ws"] }
base64 = { version = "0.21", features = ["alloc"] }
bytes = { version = "1", features = ["serde"] }
@@ -21,7 +25,6 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd
clap = { version = "4", features = ["derive", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
crossbeam-utils = { version = "0.8" }
digest = { version = "0.10", features = ["mac", "std"] }
either = { version = "1" }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
futures = { version = "0.3" }
@@ -30,6 +33,7 @@ futures-core = { version = "0.3" }
futures-executor = { version = "0.3" }
futures-sink = { version = "0.3" }
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
hex = { version = "0.4", features = ["serde"] }
hyper = { version = "0.14", features = ["full"] }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits"] }
@@ -51,6 +55,7 @@ serde = { version = "1", features = ["alloc", "derive"] }
serde_json = { version = "1", features = ["raw_value"] }
smallvec = { version = "1", default-features = false, features = ["write"] }
socket2 = { version = "0.4", default-features = false, features = ["all"] }
time = { version = "0.3", features = ["formatting", "macros", "parsing"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
tokio-rustls = { version = "0.24" }
tokio-util = { version = "0.7", features = ["codec", "io"] }
@@ -60,6 +65,7 @@ tower = { version = "0.4", features = ["balance", "buffer", "limit", "retry", "t
tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }
url = { version = "2", features = ["serde"] }
uuid = { version = "1", features = ["serde", "v4"] }
[build-dependencies]
anyhow = { version = "1", features = ["backtrace"] }