scrubber: add scan-metadata and hook into integration tests (#5176)

## Problem

- Scrubber's `tidy` command requires presence of a control plane
- Scrubber has no tests at all 

## Summary of changes

- Add re-usable async streams for reading metadata from a bucket
- Add a `scan-metadata` command that reads from those streams and calls
existing `checks.rs` code to validate metadata, then returns a summary
struct for the bucket. Command returns nonzero status if errors are
found.
- Add an `enable_scrub_on_exit()` function to NeonEnvBuilder so that
tests using remote storage can request to have the scrubber run after
they finish
- Enable remote storage and scrub_on_exit in test_pageserver_restart
and test_pageserver_chaos

This is a "toe in the water" of the overall space of validating the
scrubber. Later, we should:
- Enable scrubbing at end of tests using remote storage by default
- Make the success condition stricter than "no errors": tests should
declare what tenants+timelines they expect to see in the bucket (or
sniff these from the functions tests use to create them) and we should
require that the scrubber reports on these particular tenants/timelines.

The `tidy` command is untouched in this PR, but it should be refactored
later to use similar async streaming interface instead of the current
batch-reading approach (the streams are faster with large buckets), and
to also be covered by some tests.


---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
Co-authored-by: Alexander Bayandin <alexander@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
Co-authored-by: Conrad Ludgate <conrad@neon.tech>
This commit is contained in:
John Spray
2023-09-06 11:55:24 +01:00
committed by GitHub
parent 8e25d3e79e
commit 743933176e
18 changed files with 814 additions and 238 deletions

36
Cargo.lock generated
View File

@@ -213,17 +213,6 @@ dependencies = [
"critical-section",
]
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi 0.1.19",
"libc",
"winapi",
]
[[package]]
name = "autocfg"
version = "1.1.0"
@@ -1786,15 +1775,6 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
[[package]]
name = "hermit-abi"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
dependencies = [
"libc",
]
[[package]]
name = "hermit-abi"
version = "0.2.6"
@@ -1825,6 +1805,16 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46"
[[package]]
name = "histogram"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e673d137229619d5c2c8903b6ed5852b43636c0017ff2e66b1aafb8ccf04b80b"
dependencies = [
"serde",
"thiserror",
]
[[package]]
name = "hmac"
version = "0.12.1"
@@ -3738,7 +3728,7 @@ name = "s3_scrubber"
version = "0.1.0"
dependencies = [
"anyhow",
"atty",
"async-stream",
"aws-config",
"aws-sdk-s3",
"aws-smithy-http",
@@ -3749,7 +3739,10 @@ dependencies = [
"clap",
"crc32c",
"either",
"futures-util",
"hex",
"histogram",
"itertools",
"pageserver",
"rand",
"reqwest",
@@ -3759,6 +3752,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-rustls",
"tokio-stream",
"tracing",
"tracing-appender",
"tracing-subscriber",

View File

@@ -3,6 +3,7 @@
//! Currently it only analyzes holes, which are regions within the layer range that the layer contains no updates for. In the future it might do more analysis (maybe key quantiles?) but it should never return sensitive data.
use anyhow::Result;
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::ops::Range;
@@ -142,12 +143,12 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let mut total_delta_layers = 0usize;
let mut total_image_layers = 0usize;
let mut total_excess_layers = 0usize;
for tenant in fs::read_dir(storage_path.join("tenants"))? {
for tenant in fs::read_dir(storage_path.join(TENANTS_SEGMENT_NAME))? {
let tenant = tenant?;
if !tenant.file_type()?.is_dir() {
continue;
}
for timeline in fs::read_dir(tenant.path().join("timelines"))? {
for timeline in fs::read_dir(tenant.path().join(TIMELINES_SEGMENT_NAME))? {
let timeline = timeline?;
if !timeline.file_type()?.is_dir() {
continue;

View File

@@ -5,6 +5,7 @@ use clap::Subcommand;
use pageserver::tenant::block_io::BlockCursor;
use pageserver::tenant::disk_btree::DiskBtreeReader;
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use pageserver::{page_cache, virtual_file};
use pageserver::{
repository::{Key, KEY_SIZE},
@@ -80,13 +81,13 @@ async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
match cmd {
LayerCmd::List { path } => {
for tenant in fs::read_dir(path.join("tenants"))? {
for tenant in fs::read_dir(path.join(TENANTS_SEGMENT_NAME))? {
let tenant = tenant?;
if !tenant.file_type()?.is_dir() {
continue;
}
println!("tenant {}", tenant.file_name().to_string_lossy());
for timeline in fs::read_dir(tenant.path().join("timelines"))? {
for timeline in fs::read_dir(tenant.path().join(TIMELINES_SEGMENT_NAME))? {
let timeline = timeline?;
if !timeline.file_type()?.is_dir() {
continue;
@@ -101,9 +102,9 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
timeline,
} => {
let timeline_path = path
.join("tenants")
.join(TENANTS_SEGMENT_NAME)
.join(tenant)
.join("timelines")
.join(TIMELINES_SEGMENT_NAME)
.join(timeline);
let mut idx = 0;
for layer in fs::read_dir(timeline_path)? {

View File

@@ -32,7 +32,8 @@ use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig;
use crate::tenant::config::TenantConf;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{
TENANT_ATTACHING_MARKER_FILENAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
TENANTS_SEGMENT_NAME, TENANT_ATTACHING_MARKER_FILENAME, TENANT_DELETED_MARKER_FILE_NAME,
TIMELINES_SEGMENT_NAME,
};
use crate::{
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX,
@@ -563,7 +564,7 @@ impl PageServerConf {
//
pub fn tenants_path(&self) -> PathBuf {
self.workdir.join("tenants")
self.workdir.join(TENANTS_SEGMENT_NAME)
}
pub fn tenant_path(&self, tenant_id: &TenantId) -> PathBuf {

View File

@@ -141,6 +141,9 @@ pub use crate::tenant::metadata::save_metadata;
// re-export for use in walreceiver
pub use crate::tenant::timeline::WalReceiverInfo;
/// The "tenants" part of `tenants/<tenant>/timelines...`
pub const TENANTS_SEGMENT_NAME: &str = "tenants";
/// Parts of the `.neon/tenants/<tenant_id>/timelines/<timeline_id>` directory prefix.
pub const TIMELINES_SEGMENT_NAME: &str = "timelines";

View File

@@ -11,6 +11,7 @@
//! src/backend/storage/file/fd.c
//!
use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME};
use crate::tenant::TENANTS_SEGMENT_NAME;
use once_cell::sync::OnceCell;
use std::fs::{self, File, OpenOptions};
use std::io::{Error, ErrorKind, Seek, SeekFrom, Write};
@@ -235,7 +236,7 @@ impl VirtualFile {
let parts = path_str.split('/').collect::<Vec<&str>>();
let tenant_id;
let timeline_id;
if parts.len() > 5 && parts[parts.len() - 5] == "tenants" {
if parts.len() > 5 && parts[parts.len() - 5] == TENANTS_SEGMENT_NAME {
tenant_id = parts[parts.len() - 4].to_string();
timeline_id = parts[parts.len() - 2].to_string();
} else {

View File

@@ -22,6 +22,10 @@ serde_json.workspace = true
serde_with.workspace = true
workspace_hack.workspace = true
utils.workspace = true
async-stream.workspace = true
tokio-stream.workspace = true
futures-util.workspace = true
itertools.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
chrono = { workspace = true, default-features = false, features = ["clock", "serde"] }
@@ -30,10 +34,8 @@ aws-config = { workspace = true, default-features = false, features = ["rustls",
pageserver = {path="../pageserver"}
tracing.workspace = true
tracing-subscriber.workspace = true
clap.workspace = true
atty = "0.2"
tracing-appender = "0.2"
tracing-appender = "0.2"
histogram = "0.7"

View File

@@ -4,13 +4,12 @@ use std::time::Duration;
use anyhow::Context;
use aws_sdk_s3::Client;
use tokio::io::AsyncReadExt;
use tokio::task::JoinSet;
use tracing::{error, info, info_span, warn, Instrument};
use crate::cloud_admin_api::{BranchData, CloudAdminApiClient, ProjectId};
use crate::delete_batch_producer::DeleteProducerStats;
use crate::{list_objects_with_retries, RootTarget, MAX_RETRIES};
use crate::{download_object_with_retries, list_objects_with_retries, RootTarget, MAX_RETRIES};
use pageserver::tenant::storage_layer::LayerFileName;
use pageserver::tenant::IndexPart;
use utils::id::TenantTimelineId;
@@ -92,9 +91,9 @@ pub async fn validate_pageserver_active_tenant_and_timelines(
branch_checks.spawn(
async move {
let check_errors = branch_cleanup_and_check_errors(
id,
&id,
&s3_root,
&s3_active_branch,
Some(&s3_active_branch),
console_branch,
s3_data,
)
@@ -107,13 +106,13 @@ pub async fn validate_pageserver_active_tenant_and_timelines(
}
let mut total_stats = BranchCheckStats::default();
while let Some((id, branch_check_errors)) = branch_checks
while let Some((id, analysis)) = branch_checks
.join_next()
.await
.transpose()
.context("branch check task join")?
{
total_stats.add(id, branch_check_errors);
total_stats.add(id, analysis.errors);
}
Ok(total_stats)
}
@@ -160,35 +159,59 @@ impl BranchCheckStats {
}
}
async fn branch_cleanup_and_check_errors(
id: TenantTimelineId,
/// Findings collected while checking a single timeline's remote metadata.
pub struct TimelineAnalysis {
    /// Problems found with the timeline: anything listed here is an anomaly.
    pub errors: Vec<String>,

    /// Observations about a healthy timeline that are still worth surfacing, e.g.
    /// structures stored in an old-but-readable version, so that nobody removes the
    /// decoding support for that version prematurely.
    pub warnings: Vec<String>,

    /// Object keys that the metadata does not reference; these are candidates
    /// for removal.
    pub garbage_keys: Vec<String>,
}

impl TimelineAnalysis {
    /// Start an analysis with no errors, warnings or garbage keys recorded.
    fn new() -> Self {
        let (errors, warnings, garbage_keys) = (vec![], vec![], vec![]);
        Self {
            errors,
            warnings,
            garbage_keys,
        }
    }
}
pub async fn branch_cleanup_and_check_errors(
id: &TenantTimelineId,
s3_root: &RootTarget,
s3_active_branch: &BranchData,
s3_active_branch: Option<&BranchData>,
console_branch: Option<BranchData>,
s3_data: Option<S3TimelineBlobData>,
) -> Vec<String> {
info!(
"Checking timeline for branch branch {:?}/{:?}",
s3_active_branch.project_id, s3_active_branch.id
);
let mut branch_check_errors = Vec::new();
) -> TimelineAnalysis {
let mut result = TimelineAnalysis::new();
match console_branch {
Some(console_active_branch) => {
if console_active_branch.deleted {
branch_check_errors.push(format!("Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether if it got removed during the check",
s3_active_branch.id, s3_active_branch.project_id))
}
},
None => branch_check_errors.push(format!("Timeline has no branch data in the console (id = {:?}, project_id = {:?}), recheck whether if it got removed during the check",
info!("Checking timeline {id}");
if let Some(s3_active_branch) = s3_active_branch {
info!(
"Checking console status for timeline for branch {:?}/{:?}",
s3_active_branch.project_id, s3_active_branch.id
);
match console_branch {
Some(_) => {result.errors.push(format!("Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
s3_active_branch.id, s3_active_branch.project_id))
},
None => {
result.errors.push(format!("Timeline has no branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
s3_active_branch.id, s3_active_branch.project_id))
}
};
}
let mut keys_to_remove = Vec::new();
match s3_data {
Some(s3_data) => {
keys_to_remove.extend(s3_data.keys_to_remove);
result.garbage_keys.extend(s3_data.keys_to_remove);
match s3_data.blob_data {
BlobDataParseResult::Parsed {
@@ -196,16 +219,23 @@ async fn branch_cleanup_and_check_errors(
mut s3_layers,
} => {
if !IndexPart::KNOWN_VERSIONS.contains(&index_part.get_version()) {
branch_check_errors.push(format!(
result.errors.push(format!(
"index_part.json version: {}",
index_part.get_version()
))
}
if &index_part.get_version() != IndexPart::KNOWN_VERSIONS.last().unwrap() {
result.warnings.push(format!(
"index_part.json version is not latest: {}",
index_part.get_version()
))
}
if index_part.metadata.disk_consistent_lsn()
!= index_part.get_disk_consistent_lsn()
{
branch_check_errors.push(format!(
result.errors.push(format!(
"Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
index_part.metadata.disk_consistent_lsn(),
index_part.get_disk_consistent_lsn(),
@@ -220,13 +250,13 @@ async fn branch_cleanup_and_check_errors(
for (layer, metadata) in index_part.layer_metadata {
if metadata.file_size == 0 {
branch_check_errors.push(format!(
result.errors.push(format!(
"index_part.json contains a layer {} that has 0 size in its layer metadata", layer.file_name(),
))
}
if !s3_layers.remove(&layer) {
branch_check_errors.push(format!(
result.errors.push(format!(
"index_part.json contains a layer {} that is not present in S3",
layer.file_name(),
))
@@ -234,55 +264,66 @@ async fn branch_cleanup_and_check_errors(
}
if !s3_layers.is_empty() {
branch_check_errors.push(format!(
result.errors.push(format!(
"index_part.json does not contain layers from S3: {:?}",
s3_layers
.iter()
.map(|layer_name| layer_name.file_name())
.collect::<Vec<_>>(),
));
keys_to_remove.extend(s3_layers.iter().map(|layer_name| {
let mut key = s3_root.timeline_root(id).prefix_in_bucket;
let delimiter = s3_root.delimiter();
if !key.ends_with(delimiter) {
key.push_str(delimiter);
}
key.push_str(&layer_name.file_name());
key
}));
result
.garbage_keys
.extend(s3_layers.iter().map(|layer_name| {
let mut key = s3_root.timeline_root(id).prefix_in_bucket;
let delimiter = s3_root.delimiter();
if !key.ends_with(delimiter) {
key.push_str(delimiter);
}
key.push_str(&layer_name.file_name());
key
}));
}
}
BlobDataParseResult::Incorrect(parse_errors) => branch_check_errors.extend(
BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
parse_errors
.into_iter()
.map(|error| format!("parse error: {error}")),
),
}
}
None => branch_check_errors.push("Timeline has no data on S3 at all".to_string()),
None => result
.errors
.push("Timeline has no data on S3 at all".to_string()),
}
if branch_check_errors.is_empty() {
if result.errors.is_empty() {
info!("No check errors found");
} else {
warn!("Found check errors: {branch_check_errors:?}");
warn!("Timeline metadata errors: {0:?}", result.errors);
}
if !keys_to_remove.is_empty() {
error!("The following keys should be removed from S3: {keys_to_remove:?}")
if !result.warnings.is_empty() {
warn!("Timeline metadata warnings: {0:?}", result.warnings);
}
branch_check_errors
if !result.garbage_keys.is_empty() {
error!(
"The following keys should be removed from S3: {0:?}",
result.garbage_keys
)
}
result
}
#[derive(Debug)]
struct S3TimelineBlobData {
blob_data: BlobDataParseResult,
keys_to_remove: Vec<String>,
pub struct S3TimelineBlobData {
pub blob_data: BlobDataParseResult,
pub keys_to_remove: Vec<String>,
}
#[derive(Debug)]
enum BlobDataParseResult {
pub enum BlobDataParseResult {
Parsed {
index_part: IndexPart,
s3_layers: HashSet<LayerFileName>,
@@ -290,7 +331,7 @@ enum BlobDataParseResult {
Incorrect(Vec<String>),
}
async fn list_timeline_blobs(
pub async fn list_timeline_blobs(
s3_client: &Client,
id: TenantTimelineId,
s3_root: &RootTarget,
@@ -298,7 +339,7 @@ async fn list_timeline_blobs(
let mut s3_layers = HashSet::new();
let mut index_part_object = None;
let timeline_dir_target = s3_root.timeline_root(id);
let timeline_dir_target = s3_root.timeline_root(&id);
let mut continuation_token = None;
let mut errors = Vec::new();
@@ -394,45 +435,3 @@ async fn list_timeline_blobs(
keys_to_remove,
})
}
async fn download_object_with_retries(
s3_client: &Client,
bucket_name: &str,
key: &str,
) -> anyhow::Result<Vec<u8>> {
for _ in 0..MAX_RETRIES {
let mut body_buf = Vec::new();
let response_stream = match s3_client
.get_object()
.bucket(bucket_name)
.key(key)
.send()
.await
{
Ok(response) => response,
Err(e) => {
error!("Failed to download object for key {key}: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
match response_stream
.body
.into_async_read()
.read_to_end(&mut body_buf)
.await
{
Ok(bytes_read) => {
info!("Downloaded {bytes_read} bytes for object object with key {key}");
return Ok(body_buf);
}
Err(e) => {
error!("Failed to stream object body for key {key}: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
}

View File

@@ -41,7 +41,7 @@ pub async fn schedule_cleanup_deleted_timelines(
let new_stats = async move {
let tenant_id_to_check = project_to_check.tenant;
let check_target = check_root.timelines_root(tenant_id_to_check);
let check_target = check_root.timelines_root(&tenant_id_to_check);
let stats = super::process_s3_target_recursively(
&check_s3_client,
&check_target,

View File

@@ -1,12 +1,15 @@
pub mod checks;
pub mod cloud_admin_api;
pub mod delete_batch_producer;
pub mod metadata_stream;
mod s3_deletion;
pub mod scan_metadata;
use std::env;
use std::fmt::Display;
use std::time::Duration;
use anyhow::Context;
use aws_config::environment::EnvironmentVariableCredentialsProvider;
use aws_config::imds::credentials::ImdsCredentialsProvider;
use aws_config::meta::credentials::CredentialsProviderChain;
@@ -14,7 +17,9 @@ use aws_config::sso::SsoCredentialsProvider;
use aws_sdk_s3::config::Region;
use aws_sdk_s3::{Client, Config};
use reqwest::Url;
pub use s3_deletion::S3Deleter;
use tokio::io::AsyncReadExt;
use tracing::error;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
@@ -23,6 +28,8 @@ use utils::id::{TenantId, TenantTimelineId};
const MAX_RETRIES: usize = 20;
const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
pub const CLI_NAME: &str = "s3-scrubber";
#[derive(Debug, Clone)]
pub struct S3Target {
pub bucket_name: String,
@@ -69,19 +76,19 @@ impl RootTarget {
}
}
pub fn tenant_root(&self, tenant_id: TenantId) -> S3Target {
pub fn tenant_root(&self, tenant_id: &TenantId) -> S3Target {
self.tenants_root().with_sub_segment(&tenant_id.to_string())
}
pub fn timelines_root(&self, tenant_id: TenantId) -> S3Target {
pub fn timelines_root(&self, tenant_id: &TenantId) -> S3Target {
match self {
Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"),
Self::Safekeeper(_) => self.tenant_root(tenant_id),
}
}
pub fn timeline_root(&self, id: TenantTimelineId) -> S3Target {
self.timelines_root(id.tenant_id)
pub fn timeline_root(&self, id: &TenantTimelineId) -> S3Target {
self.timelines_root(&id.tenant_id)
.with_sub_segment(&id.timeline_id.to_string())
}
@@ -100,6 +107,55 @@ impl RootTarget {
}
}
/// Identifies the bucket to operate on and, optionally, the SSO account used to reach it.
pub struct BucketConfig {
    pub region: String,
    pub bucket: String,

    /// Use SSO if this is set, else rely on AWS_* environment vars
    pub sso_account_id: Option<String>,
}

/// Renders as `<sso account>/<region>/<bucket>`, with the literal `<none>` standing in
/// for a missing SSO account id.
impl Display for BucketConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let account = match &self.sso_account_id {
            Some(id) => id.as_str(),
            None => "<none>",
        };
        let Self { region, bucket, .. } = self;
        write!(f, "{account}/{region}/{bucket}")
    }
}
impl BucketConfig {
    /// Build the configuration from environment variables.
    ///
    /// `REGION` and `BUCKET` are mandatory: a missing or unreadable value is returned as
    /// an error. `SSO_ACCOUNT_ID` is optional and becomes `None` when it cannot be read.
    pub fn from_env() -> anyhow::Result<Self> {
        Ok(Self {
            // Field initialisers run top to bottom, so REGION is still validated before BUCKET.
            region: env::var("REGION").context("'REGION' param retrieval")?,
            bucket: env::var("BUCKET").context("'BUCKET' param retrieval")?,
            sso_account_id: env::var("SSO_ACCOUNT_ID").ok(),
        })
    }
}
/// Location of the console's admin API.
pub struct ConsoleConfig {
    pub admin_api_url: Url,
}

impl ConsoleConfig {
    /// Read the admin API URL from the `CLOUD_ADMIN_API_URL` environment variable.
    ///
    /// Fails if the variable cannot be read, or if its value does not parse as a URL.
    pub fn from_env() -> anyhow::Result<Self> {
        let raw_url =
            env::var("CLOUD_ADMIN_API_URL").context("'CLOUD_ADMIN_API_URL' param retrieval")?;
        let admin_api_url = raw_url
            .parse::<Url>()
            .context("'CLOUD_ADMIN_API_URL' param parsing")?;

        Ok(Self { admin_api_url })
    }
}
pub fn get_cloud_admin_api_token_or_exit() -> String {
match env::var(CLOUD_ADMIN_API_TOKEN_ENV_VAR) {
Ok(token) => token,
@@ -114,23 +170,7 @@ pub fn get_cloud_admin_api_token_or_exit() -> String {
}
}
pub fn init_logging(binary_name: &str, dry_run: bool, node_kind: &str) -> WorkerGuard {
let file_name = if dry_run {
format!(
"{}_{}_{}__dry.log",
binary_name,
node_kind,
chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
)
} else {
format!(
"{}_{}_{}.log",
binary_name,
node_kind,
chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
)
};
pub fn init_logging(file_name: &str) -> WorkerGuard {
let (file_writer, guard) =
tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name));
@@ -140,7 +180,6 @@ pub fn init_logging(binary_name: &str, dry_run: bool, node_kind: &str) -> Worker
.with_writer(file_writer);
let stdout_logs = fmt::Layer::new()
.with_target(false)
.with_ansi(atty::is(atty::Stream::Stdout))
.with_writer(std::io::stdout);
tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
@@ -151,30 +190,43 @@ pub fn init_logging(binary_name: &str, dry_run: bool, node_kind: &str) -> Worker
guard
}
pub fn init_s3_client(account_id: String, bucket_region: Region) -> Client {
pub fn init_s3_client(account_id: Option<String>, bucket_region: Region) -> Client {
let credentials_provider = {
// uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
CredentialsProviderChain::first_try("env", EnvironmentVariableCredentialsProvider::new())
// uses sso
.or_else(
let chain = CredentialsProviderChain::first_try(
"env",
EnvironmentVariableCredentialsProvider::new(),
);
// Use SSO if we were given an account ID
match account_id {
Some(sso_account) => chain.or_else(
"sso",
SsoCredentialsProvider::builder()
.account_id(account_id)
.account_id(sso_account)
.role_name("PowerUserAccess")
.start_url("https://neondb.awsapps.com/start")
.region(Region::from_static("eu-central-1"))
.build(),
)
// uses imds v2
.or_else("imds", ImdsCredentialsProvider::builder().build())
),
None => chain,
}
.or_else(
// Finally try IMDS
"imds",
ImdsCredentialsProvider::builder().build(),
)
};
let config = Config::builder()
let mut builder = Config::builder()
.region(bucket_region)
.credentials_provider(credentials_provider)
.build();
.credentials_provider(credentials_provider);
Client::from_conf(config)
if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") {
builder = builder.endpoint_url(endpoint)
}
Client::from_conf(builder.build())
}
async fn list_objects_with_retries(
@@ -202,3 +254,45 @@ async fn list_objects_with_retries(
anyhow::bail!("Failed to list objects {MAX_RETRIES} times")
}
/// Download the whole body of the object `key` in `bucket_name`, retrying on failure.
///
/// Up to `MAX_RETRIES` attempts are made. An attempt fails either when the GetObject
/// request itself fails or when streaming the response body fails; each failure is
/// logged and followed by a one second pause before the next attempt.
///
/// Returns the object's bytes on the first successful attempt, or an error once all
/// attempts have failed.
async fn download_object_with_retries(
    s3_client: &Client,
    bucket_name: &str,
    key: &str,
) -> anyhow::Result<Vec<u8>> {
    for _ in 0..MAX_RETRIES {
        // A fresh buffer per attempt: bytes from a partially streamed body must never
        // leak into the result of a later attempt.
        let mut body_buf = Vec::new();
        let response_stream = match s3_client
            .get_object()
            .bucket(bucket_name)
            .key(key)
            .send()
            .await
        {
            Ok(response) => response,
            Err(e) => {
                error!("Failed to download object for key {key}: {e}");
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }
        };

        match response_stream
            .body
            .into_async_read()
            .read_to_end(&mut body_buf)
            .await
        {
            Ok(bytes_read) => {
                tracing::info!("Downloaded {bytes_read} bytes for object with key {key}");
                return Ok(body_buf);
            }
            Err(e) => {
                error!("Failed to stream object body for key {key}: {e}");
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    }

    anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
}

View File

@@ -1,19 +1,18 @@
use std::collections::HashMap;
use std::env;
use std::fmt::Display;
use std::num::NonZeroUsize;
use std::sync::Arc;
use anyhow::Context;
use aws_sdk_s3::config::Region;
use reqwest::Url;
use s3_scrubber::cloud_admin_api::CloudAdminApiClient;
use s3_scrubber::delete_batch_producer::DeleteBatchProducer;
use s3_scrubber::scan_metadata::scan_metadata;
use s3_scrubber::{
checks, get_cloud_admin_api_token_or_exit, init_logging, init_s3_client, RootTarget, S3Deleter,
S3Target, TraversingDepth,
checks, get_cloud_admin_api_token_or_exit, init_logging, init_s3_client, BucketConfig,
ConsoleConfig, RootTarget, S3Deleter, S3Target, TraversingDepth, CLI_NAME,
};
use tracing::{info, info_span, warn};
use tracing::{info, warn};
use clap::{Parser, Subcommand, ValueEnum};
@@ -59,48 +58,7 @@ enum Command {
#[arg(short, long, default_value_t = false)]
skip_validation: bool,
},
}
struct BucketConfig {
region: String,
bucket: String,
sso_account_id: String,
}
impl Display for BucketConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}/{}", self.sso_account_id, self.region, self.bucket)
}
}
impl BucketConfig {
fn from_env() -> anyhow::Result<Self> {
let sso_account_id =
env::var("SSO_ACCOUNT_ID").context("'SSO_ACCOUNT_ID' param retrieval")?;
let region = env::var("REGION").context("'REGION' param retrieval")?;
let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?;
Ok(Self {
region,
bucket,
sso_account_id,
})
}
}
struct ConsoleConfig {
admin_api_url: Url,
}
impl ConsoleConfig {
fn from_env() -> anyhow::Result<Self> {
let admin_api_url: Url = env::var("CLOUD_ADMIN_API_URL")
.context("'CLOUD_ADMIN_API_URL' param retrieval")?
.parse()
.context("'CLOUD_ADMIN_API_URL' param parsing")?;
Ok(Self { admin_api_url })
}
ScanMetadata {},
}
async fn tidy(
@@ -111,13 +69,24 @@ async fn tidy(
depth: TraversingDepth,
skip_validation: bool,
) -> anyhow::Result<()> {
let binary_name = env::args()
.next()
.context("binary name in not the first argument")?;
let dry_run = !cli.delete;
let _guard = init_logging(&binary_name, dry_run, node_kind.as_str());
let _main_span = info_span!("tidy", binary = %binary_name, %dry_run).entered();
let file_name = if dry_run {
format!(
"{}_{}_{}__dry.log",
CLI_NAME,
node_kind,
chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
)
} else {
format!(
"{}_{}_{}.log",
CLI_NAME,
node_kind,
chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
)
};
let _guard = init_logging(&file_name);
if dry_run {
info!("Dry run, not removing items for real");
@@ -247,7 +216,7 @@ async fn main() -> anyhow::Result<()> {
let bucket_config = BucketConfig::from_env()?;
match &cli.command {
match cli.command {
Command::Tidy {
node_kind,
depth,
@@ -258,11 +227,25 @@ async fn main() -> anyhow::Result<()> {
&cli,
bucket_config,
console_config,
*node_kind,
*depth,
*skip_validation,
node_kind,
depth,
skip_validation,
)
.await
}
Command::ScanMetadata {} => match scan_metadata(bucket_config).await {
Err(e) => {
tracing::error!("Failed: {e}");
Err(e)
}
Ok(summary) => {
println!("{}", summary.summary_string());
if summary.is_fatal() {
Err(anyhow::anyhow!("Fatal scrub errors detected"))
} else {
Ok(())
}
}
},
}
}

View File

@@ -0,0 +1,106 @@
use anyhow::Context;
use async_stream::{stream, try_stream};
use aws_sdk_s3::Client;
use tokio_stream::Stream;
use crate::{list_objects_with_retries, RootTarget, TenantId};
use utils::id::{TenantTimelineId, TimelineId};
/// Lazily enumerate the tenants found in a bucket.
///
/// The tenants root of `target` is listed with ListObjectsv2 (through
/// `list_objects_with_retries`), following continuation tokens until the listing is
/// exhausted. Each common prefix that starts with the tenants prefix and ends in `/`
/// has the part in between parsed as a `TenantId`; prefixes of any other shape are
/// skipped. A failed listing or an unparseable id is emitted as an `Err` item.
pub fn stream_tenants<'a>(
    s3_client: &'a Client,
    target: &'a RootTarget,
) -> impl Stream<Item = anyhow::Result<TenantId>> + 'a {
    try_stream! {
        // The listing target does not change between pages, so look it up once.
        let tenants_target = target.tenants_root();
        let mut continuation_token = None;

        loop {
            let listing =
                list_objects_with_retries(s3_client, tenants_target, continuation_token.clone()).await?;

            for common_prefix in listing.common_prefixes().unwrap_or_default() {
                let candidate = common_prefix
                    .prefix()
                    .and_then(|full| full.strip_prefix(&tenants_target.prefix_in_bucket))
                    .and_then(|rest| rest.strip_suffix('/'));

                if let Some(entry_id_str) = candidate {
                    let tenant_id: TenantId = entry_id_str
                        .parse()
                        .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))?;
                    yield tenant_id;
                }
            }

            match listing.next_continuation_token {
                Some(new_token) => continuation_token = Some(new_token),
                None => break,
            }
        }
    }
}
/// List the timelines of `tenant` with ListObjectsv2 and hand them back as a stream of
/// `TenantTimelineId`s.
///
/// All listing happens eagerly, before the stream is constructed, so that callers can
/// run many of these futures concurrently (e.g. with `buffer_unordered`) and get
/// concurrent listings.
///
/// A listing failure does not fail this function: it is recorded as an `Err` item after
/// whatever timelines were already discovered, and listing stops there. Prefixes that
/// do not parse as a `TimelineId` are likewise recorded as `Err` items.
pub async fn stream_tenant_timelines<'a>(
    s3_client: &'a Client,
    target: &'a RootTarget,
    tenant: TenantId,
) -> anyhow::Result<impl Stream<Item = Result<TenantTimelineId, anyhow::Error>> + 'a> {
    let timelines_target = target.timelines_root(&tenant);
    let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
    let mut continuation_token = None;

    loop {
        tracing::info!("Listing in {}", tenant);
        let listing = match list_objects_with_retries(
            s3_client,
            &timelines_target,
            continuation_token.clone(),
        )
        .await
        {
            Ok(listing) => listing,
            Err(e) => {
                // Keep what was found so far and surface the failure as the final item.
                timeline_ids.push(Err(e));
                break;
            }
        };

        timeline_ids.extend(
            listing
                .common_prefixes()
                .unwrap_or_default()
                .iter()
                .filter_map(|common_prefix| {
                    let entry_id_str = common_prefix
                        .prefix()?
                        .strip_prefix(&timelines_target.prefix_in_bucket)?
                        .strip_suffix('/')?;
                    Some(
                        entry_id_str
                            .parse::<TimelineId>()
                            .with_context(|| format!("Incorrect entry id str: {entry_id_str}")),
                    )
                }),
        );

        continuation_token = match listing.next_continuation_token {
            Some(new_token) => Some(new_token),
            None => break,
        };
    }

    tracing::info!("Yielding for {}", tenant);
    Ok(stream! {
        for entry in timeline_ids {
            let timeline_id = entry?;
            yield Ok(TenantTimelineId::new(tenant, timeline_id));
        }
    })
}

View File

@@ -164,7 +164,7 @@ async fn delete_tenants_batch(
s3_target,
s3_client,
dry_run,
|root_target, tenant_to_delete| root_target.tenant_root(tenant_to_delete),
|root_target, tenant_to_delete| root_target.tenant_root(&tenant_to_delete),
)
.await?;
@@ -215,7 +215,7 @@ async fn delete_timelines_batch(
s3_target,
s3_client,
dry_run,
|root_target, timeline_to_delete| root_target.timeline_root(timeline_to_delete),
|root_target, timeline_to_delete| root_target.timeline_root(&timeline_to_delete),
)
.await?;
@@ -386,7 +386,7 @@ async fn ensure_tenant_batch_deleted(
for &tenant_id in batch {
let fetch_response =
list_objects_with_retries(s3_client, &s3_target.tenant_root(tenant_id), None).await?;
list_objects_with_retries(s3_client, &s3_target.tenant_root(&tenant_id), None).await?;
if fetch_response.is_truncated()
|| fetch_response.contents().is_some()
@@ -415,7 +415,7 @@ async fn ensure_timeline_batch_deleted(
for &id in batch {
let fetch_response =
list_objects_with_retries(s3_client, &s3_target.timeline_root(id), None).await?;
list_objects_with_retries(s3_client, &s3_target.timeline_root(&id), None).await?;
if fetch_response.is_truncated()
|| fetch_response.contents().is_some()

View File

@@ -0,0 +1,234 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use crate::checks::{
branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData,
TimelineAnalysis,
};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_logging, init_s3_client, BucketConfig, RootTarget, S3Target, CLI_NAME};
use aws_sdk_s3::Client;
use aws_types::region::Region;
use futures_util::{pin_mut, StreamExt, TryStreamExt};
use histogram::Histogram;
use pageserver::tenant::{IndexPart, TENANTS_SEGMENT_NAME};
use utils::id::TenantTimelineId;
/// Aggregated results of a metadata scan over the timelines in a bucket.
pub struct MetadataSummary {
/// Number of timelines whose blob data has been fed to `update_data`.
count: usize,
/// Timelines flagged as having errors.
/// NOTE(review): presumably filled in by `update_analysis`, whose body is not visible here — confirm.
with_errors: HashSet<TenantTimelineId>,
/// Timelines flagged as having warnings (same caveat as `with_errors`).
with_warnings: HashSet<TenantTimelineId>,
/// Timelines flagged as having garbage keys (same caveat as `with_errors`).
with_garbage: HashSet<TenantTimelineId>,
/// For each index part version seen, how many successfully parsed index parts had it.
indices_by_version: HashMap<usize, usize>,
/// Distribution of the number of layers listed in each parsed index part.
layer_count: MinMaxHisto,
/// Distribution of per-timeline sizes: the sum of `file_size` over an index part's layers.
timeline_size_bytes: MinMaxHisto,
/// Distribution of individual layer `file_size` values.
layer_size_bytes: MinMaxHisto,
}
/// A histogram plus minimum and maximum tracking
struct MinMaxHisto {
/// Histogram that every sampled value is added to.
histo: Histogram,
/// Smallest value sampled so far; starts at `u64::MAX` until the first sample arrives.
min: u64,
/// Largest value sampled so far; starts at 0 until the first sample arrives.
max: u64,
}
impl MinMaxHisto {
    /// Create an empty tracker backed by a histogram with the builder's default parameters.
    fn new() -> Self {
        Self {
            histo: histogram::Histogram::builder()
                .build()
                .expect("Bad histogram params"),
            // Sentinels chosen so that the first sample always replaces both of them.
            min: u64::MAX,
            max: 0,
        }
    }

    /// Record one observation of `v`.
    ///
    /// `min` and `max` are updated unconditionally. If the histogram rejects the value,
    /// a warning is logged and the histogram's error is returned to the caller.
    fn sample(&mut self, v: u64) -> Result<(), histogram::Error> {
        self.min = std::cmp::min(self.min, v);
        self.max = std::cmp::max(self.max, v);
        let r = self.histo.increment(v, 1);
        if r.is_err() {
            tracing::warn!("Bad histogram sample: {v}");
        }
        r
    }

    /// Render min, the 1/10/50/90/99th percentiles and max on a single line.
    ///
    /// Each percentile is reported as the midpoint of the histogram bucket it falls in.
    /// If the histogram cannot produce percentiles, a `No data: <error>` string is
    /// returned instead.
    fn oneline(&self) -> String {
        let percentiles = match self.histo.percentiles(&[1.0, 10.0, 50.0, 90.0, 99.0]) {
            Ok(p) => p,
            Err(e) => return format!("No data: {}", e),
        };

        let percentiles: Vec<u64> = percentiles
            .iter()
            .map(|p| {
                let bucket = p.bucket();
                let (low, high) = (bucket.low(), bucket.high());
                // Midpoint of the bucket. Written as `low + half the width` rather than
                // `(low + high) / 2` so that it cannot overflow for large bounds.
                low + high.saturating_sub(low) / 2
            })
            .collect();

        // Five percentiles were requested above, so indices 0..=4 are present.
        format!(
            "min {}, 1% {}, 10% {}, 50% {}, 90% {}, 99% {}, max {}",
            self.min,
            percentiles[0],
            percentiles[1],
            percentiles[2],
            percentiles[3],
            percentiles[4],
            self.max,
        )
    }
}
impl MetadataSummary {
    /// Create a summary with all counters at zero and all sets empty.
    fn new() -> Self {
        Self {
            count: 0,
            with_errors: HashSet::new(),
            with_warnings: HashSet::new(),
            with_garbage: HashSet::new(),
            indices_by_version: HashMap::new(),
            layer_count: MinMaxHisto::new(),
            timeline_size_bytes: MinMaxHisto::new(),
            layer_size_bytes: MinMaxHisto::new(),
        }
    }

    /// Feed one parsed index into the layer-count and size histograms.
    ///
    /// # Errors
    ///
    /// Returns the first histogram error encountered; samples after the
    /// failing one are not recorded.
    fn update_histograms(&mut self, index_part: &IndexPart) -> Result<(), histogram::Error> {
        self.layer_count
            .sample(index_part.layer_metadata.len() as u64)?;

        let mut total_size: u64 = 0;
        for meta in index_part.layer_metadata.values() {
            total_size += meta.file_size;
            self.layer_size_bytes.sample(meta.file_size)?;
        }
        self.timeline_size_bytes.sample(total_size)?;

        Ok(())
    }

    /// Account for one timeline's blob listing.
    ///
    /// Every call bumps the timeline count. The per-version tally and the
    /// histograms are only updated when the timeline's index was parsed.
    fn update_data(&mut self, data: &S3TimelineBlobData) {
        self.count += 1;

        if let BlobDataParseResult::Parsed {
            index_part,
            s3_layers: _,
        } = &data.blob_data
        {
            *self
                .indices_by_version
                .entry(index_part.get_version())
                .or_default() += 1;

            if let Err(e) = self.update_histograms(index_part) {
                // Value out of range? Warn that the results are untrustworthy
                tracing::warn!(
                    "Error updating histograms, summary stats may be wrong: {}",
                    e
                );
            }
        }
    }

    /// Remember whether this timeline's analysis produced errors or warnings.
    ///
    /// NOTE(review): `with_garbage` is not updated here or anywhere else in
    /// this file, so the "With garbage" line of the summary is always 0 --
    /// confirm whether the analysis exposes garbage information to record.
    fn update_analysis(&mut self, id: &TenantTimelineId, analysis: &TimelineAnalysis) {
        if !analysis.errors.is_empty() {
            self.with_errors.insert(*id);
        }

        if !analysis.warnings.is_empty() {
            self.with_warnings.insert(*id);
        }
    }

    /// Long-form output for printing at end of a scan
    ///
    /// Index versions are listed in ascending version order so that the
    /// output is the same from run to run.
    pub fn summary_string(&self) -> String {
        // HashMap iteration order is unspecified; sort before rendering.
        let mut versions: Vec<(&usize, &usize)> = self.indices_by_version.iter().collect();
        versions.sort_unstable();
        let version_summary = versions
            .into_iter()
            .map(|(version, count)| format!("{version}: {count}"))
            .collect::<Vec<_>>()
            .join(", ");

        format!(
            "Timelines: {0}
With errors: {1}
With warnings: {2}
With garbage: {3}
Index versions: {version_summary}
Timeline size bytes: {4}
Layer size bytes: {5}
Timeline layer count: {6}
",
            self.count,
            self.with_errors.len(),
            self.with_warnings.len(),
            self.with_garbage.len(),
            self.timeline_size_bytes.oneline(),
            self.layer_size_bytes.oneline(),
            self.layer_count.oneline(),
        )
    }

    /// True if at least one timeline was recorded as having errors.
    pub fn is_fatal(&self) -> bool {
        !self.with_errors.is_empty()
    }
}
/// Walk all tenants and timelines under the pageserver prefix of an S3 bucket,
/// check each timeline's metadata, and return the accumulated statistics.
///
/// Logging is initialised to a file named after the bucket and the current
/// UTC time. Listing of tenants' timelines and of timelines' blobs is done
/// concurrently, up to `CONCURRENCY` at a time for each stage.
///
/// # Errors
///
/// Returns the first error yielded by the tenant/timeline/blob listing streams.
pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<MetadataSummary> {
    // How many tenants to process in parallel. We need to be mindful of pageservers
    // accessing the same per tenant prefixes, so use a lower setting than pageservers.
    const CONCURRENCY: usize = 32;

    /// List the blobs of a single timeline, handing the id back alongside the result.
    async fn report_on_timeline(
        s3_client: &Client,
        target: &RootTarget,
        ttid: TenantTimelineId,
    ) -> anyhow::Result<(TenantTimelineId, S3TimelineBlobData)> {
        let data = list_timeline_blobs(s3_client, ttid, target).await?;
        Ok((ttid, data))
    }

    let log_file_name = format!(
        "{}_scan_metadata_{}_{}.log",
        CLI_NAME,
        bucket_config.bucket,
        chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
    );
    // Keep the guard bound for the whole function body.
    let _logging_guard = init_logging(&log_file_name);

    let s3_client = Arc::new(init_s3_client(
        bucket_config.sso_account_id,
        Region::new(bucket_config.region),
    ));

    let delimiter = "/";
    let s3_target = S3Target {
        bucket_name: bucket_config.bucket.to_string(),
        prefix_in_bucket: ["pageserver", "v1", TENANTS_SEGMENT_NAME, ""].join(delimiter),
        delimiter: delimiter.to_string(),
    };
    let target = RootTarget::Pageserver(s3_target);

    // tenants -> TenantTimelineId -> (TenantTimelineId, S3TimelineBlobData)
    let timeline_blobs = stream_tenants(&s3_client, &target)
        .map_ok(|tenant_id| stream_tenant_timelines(&s3_client, &target, tenant_id))
        .try_buffer_unordered(CONCURRENCY)
        .try_flatten()
        .map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid))
        .try_buffer_unordered(CONCURRENCY);
    pin_mut!(timeline_blobs);

    let mut summary = MetadataSummary::new();
    while let Some(item) = timeline_blobs.next().await {
        let (ttid, data) = item?;
        summary.update_data(&data);

        let analysis =
            branch_cleanup_and_check_errors(&ttid, &target, None, None, Some(data)).await;
        summary.update_analysis(&ttid, &analysis);
    }

    Ok(summary)
}

View File

@@ -415,6 +415,7 @@ class NeonEnvBuilder:
pg_distrib_dir: Path,
pg_version: PgVersion,
test_name: str,
test_output_dir: Path,
remote_storage: Optional[RemoteStorage] = None,
remote_storage_users: RemoteStorageUsers = RemoteStorageUsers.PAGESERVER,
pageserver_config_override: Optional[str] = None,
@@ -455,6 +456,8 @@ class NeonEnvBuilder:
self.preserve_database_files = preserve_database_files
self.initial_tenant = initial_tenant or TenantId.generate()
self.initial_timeline = initial_timeline or TimelineId.generate()
self.scrub_on_exit = False
self.test_output_dir = test_output_dir
assert test_name.startswith(
"test_"
@@ -489,6 +492,23 @@ class NeonEnvBuilder:
return env
def enable_scrub_on_exit(self):
    """
    Call this if you would like the fixture to automatically run
    s3_scrubber at the end of the test, as a bidirectional test
    that the scrubber is working properly, and that the code within
    the test didn't produce any invalid remote state.

    Raises RuntimeError if the configured remote storage is not an
    S3Storage instance.
    """
    if not isinstance(self.remote_storage, S3Storage):
        # The scrubber can't talk to e.g. LocalFS -- it needs
        # an HTTP endpoint (mock is fine) to connect to.
        # Must be an f-string, otherwise the placeholder is printed verbatim.
        raise RuntimeError(
            f"Cannot scrub with remote_storage={self.remote_storage}, require an S3 endpoint"
        )

    self.scrub_on_exit = True
def enable_remote_storage(
self,
remote_storage_kind: RemoteStorageKind,
@@ -721,11 +741,20 @@ class NeonEnvBuilder:
self.env.pageserver.stop(immediate=True)
cleanup_error = None
if self.scrub_on_exit:
try:
S3Scrubber(self.test_output_dir, self).scan_metadata()
except Exception as e:
log.error(f"Error during remote storage scrub: {e}")
cleanup_error = e
try:
self.cleanup_remote_storage()
except Exception as e:
log.error(f"Error during remote storage cleanup: {e}")
cleanup_error = e
if cleanup_error is not None:
cleanup_error = e
try:
self.cleanup_local_storage()
@@ -929,6 +958,7 @@ def _shared_simple_env(
default_broker: NeonBroker,
run_id: uuid.UUID,
top_output_dir: Path,
test_output_dir: Path,
neon_binpath: Path,
pg_distrib_dir: Path,
pg_version: PgVersion,
@@ -957,6 +987,7 @@ def _shared_simple_env(
run_id=run_id,
preserve_database_files=pytestconfig.getoption("--preserve-database-files"),
test_name=request.node.name,
test_output_dir=test_output_dir,
) as builder:
env = builder.init_start()
@@ -984,7 +1015,7 @@ def neon_simple_env(_shared_simple_env: NeonEnv) -> Iterator[NeonEnv]:
@pytest.fixture(scope="function")
def neon_env_builder(
pytestconfig: Config,
test_output_dir: str,
test_output_dir: Path,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
neon_binpath: Path,
@@ -1022,6 +1053,7 @@ def neon_env_builder(
run_id=run_id,
preserve_database_files=pytestconfig.getoption("--preserve-database-files"),
test_name=request.node.name,
test_output_dir=test_output_dir,
) as builder:
yield builder
@@ -1728,7 +1760,10 @@ class PgBin:
self._fixpath(command)
log.info(f"Running command '{' '.join(command)}'")
env = self._build_env(env)
return subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True, **kwargs)
base_path, _, _ = subprocess_capture(
self.log_dir, command, env=env, cwd=cwd, check=True, **kwargs
)
return base_path
@pytest.fixture(scope="function")
@@ -2734,6 +2769,41 @@ class SafekeeperHttpClient(requests.Session):
return metrics
class S3Scrubber:
    """Runs the `s3_scrubber` binary against the env builder's S3 remote storage."""

    def __init__(self, log_dir: Path, env: NeonEnvBuilder):
        self.env = env
        self.log_dir = log_dir

    def scrubber_cli(self, args, timeout):
        """
        Run `s3_scrubber` with the given argument list, capturing its output
        under `self.log_dir` and echoing it to the log.

        Raises RuntimeError if the process exits with a non-zero status.

        Note: `timeout` is accepted but is not used by this method.
        """
        s3_storage = self.env.remote_storage
        assert isinstance(s3_storage, S3Storage)

        # Region and bucket first, then whatever credentials the storage provides.
        env = {
            "REGION": s3_storage.bucket_region,
            "BUCKET": s3_storage.bucket_name,
            **s3_storage.access_env_vars(),
        }
        if s3_storage.endpoint is not None:
            env["AWS_ENDPOINT_URL"] = s3_storage.endpoint

        args = [self.env.neon_binpath / "s3_scrubber"] + args

        # check=False: the exit status is inspected below so that the
        # diagnostics can be logged before raising.
        output_path, _, status_code = subprocess_capture(
            self.log_dir, args, echo_stderr=True, echo_stdout=True, env=env, check=False
        )
        if not status_code:
            return

        log.warning(f"Scrub command {args} failed")
        log.warning(f"Scrub environment: {env}")
        log.warning(f"Output at: {output_path}")
        raise RuntimeError("Remote storage scrub failed")

    def scan_metadata(self):
        """Run the scrubber's `scan-metadata` command."""
        self.scrubber_cli(["scan-metadata"], timeout=30)
def get_test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
"""Compute the working directory for an individual test."""
test_name = request.node.name

View File

@@ -88,6 +88,19 @@ def available_s3_storages() -> List[RemoteStorageKind]:
return remote_storages
def s3_storage() -> RemoteStorageKind:
    """
    Pick a single S3-capable remote storage kind, for tests that need an S3
    endpoint but do not want to be parametrized over several storage types.

    Real S3 is chosen when the ENABLE_REAL_S3_REMOTE_STORAGE environment
    variable is set (to any value); otherwise the mock S3 server is used.
    """
    real_s3_enabled = os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None
    return RemoteStorageKind.REAL_S3 if real_s3_enabled else RemoteStorageKind.MOCK_S3
@dataclass
class LocalFsStorage:
root: Path

View File

@@ -4,9 +4,10 @@ import os
import re
import subprocess
import tarfile
import threading
import time
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Tuple, TypeVar
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, TypeVar
from urllib.parse import urlencode
import allure
@@ -26,34 +27,100 @@ def get_self_dir() -> Path:
return Path(__file__).resolve().parent
def subprocess_capture(capture_dir: Path, cmd: List[str], **kwargs: Any) -> str:
"""Run a process and capture its output
def subprocess_capture(
capture_dir: Path,
cmd: List[str],
*,
check=False,
echo_stderr=False,
echo_stdout=False,
capture_stdout=False,
**kwargs: Any,
) -> Tuple[str, Optional[str], int]:
"""Run a process and bifurcate its output to files and the `log` logger
Output will go to files named "cmd_NNN.stdout" and "cmd_NNN.stderr"
stderr and stdout are always captured in files. They are also optionally
echoed to the log (echo_stderr, echo_stdout), and/or captured and returned
(capture_stdout).
File output will go to files named "cmd_NNN.stdout" and "cmd_NNN.stderr"
where "cmd" is the name of the program and NNN is an incrementing
counter.
If those files already exist, we will overwrite them.
Returns basepath for files with captured output.
Returns 3-tuple of:
- The base path for output files
- Captured stdout, or None
- The exit status of the process
"""
assert isinstance(cmd, list)
base = f"{os.path.basename(cmd[0])}_{global_counter()}"
base_cmd = os.path.basename(cmd[0])
base = f"{base_cmd}_{global_counter()}"
basepath = os.path.join(capture_dir, base)
stdout_filename = f"{basepath}.stdout"
stderr_filename = f"{basepath}.stderr"
# Since we will stream stdout and stderr concurrently, need to do it in a thread.
class OutputHandler(threading.Thread):
def __init__(self, in_file, out_file, echo: bool, capture: bool):
super().__init__()
self.in_file = in_file
self.out_file = out_file
self.echo = echo
self.capture = capture
self.captured = ""
def run(self):
for line in self.in_file:
# Only bother decoding if we are going to do something more than stream to a file
if self.echo or self.capture:
string = line.decode(encoding="utf-8", errors="replace")
if self.echo:
log.info(string)
if self.capture:
self.captured += string
self.out_file.write(line)
captured = None
try:
with open(stdout_filename, "w") as stdout_f:
with open(stderr_filename, "w") as stderr_f:
with open(stdout_filename, "wb") as stdout_f:
with open(stderr_filename, "wb") as stderr_f:
log.info(f'Capturing stdout to "{base}.stdout" and stderr to "{base}.stderr"')
subprocess.run(cmd, **kwargs, stdout=stdout_f, stderr=stderr_f)
p = subprocess.Popen(
cmd,
**kwargs,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout_handler = OutputHandler(
p.stdout, stdout_f, echo=echo_stdout, capture=capture_stdout
)
stdout_handler.start()
stderr_handler = OutputHandler(p.stderr, stderr_f, echo=echo_stderr, capture=False)
stderr_handler.start()
r = p.wait()
stdout_handler.join()
stderr_handler.join()
if check and r != 0:
raise subprocess.CalledProcessError(r, " ".join(cmd))
if capture_stdout:
captured = stdout_handler.captured
finally:
# Remove empty files if there is no output
for filename in (stdout_filename, stderr_filename):
if os.stat(filename).st_size == 0:
os.remove(filename)
return basepath
return (basepath, captured, r)
_global_counter = 0

View File

@@ -3,11 +3,15 @@ from contextlib import closing
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.remote_storage import s3_storage
# Test restarting page server, while safekeeper and compute node keep
# running.
def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_remote_storage(remote_storage_kind=s3_storage())
neon_env_builder.enable_scrub_on_exit()
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_pageserver_restart")
@@ -109,6 +113,9 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
# safekeeper and compute node keep running.
@pytest.mark.timeout(540)
def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_remote_storage(remote_storage_kind=s3_storage())
neon_env_builder.enable_scrub_on_exit()
env = neon_env_builder.init_start()
# Use a tiny checkpoint distance, to create a lot of layers quickly.