Files
neon/s3_scrubber/src/lib.rs
John Spray 743933176e scrubber: add scan-metadata and hook into integration tests (#5176)
## Problem

- Scrubber's `tidy` command requires presence of a control plane
- Scrubber has no tests at all 

## Summary of changes

- Add re-usable async streams for reading metadata from a bucket
- Add a `scan-metadata` command that reads from those streams and calls
existing `checks.rs` code to validate metadata, then returns a summary
struct for the bucket. Command returns nonzero status if errors are
found.
- Add an `enable_scrub_on_exit()` function to NeonEnvBuilder so that
tests using remote storage can request to have the scrubber run after
they finish
- Enable remote storarge and scrub_on_exit in test_pageserver_restart
and test_pageserver_chaos

This is a "toe in the water" of the overall space of validating the
scrubber. Later, we should:
- Enable scrubbing at end of tests using remote storage by default
- Make the success condition stricter than "no errors": tests should
declare what tenants+timelines they expect to see in the bucket (or
sniff these from the functions tests use to create them) and we should
require that the scrubber reports on these particular tenants/timelines.

The `tidy` command is untouched in this PR, but it should be refactored
later to use similar async streaming interface instead of the current
batch-reading approach (the streams are faster with large buckets), and
to also be covered by some tests.


---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
Co-authored-by: Alexander Bayandin <alexander@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
Co-authored-by: Conrad Ludgate <conrad@neon.tech>
2023-09-06 11:55:24 +01:00

299 lines
8.5 KiB
Rust

pub mod checks;
pub mod cloud_admin_api;
pub mod delete_batch_producer;
pub mod metadata_stream;
mod s3_deletion;
pub mod scan_metadata;
use std::env;
use std::fmt::Display;
use std::time::Duration;
use anyhow::Context;
use aws_config::environment::EnvironmentVariableCredentialsProvider;
use aws_config::imds::credentials::ImdsCredentialsProvider;
use aws_config::meta::credentials::CredentialsProviderChain;
use aws_config::sso::SsoCredentialsProvider;
use aws_sdk_s3::config::Region;
use aws_sdk_s3::{Client, Config};
use reqwest::Url;
pub use s3_deletion::S3Deleter;
use tokio::io::AsyncReadExt;
use tracing::error;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use utils::id::{TenantId, TenantTimelineId};
const MAX_RETRIES: usize = 20;
const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
pub const CLI_NAME: &str = "s3-scrubber";
#[derive(Debug, Clone)]
pub struct S3Target {
pub bucket_name: String,
pub prefix_in_bucket: String,
pub delimiter: String,
}
#[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraversingDepth {
Tenant,
Timeline,
}
impl Display for TraversingDepth {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
Self::Tenant => "tenant",
Self::Timeline => "timeline",
})
}
}
impl S3Target {
pub fn with_sub_segment(&self, new_segment: &str) -> Self {
let mut new_self = self.clone();
let _ = new_self.prefix_in_bucket.pop();
new_self.prefix_in_bucket =
[&new_self.prefix_in_bucket, new_segment, ""].join(&new_self.delimiter);
new_self
}
}
#[derive(Clone)]
pub enum RootTarget {
Pageserver(S3Target),
Safekeeper(S3Target),
}
impl RootTarget {
pub fn tenants_root(&self) -> &S3Target {
match self {
Self::Pageserver(root) => root,
Self::Safekeeper(root) => root,
}
}
pub fn tenant_root(&self, tenant_id: &TenantId) -> S3Target {
self.tenants_root().with_sub_segment(&tenant_id.to_string())
}
pub fn timelines_root(&self, tenant_id: &TenantId) -> S3Target {
match self {
Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"),
Self::Safekeeper(_) => self.tenant_root(tenant_id),
}
}
pub fn timeline_root(&self, id: &TenantTimelineId) -> S3Target {
self.timelines_root(&id.tenant_id)
.with_sub_segment(&id.timeline_id.to_string())
}
pub fn bucket_name(&self) -> &str {
match self {
Self::Pageserver(root) => &root.bucket_name,
Self::Safekeeper(root) => &root.bucket_name,
}
}
pub fn delimiter(&self) -> &str {
match self {
Self::Pageserver(root) => &root.delimiter,
Self::Safekeeper(root) => &root.delimiter,
}
}
}
pub struct BucketConfig {
pub region: String,
pub bucket: String,
/// Use SSO if this is set, else rely on AWS_* environment vars
pub sso_account_id: Option<String>,
}
impl Display for BucketConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}/{}/{}",
self.sso_account_id.as_deref().unwrap_or("<none>"),
self.region,
self.bucket
)
}
}
impl BucketConfig {
pub fn from_env() -> anyhow::Result<Self> {
let sso_account_id = env::var("SSO_ACCOUNT_ID").ok();
let region = env::var("REGION").context("'REGION' param retrieval")?;
let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?;
Ok(Self {
region,
bucket,
sso_account_id,
})
}
}
pub struct ConsoleConfig {
pub admin_api_url: Url,
}
impl ConsoleConfig {
pub fn from_env() -> anyhow::Result<Self> {
let admin_api_url: Url = env::var("CLOUD_ADMIN_API_URL")
.context("'CLOUD_ADMIN_API_URL' param retrieval")?
.parse()
.context("'CLOUD_ADMIN_API_URL' param parsing")?;
Ok(Self { admin_api_url })
}
}
pub fn get_cloud_admin_api_token_or_exit() -> String {
match env::var(CLOUD_ADMIN_API_TOKEN_ENV_VAR) {
Ok(token) => token,
Err(env::VarError::NotPresent) => {
error!("{CLOUD_ADMIN_API_TOKEN_ENV_VAR} env variable is not present");
std::process::exit(1);
}
Err(env::VarError::NotUnicode(not_unicode_string)) => {
error!("{CLOUD_ADMIN_API_TOKEN_ENV_VAR} env variable's value is not a valid unicode string: {not_unicode_string:?}");
std::process::exit(1);
}
}
}
pub fn init_logging(file_name: &str) -> WorkerGuard {
let (file_writer, guard) =
tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name));
let file_logs = fmt::Layer::new()
.with_target(false)
.with_ansi(false)
.with_writer(file_writer);
let stdout_logs = fmt::Layer::new()
.with_target(false)
.with_writer(std::io::stdout);
tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
.with(file_logs)
.with(stdout_logs)
.init();
guard
}
pub fn init_s3_client(account_id: Option<String>, bucket_region: Region) -> Client {
let credentials_provider = {
// uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
let chain = CredentialsProviderChain::first_try(
"env",
EnvironmentVariableCredentialsProvider::new(),
);
// Use SSO if we were given an account ID
match account_id {
Some(sso_account) => chain.or_else(
"sso",
SsoCredentialsProvider::builder()
.account_id(sso_account)
.role_name("PowerUserAccess")
.start_url("https://neondb.awsapps.com/start")
.region(Region::from_static("eu-central-1"))
.build(),
),
None => chain,
}
.or_else(
// Finally try IMDS
"imds",
ImdsCredentialsProvider::builder().build(),
)
};
let mut builder = Config::builder()
.region(bucket_region)
.credentials_provider(credentials_provider);
if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") {
builder = builder.endpoint_url(endpoint)
}
Client::from_conf(builder.build())
}
async fn list_objects_with_retries(
s3_client: &Client,
s3_target: &S3Target,
continuation_token: Option<String>,
) -> anyhow::Result<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output> {
for _ in 0..MAX_RETRIES {
match s3_client
.list_objects_v2()
.bucket(&s3_target.bucket_name)
.prefix(&s3_target.prefix_in_bucket)
.delimiter(&s3_target.delimiter)
.set_continuation_token(continuation_token.clone())
.send()
.await
{
Ok(response) => return Ok(response),
Err(e) => {
error!("list_objects_v2 query failed: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
anyhow::bail!("Failed to list objects {MAX_RETRIES} times")
}
async fn download_object_with_retries(
s3_client: &Client,
bucket_name: &str,
key: &str,
) -> anyhow::Result<Vec<u8>> {
for _ in 0..MAX_RETRIES {
let mut body_buf = Vec::new();
let response_stream = match s3_client
.get_object()
.bucket(bucket_name)
.key(key)
.send()
.await
{
Ok(response) => response,
Err(e) => {
error!("Failed to download object for key {key}: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
match response_stream
.body
.into_async_read()
.read_to_end(&mut body_buf)
.await
{
Ok(bytes_read) => {
tracing::info!("Downloaded {bytes_read} bytes for object object with key {key}");
return Ok(body_buf);
}
Err(e) => {
error!("Failed to stream object body for key {key}: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
}