mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-22 23:50:39 +00:00
Updates storage components to edition 2024. We like to stay on the latest edition if possible. There are no functional changes; however, some code changes had to be made to accommodate the edition's breaking changes. The PR has two commits: * the first commit updates storage crates to edition 2024 and appeases `cargo clippy` by changing code. I accidentally ran the formatter on some files that had other edits. * the second commit performs a `cargo fmt`. I would recommend a closer review of the first commit and a less close review of the second one (as it just runs `cargo fmt`). Part of https://github.com/neondatabase/neon/issues/10918
115 lines
3.7 KiB
Rust
115 lines
3.7 KiB
Rust
use std::pin::pin;
|
|
|
|
use futures::{StreamExt, TryStreamExt};
|
|
use pageserver::tenant::storage_layer::LayerName;
|
|
use remote_storage::ListingMode;
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
use crate::checks::parse_layer_object_name;
|
|
use crate::metadata_stream::stream_tenants;
|
|
use crate::{BucketConfig, NodeKind, init_remote, stream_objects_with_retries};
|
|
|
|
#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
|
|
enum LargeObjectKind {
|
|
DeltaLayer,
|
|
ImageLayer,
|
|
Other,
|
|
}
|
|
|
|
impl LargeObjectKind {
|
|
fn from_key(key: &str) -> Self {
|
|
let fname = key.split('/').last().unwrap();
|
|
|
|
let Ok((layer_name, _generation)) = parse_layer_object_name(fname) else {
|
|
return LargeObjectKind::Other;
|
|
};
|
|
|
|
match layer_name {
|
|
LayerName::Image(_) => LargeObjectKind::ImageLayer,
|
|
LayerName::Delta(_) => LargeObjectKind::DeltaLayer,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Serialize, Deserialize, Clone)]
|
|
pub struct LargeObject {
|
|
pub key: String,
|
|
pub size: u64,
|
|
kind: LargeObjectKind,
|
|
}
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
pub struct LargeObjectListing {
|
|
pub objects: Vec<LargeObject>,
|
|
}
|
|
|
|
pub async fn find_large_objects(
|
|
bucket_config: BucketConfig,
|
|
min_size: u64,
|
|
ignore_deltas: bool,
|
|
concurrency: usize,
|
|
) -> anyhow::Result<LargeObjectListing> {
|
|
let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
|
|
let tenants = pin!(stream_tenants(&remote_client, &target));
|
|
|
|
let objects_stream = tenants.map_ok(|tenant_shard_id| {
|
|
let mut tenant_root = target.tenant_root(&tenant_shard_id);
|
|
let remote_client = remote_client.clone();
|
|
async move {
|
|
let mut objects = Vec::new();
|
|
let mut total_objects_ctr = 0u64;
|
|
// We want the objects and not just common prefixes
|
|
tenant_root.delimiter.clear();
|
|
let mut objects_stream = pin!(stream_objects_with_retries(
|
|
&remote_client,
|
|
ListingMode::NoDelimiter,
|
|
&tenant_root
|
|
));
|
|
while let Some(listing) = objects_stream.next().await {
|
|
let listing = listing?;
|
|
for obj in listing.keys.iter().filter(|obj| min_size <= obj.size) {
|
|
let key = obj.key.to_string();
|
|
let kind = LargeObjectKind::from_key(&key);
|
|
if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
|
|
continue;
|
|
}
|
|
objects.push(LargeObject {
|
|
key,
|
|
size: obj.size,
|
|
kind,
|
|
})
|
|
}
|
|
total_objects_ctr += listing.keys.len() as u64;
|
|
}
|
|
|
|
Ok((tenant_shard_id, objects, total_objects_ctr))
|
|
}
|
|
});
|
|
let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency));
|
|
|
|
let mut objects = Vec::new();
|
|
|
|
let mut tenant_ctr = 0u64;
|
|
let mut object_ctr = 0u64;
|
|
while let Some(res) = objects_stream.next().await {
|
|
let (tenant_shard_id, objects_slice, total_objects_ctr) = res?;
|
|
objects.extend_from_slice(&objects_slice);
|
|
|
|
object_ctr += total_objects_ctr;
|
|
tenant_ctr += 1;
|
|
if tenant_ctr % 100 == 0 {
|
|
tracing::info!(
|
|
"Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.",
|
|
objects.len()
|
|
);
|
|
}
|
|
}
|
|
|
|
let desc_str = target.desc_str();
|
|
tracing::info!(
|
|
"Scan of {desc_str} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
|
|
objects.len()
|
|
);
|
|
Ok(LargeObjectListing { objects })
|
|
}
|