mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-24 08:30:37 +00:00
## Problem - Scrubber's `tidy` command requires presence of a control plane - Scrubber has no tests at all ## Summary of changes - Add re-usable async streams for reading metadata from a bucket - Add a `scan-metadata` command that reads from those streams and calls existing `checks.rs` code to validate metadata, then returns a summary struct for the bucket. Command returns nonzero status if errors are found. - Add an `enable_scrub_on_exit()` function to NeonEnvBuilder so that tests using remote storage can request to have the scrubber run after they finish - Enable remote storage and scrub_on_exit in test_pageserver_restart and test_pageserver_chaos This is a "toe in the water" of the overall space of validating the scrubber. Later, we should: - Enable scrubbing at end of tests using remote storage by default - Make the success condition stricter than "no errors": tests should declare what tenants+timelines they expect to see in the bucket (or sniff these from the functions tests use to create them) and we should require that the scrubber reports on these particular tenants/timelines. The `tidy` command is untouched in this PR, but it should be refactored later to use similar async streaming interface instead of the current batch-reading approach (the streams are faster with large buckets), and to also be covered by some tests. --------- Co-authored-by: Joonas Koivunen <joonas@neon.tech> Co-authored-by: Alexander Bayandin <alexander@neon.tech> Co-authored-by: Christian Schwarz <christian@neon.tech> Co-authored-by: Conrad Ludgate <conrad@neon.tech>
107 lines
3.6 KiB
Rust
107 lines
3.6 KiB
Rust
use anyhow::Context;
|
|
use async_stream::{stream, try_stream};
|
|
use aws_sdk_s3::Client;
|
|
use tokio_stream::Stream;
|
|
|
|
use crate::{list_objects_with_retries, RootTarget, TenantId};
|
|
use utils::id::{TenantTimelineId, TimelineId};
|
|
|
|
/// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
|
|
pub fn stream_tenants<'a>(
|
|
s3_client: &'a Client,
|
|
target: &'a RootTarget,
|
|
) -> impl Stream<Item = anyhow::Result<TenantId>> + 'a {
|
|
try_stream! {
|
|
let mut continuation_token = None;
|
|
loop {
|
|
let tenants_target = target.tenants_root();
|
|
let fetch_response =
|
|
list_objects_with_retries(s3_client, tenants_target, continuation_token.clone()).await?;
|
|
|
|
let new_entry_ids = fetch_response
|
|
.common_prefixes()
|
|
.unwrap_or_default()
|
|
.iter()
|
|
.filter_map(|prefix| prefix.prefix())
|
|
.filter_map(|prefix| -> Option<&str> {
|
|
prefix
|
|
.strip_prefix(&tenants_target.prefix_in_bucket)?
|
|
.strip_suffix('/')
|
|
}).map(|entry_id_str| {
|
|
entry_id_str
|
|
.parse()
|
|
.with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
|
|
});
|
|
|
|
for i in new_entry_ids {
|
|
yield i?;
|
|
}
|
|
|
|
match fetch_response.next_continuation_token {
|
|
Some(new_token) => continuation_token = Some(new_token),
|
|
None => break,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Given a TenantId, output a stream of the timelines within that tenant, discovered
|
|
/// using ListObjectsv2. The listing is done before the stream is built, so that this
|
|
/// function can be used to generate concurrency on a stream using buffer_unordered.
|
|
pub async fn stream_tenant_timelines<'a>(
|
|
s3_client: &'a Client,
|
|
target: &'a RootTarget,
|
|
tenant: TenantId,
|
|
) -> anyhow::Result<impl Stream<Item = Result<TenantTimelineId, anyhow::Error>> + 'a> {
|
|
let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
|
|
let mut continuation_token = None;
|
|
let timelines_target = target.timelines_root(&tenant);
|
|
|
|
loop {
|
|
tracing::info!("Listing in {}", tenant);
|
|
let fetch_response =
|
|
list_objects_with_retries(s3_client, &timelines_target, continuation_token.clone())
|
|
.await;
|
|
let fetch_response = match fetch_response {
|
|
Err(e) => {
|
|
timeline_ids.push(Err(e));
|
|
break;
|
|
}
|
|
Ok(r) => r,
|
|
};
|
|
|
|
let new_entry_ids = fetch_response
|
|
.common_prefixes()
|
|
.unwrap_or_default()
|
|
.iter()
|
|
.filter_map(|prefix| prefix.prefix())
|
|
.filter_map(|prefix| -> Option<&str> {
|
|
prefix
|
|
.strip_prefix(&timelines_target.prefix_in_bucket)?
|
|
.strip_suffix('/')
|
|
})
|
|
.map(|entry_id_str| {
|
|
entry_id_str
|
|
.parse::<TimelineId>()
|
|
.with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
|
|
});
|
|
|
|
for i in new_entry_ids {
|
|
timeline_ids.push(i);
|
|
}
|
|
|
|
match fetch_response.next_continuation_token {
|
|
Some(new_token) => continuation_token = Some(new_token),
|
|
None => break,
|
|
}
|
|
}
|
|
|
|
tracing::info!("Yielding for {}", tenant);
|
|
Ok(stream! {
|
|
for i in timeline_ids {
|
|
let id = i?;
|
|
yield Ok(TenantTimelineId::new(tenant, id));
|
|
}
|
|
})
|
|
}
|