mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
Add find-large-objects subcommand to scrubber (#8257)
Adds a find-large-objects subcommand to the scrubber to allow listing layer objects larger than a specific size. To be used like: ``` AWS_PROFILE=dev REGION=us-east-2 BUCKET=neon-dev-storage-us-east-2 cargo run -p storage_scrubber -- find-large-objects --min-size 250000000 --ignore-deltas ``` Part of #5431
This commit is contained in:
@@ -259,7 +259,7 @@ pub(crate) enum BlobDataParseResult {
|
||||
Incorrect(Vec<String>),
|
||||
}
|
||||
|
||||
fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generation), String> {
|
||||
pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generation), String> {
|
||||
match name.rsplit_once('-') {
|
||||
// FIXME: this is gross, just use a regex?
|
||||
Some((layer_filename, gen)) if gen.len() == 8 => {
|
||||
|
||||
97
storage_scrubber/src/find_large_objects.rs
Normal file
97
storage_scrubber/src/find_large_objects.rs
Normal file
@@ -0,0 +1,97 @@
|
||||
use futures::StreamExt;
|
||||
use pageserver::tenant::storage_layer::LayerName;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{
|
||||
checks::parse_layer_object_name, init_remote, list_objects_with_retries,
|
||||
metadata_stream::stream_tenants, BucketConfig, NodeKind,
|
||||
};
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
|
||||
enum LargeObjectKind {
|
||||
DeltaLayer,
|
||||
ImageLayer,
|
||||
Other,
|
||||
}
|
||||
|
||||
impl LargeObjectKind {
|
||||
fn from_key(key: &str) -> Self {
|
||||
let fname = key.split('/').last().unwrap();
|
||||
|
||||
let Ok((layer_name, _generation)) = parse_layer_object_name(fname) else {
|
||||
return LargeObjectKind::Other;
|
||||
};
|
||||
|
||||
match layer_name {
|
||||
LayerName::Image(_) => LargeObjectKind::ImageLayer,
|
||||
LayerName::Delta(_) => LargeObjectKind::DeltaLayer,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct LargeObject {
|
||||
pub key: String,
|
||||
pub size: u64,
|
||||
kind: LargeObjectKind,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct LargeObjectListing {
|
||||
pub objects: Vec<LargeObject>,
|
||||
}
|
||||
|
||||
pub async fn find_large_objects(
|
||||
bucket_config: BucketConfig,
|
||||
min_size: u64,
|
||||
ignore_deltas: bool,
|
||||
) -> anyhow::Result<LargeObjectListing> {
|
||||
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
|
||||
let mut tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
|
||||
let mut objects = Vec::new();
|
||||
let mut tenant_ctr = 0u64;
|
||||
let mut object_ctr = 0u64;
|
||||
while let Some(tenant_shard_id) = tenants.next().await {
|
||||
let tenant_shard_id = tenant_shard_id?;
|
||||
let mut tenant_root = target.tenant_root(&tenant_shard_id);
|
||||
// We want the objects and not just common prefixes
|
||||
tenant_root.delimiter.clear();
|
||||
let mut continuation_token = None;
|
||||
loop {
|
||||
let fetch_response =
|
||||
list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
|
||||
.await?;
|
||||
for obj in fetch_response.contents().iter().filter(|o| {
|
||||
if let Some(obj_size) = o.size {
|
||||
min_size as i64 <= obj_size
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}) {
|
||||
let key = obj.key().expect("couldn't get key").to_owned();
|
||||
let kind = LargeObjectKind::from_key(&key);
|
||||
if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
|
||||
continue;
|
||||
}
|
||||
objects.push(LargeObject {
|
||||
key,
|
||||
size: obj.size.unwrap() as u64,
|
||||
kind,
|
||||
})
|
||||
}
|
||||
object_ctr += fetch_response.contents().len() as u64;
|
||||
match fetch_response.next_continuation_token {
|
||||
Some(new_token) => continuation_token = Some(new_token),
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
tenant_ctr += 1;
|
||||
if tenant_ctr % 50 == 0 {
|
||||
tracing::info!(
|
||||
"Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.", objects.len()
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(LargeObjectListing { objects })
|
||||
}
|
||||
@@ -2,6 +2,7 @@
|
||||
#![deny(clippy::undocumented_unsafe_blocks)]
|
||||
pub mod checks;
|
||||
pub mod cloud_admin_api;
|
||||
pub mod find_large_objects;
|
||||
pub mod garbage;
|
||||
pub mod metadata_stream;
|
||||
pub mod pageserver_physical_gc;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use anyhow::bail;
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use storage_scrubber::find_large_objects;
|
||||
use storage_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
|
||||
use storage_scrubber::pageserver_physical_gc::GcMode;
|
||||
use storage_scrubber::scan_pageserver_metadata::scan_metadata;
|
||||
@@ -72,6 +73,12 @@ enum Command {
|
||||
#[arg(short, long, default_value_t = GcMode::IndicesOnly)]
|
||||
mode: GcMode,
|
||||
},
|
||||
FindLargeObjects {
|
||||
#[arg(long = "min-size")]
|
||||
min_size: u64,
|
||||
#[arg(short, long, default_value_t = false)]
|
||||
ignore_deltas: bool,
|
||||
},
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
@@ -86,6 +93,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
Command::PurgeGarbage { .. } => "purge-garbage",
|
||||
Command::TenantSnapshot { .. } => "tenant-snapshot",
|
||||
Command::PageserverPhysicalGc { .. } => "pageserver-physical-gc",
|
||||
Command::FindLargeObjects { .. } => "find-large-objects",
|
||||
};
|
||||
let _guard = init_logging(&format!(
|
||||
"{}_{}_{}_{}.log",
|
||||
@@ -199,5 +207,15 @@ async fn main() -> anyhow::Result<()> {
|
||||
println!("{}", serde_json::to_string(&summary).unwrap());
|
||||
Ok(())
|
||||
}
|
||||
Command::FindLargeObjects {
|
||||
min_size,
|
||||
ignore_deltas,
|
||||
} => {
|
||||
let summary =
|
||||
find_large_objects::find_large_objects(bucket_config, min_size, ignore_deltas)
|
||||
.await?;
|
||||
println!("{}", serde_json::to_string(&summary).unwrap());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user