Compare commits

..

2 Commits

Author SHA1 Message Date
Conrad Ludgate
6d2bbffdab only for console 2023-12-15 12:28:50 +00:00
Conrad Ludgate
7151bcc175 proxy console force http2 2023-12-15 12:26:51 +00:00
12 changed files with 132 additions and 408 deletions

3
Cargo.lock generated
View File

@@ -4281,20 +4281,17 @@ dependencies = [
"async-stream",
"aws-config",
"aws-sdk-s3",
"aws-smithy-async",
"bincode",
"bytes",
"chrono",
"clap",
"crc32c",
"either",
"futures",
"futures-util",
"hex",
"histogram",
"itertools",
"pageserver",
"pageserver_api",
"rand 0.8.5",
"remote_storage",
"reqwest",

View File

@@ -1,6 +1,3 @@
//! Links with walproposer, pgcommon, pgport and runs bindgen on walproposer.h
//! to generate Rust bindings for it.
use std::{env, path::PathBuf, process::Command};
use anyhow::{anyhow, Context};

View File

@@ -1,6 +1,3 @@
//! A C-Rust shim: defines implementation of C walproposer API, assuming wp
//! callback_data stores Box to some Rust implementation.
#![allow(dead_code)]
use std::ffi::CStr;

View File

@@ -4,14 +4,12 @@
pub mod health_server;
use std::{sync::Arc, time::Duration};
use std::time::Duration;
use futures::FutureExt;
pub use reqwest::{Request, Response, StatusCode};
pub use reqwest_middleware::{ClientWithMiddleware, Error};
pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use tokio::time::Instant;
use tracing::trace;
use crate::{proxy::CONSOLE_REQUEST_LATENCY, rate_limiter, url::ApiUrl};
use reqwest_middleware::RequestBuilder;
@@ -21,7 +19,7 @@ use reqwest_middleware::RequestBuilder;
/// We deliberately don't want to replace this with a public static.
pub fn new_client(rate_limiter_config: rate_limiter::RateLimiterConfig) -> ClientWithMiddleware {
let client = reqwest::ClientBuilder::new()
.dns_resolver(Arc::new(GaiResolver::default()))
.http2_prior_knowledge()
.connection_verbose(true)
.build()
.expect("Failed to create http client");
@@ -34,7 +32,6 @@ pub fn new_client(rate_limiter_config: rate_limiter::RateLimiterConfig) -> Clien
pub fn new_client_with_timeout(default_timout: Duration) -> ClientWithMiddleware {
let timeout_client = reqwest::ClientBuilder::new()
.dns_resolver(Arc::new(GaiResolver::default()))
.connection_verbose(true)
.timeout(default_timout)
.build()
@@ -100,37 +97,6 @@ impl Endpoint {
}
}
/// https://docs.rs/reqwest/0.11.18/src/reqwest/dns/gai.rs.html
use hyper::{
client::connect::dns::{GaiResolver as HyperGaiResolver, Name},
service::Service,
};
use reqwest::dns::{Addrs, Resolve, Resolving};
/// DNS resolver newtype wrapping hyper's `GaiResolver` (imported in this file as
/// `HyperGaiResolver`). Its `Resolve` impl emits a trace event with the time each
/// lookup took.
#[derive(Debug)]
pub struct GaiResolver(HyperGaiResolver);
impl Default for GaiResolver {
fn default() -> Self {
Self(HyperGaiResolver::new())
}
}
impl Resolve for GaiResolver {
    /// Resolves `name` through the wrapped hyper resolver and emits a trace
    /// event with the elapsed time once the lookup future completes.
    ///
    /// The lookup is started immediately (before the returned future is
    /// polled); the timer starts just before that call.
    fn resolve(&self, name: Name) -> Resolving {
        // `Service::call` needs `&mut`, so work on a clone of the inner resolver.
        let mut resolver = self.0.clone();
        let started = Instant::now();
        let lookup = Service::<Name>::call(&mut resolver, name.clone());

        let timed = lookup.map(move |outcome| {
            let resolve_duration = started.elapsed();
            trace!(duration = ?resolve_duration, addr = %name, "resolve host complete");
            // Erase the concrete iterator / error types into the boxed forms
            // that `Resolving` expects.
            match outcome {
                Ok(addrs) => Ok(Box::new(addrs) as Addrs),
                Err(err) => Err(Box::new(err) as Box<dyn std::error::Error + Send + Sync>),
            }
        });

        Box::pin(timed)
    }
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -6,7 +6,6 @@ license.workspace = true
[dependencies]
aws-sdk-s3.workspace = true
aws-smithy-async.workspace = true
either.workspace = true
tokio-rustls.workspace = true
anyhow.workspace = true
@@ -32,7 +31,6 @@ reqwest = { workspace = true, default-features = false, features = ["rustls-tls"
aws-config = { workspace = true, default-features = false, features = ["rustls", "sso"] }
pageserver = { path = "../pageserver" }
pageserver_api = { path = "../libs/pageserver_api" }
remote_storage = { path = "../libs/remote_storage" }
tracing.workspace = true
@@ -40,5 +38,3 @@ tracing-subscriber.workspace = true
clap.workspace = true
tracing-appender = "0.2"
histogram = "0.7"
futures.workspace = true

View File

@@ -2,17 +2,18 @@ use std::collections::HashSet;
use anyhow::Context;
use aws_sdk_s3::{types::ObjectIdentifier, Client};
use tracing::{error, info, warn};
use utils::generation::Generation;
use crate::cloud_admin_api::BranchData;
use crate::metadata_stream::stream_listing;
use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId};
use crate::{download_object_with_retries, RootTarget};
use futures_util::{pin_mut, StreamExt};
use pageserver::tenant::remote_timeline_client::parse_remote_index_path;
use pageserver::tenant::storage_layer::LayerFileName;
use pageserver::tenant::IndexPart;
use remote_storage::RemotePath;
use utils::id::TenantTimelineId;
pub(crate) struct TimelineAnalysis {
/// Anomalies detected
@@ -38,8 +39,8 @@ impl TimelineAnalysis {
}
}
pub(crate) fn branch_cleanup_and_check_errors(
id: &TenantShardTimelineId,
pub(crate) async fn branch_cleanup_and_check_errors(
id: &TenantTimelineId,
s3_root: &RootTarget,
s3_active_branch: Option<&BranchData>,
console_branch: Option<BranchData>,
@@ -47,13 +48,12 @@ pub(crate) fn branch_cleanup_and_check_errors(
) -> TimelineAnalysis {
let mut result = TimelineAnalysis::new();
tracing::trace!("Checking timeline {id}");
info!("Checking timeline {id}");
if let Some(s3_active_branch) = s3_active_branch {
tracing::trace!(
info!(
"Checking console status for timeline for branch {:?}/{:?}",
s3_active_branch.project_id,
s3_active_branch.id
s3_active_branch.project_id, s3_active_branch.id
);
match console_branch {
Some(_) => {result.errors.push(format!("Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
@@ -84,10 +84,10 @@ pub(crate) fn branch_cleanup_and_check_errors(
}
if &index_part.get_version() != IndexPart::KNOWN_VERSIONS.last().unwrap() {
// result.warnings.push(format!(
// "index_part.json version is not latest: {}",
// index_part.get_version()
// ))
result.warnings.push(format!(
"index_part.json version is not latest: {}",
index_part.get_version()
))
}
if index_part.metadata.disk_consistent_lsn()
@@ -102,7 +102,7 @@ pub(crate) fn branch_cleanup_and_check_errors(
if index_part.layer_metadata.is_empty() {
// not an error, can happen for branches with zero writes, but notice that
tracing::trace!("index_part.json has no layers");
info!("index_part.json has no layers");
}
for (layer, metadata) in index_part.layer_metadata {
@@ -186,17 +186,17 @@ pub(crate) fn branch_cleanup_and_check_errors(
}
if result.errors.is_empty() {
tracing::trace!("No check errors found");
info!("No check errors found");
} else {
tracing::info!("Timeline metadata errors: {:?}", result.errors);
warn!("Timeline metadata errors: {0:?}", result.errors);
}
if !result.warnings.is_empty() {
tracing::info!("Timeline metadata warnings: {:?}", result.warnings);
warn!("Timeline metadata warnings: {0:?}", result.warnings);
}
if !result.garbage_keys.is_empty() {
tracing::info!(
error!(
"The following keys should be removed from S3: {0:?}",
result.garbage_keys
)
@@ -238,7 +238,7 @@ fn parse_layer_object_name(name: &str) -> Result<(LayerFileName, Generation), St
pub(crate) async fn list_timeline_blobs(
s3_client: &Client,
id: TenantShardTimelineId,
id: TenantTimelineId,
s3_root: &RootTarget,
) -> anyhow::Result<S3TimelineBlobData> {
let mut s3_layers = HashSet::new();
@@ -261,20 +261,20 @@ pub(crate) async fn list_timeline_blobs(
let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket);
match blob_name {
Some(name) if name.starts_with("index_part.json") => {
tracing::trace!("Index key {key}");
tracing::info!("Index key {key}");
index_parts.push(obj)
}
Some("initdb.tar.zst") => {
tracing::trace!("initdb archive {key}");
tracing::info!("initdb archive {key}");
initdb_archive = true;
}
Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
Ok((new_layer, gen)) => {
tracing::trace!("Parsed layer key: {} {:?}", new_layer, gen);
tracing::info!("Parsed layer key: {} {:?}", new_layer, gen);
s3_layers.insert((new_layer, gen));
}
Err(e) => {
tracing::trace!("Error parsing key {maybe_layer_name}");
tracing::info!("Error parsing key {maybe_layer_name}");
errors.push(
format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
);
@@ -282,7 +282,7 @@ pub(crate) async fn list_timeline_blobs(
}
},
None => {
tracing::trace!("Peculiar key {}", key);
tracing::info!("Peculiar key {}", key);
errors.push(format!("S3 list response got an object with odd key {key}"));
keys_to_remove.push(key.to_string());
}
@@ -290,7 +290,7 @@ pub(crate) async fn list_timeline_blobs(
}
if index_parts.is_empty() && s3_layers.is_empty() && initdb_archive {
tracing::trace!(
tracing::info!(
"Timeline is empty apart from initdb archive: expected post-deletion state."
);
return Ok(S3TimelineBlobData {

View File

@@ -10,16 +10,15 @@ use aws_sdk_s3::{
Client,
};
use futures_util::{pin_mut, TryStreamExt};
use pageserver_api::shard::TenantShardId;
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;
use utils::id::TenantId;
use utils::id::{TenantId, TenantTimelineId};
use crate::{
cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
init_remote,
metadata_stream::{stream_listing, stream_tenant_timelines, stream_tenants},
BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId, TraversingDepth,
BucketConfig, ConsoleConfig, NodeKind, RootTarget, TraversingDepth,
};
#[derive(Serialize, Deserialize, Debug)]
@@ -30,8 +29,8 @@ enum GarbageReason {
#[derive(Serialize, Deserialize, Debug)]
enum GarbageEntity {
Tenant(TenantShardId),
Timeline(TenantShardTimelineId),
Tenant(TenantId),
Timeline(TenantTimelineId),
}
#[derive(Serialize, Deserialize, Debug)]
@@ -143,9 +142,6 @@ async fn find_garbage_inner(
console_projects.len()
);
// TODO(sharding): batch calls into Console so that we only call once for each TenantId,
// rather than checking the same TenantId for multiple TenantShardId
// Enumerate Tenants in S3, and check if each one exists in Console
tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
let tenants = stream_tenants(&s3_client, &target);
@@ -153,10 +149,10 @@ async fn find_garbage_inner(
let api_client = cloud_admin_api_client.clone();
let console_projects = &console_projects;
async move {
match console_projects.get(&t.tenant_id) {
match console_projects.get(&t) {
Some(project_data) => Ok((t, Some(project_data.clone()))),
None => api_client
.find_tenant_project(t.tenant_id)
.find_tenant_project(t)
.await
.map_err(|e| anyhow::anyhow!(e))
.map(|r| (t, r)),
@@ -170,21 +166,21 @@ async fn find_garbage_inner(
// checks if they are enabled by the `depth` parameter.
pin_mut!(tenants_checked);
let mut garbage = GarbageList::new(node_kind, bucket_config);
let mut active_tenants: Vec<TenantShardId> = vec![];
let mut active_tenants: Vec<TenantId> = vec![];
let mut counter = 0;
while let Some(result) = tenants_checked.next().await {
let (tenant_shard_id, console_result) = result?;
let (tenant_id, console_result) = result?;
// Paranoia check
if let Some(project) = &console_result {
assert!(project.tenant == tenant_shard_id.tenant_id);
assert!(project.tenant == tenant_id);
}
if garbage.maybe_append(GarbageEntity::Tenant(tenant_shard_id), console_result) {
tracing::debug!("Tenant {tenant_shard_id} is garbage");
if garbage.maybe_append(GarbageEntity::Tenant(tenant_id), console_result) {
tracing::debug!("Tenant {tenant_id} is garbage");
} else {
tracing::debug!("Tenant {tenant_shard_id} is active");
active_tenants.push(tenant_shard_id);
tracing::debug!("Tenant {tenant_id} is active");
active_tenants.push(tenant_id);
}
counter += 1;
@@ -270,13 +266,13 @@ impl std::fmt::Display for PurgeMode {
pub async fn get_tenant_objects(
s3_client: &Arc<Client>,
target: RootTarget,
tenant_shard_id: TenantShardId,
tenant_id: TenantId,
) -> anyhow::Result<Vec<ObjectIdentifier>> {
tracing::debug!("Listing objects in tenant {tenant_shard_id}");
tracing::debug!("Listing objects in tenant {tenant_id}");
// TODO: apply extra validation based on object modification time. Don't purge
// tenants where any timeline's index_part.json has been touched recently.
let mut tenant_root = target.tenant_root(&tenant_shard_id);
let mut tenant_root = target.tenant_root(&tenant_id);
// Remove delimiter, so that object listing lists all keys in the prefix and not just
// common prefixes.
@@ -289,7 +285,7 @@ pub async fn get_tenant_objects(
pub async fn get_timeline_objects(
s3_client: &Arc<Client>,
target: RootTarget,
ttid: TenantShardTimelineId,
ttid: TenantTimelineId,
) -> anyhow::Result<Vec<ObjectIdentifier>> {
tracing::debug!("Listing objects in timeline {ttid}");
let mut timeline_root = target.timeline_root(&ttid);

View File

@@ -15,16 +15,13 @@ use anyhow::Context;
use aws_config::environment::EnvironmentVariableCredentialsProvider;
use aws_config::imds::credentials::ImdsCredentialsProvider;
use aws_config::meta::credentials::CredentialsProviderChain;
use aws_config::retry::RetryConfig;
use aws_config::sso::SsoCredentialsProvider;
use aws_config::BehaviorVersion;
use aws_sdk_s3::config::{AsyncSleep, Region, SharedAsyncSleep};
use aws_sdk_s3::config::Region;
use aws_sdk_s3::{Client, Config};
use aws_smithy_async::rt::sleep::TokioSleep;
use clap::ValueEnum;
use pageserver::tenant::TENANTS_SEGMENT_NAME;
use pageserver_api::shard::TenantShardId;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use std::io::IsTerminal;
@@ -32,7 +29,7 @@ use tokio::io::AsyncReadExt;
use tracing::error;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use utils::id::TimelineId;
use utils::id::{TenantId, TenantTimelineId};
const MAX_RETRIES: usize = 20;
const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
@@ -47,35 +44,6 @@ pub struct S3Target {
pub delimiter: String,
}
/// Convenience for referring to timelines within a particular shard: more ergonomic
/// than using a 2-tuple.
///
/// This is the shard-aware equivalent of TenantTimelineId. It's defined here rather
/// than somewhere more broadly exposed, because this kind of thing is rarely needed
/// in the pageserver, as all timeline objects exist in the scope of a particular
/// tenant: the scrubber is different in that it handles collections of data referring to many
/// TenantShardTimelineIds in one place.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct TenantShardTimelineId {
/// Tenant shard the timeline belongs to.
pub tenant_shard_id: TenantShardId,
/// Timeline within that tenant shard.
pub timeline_id: TimelineId,
}
impl TenantShardTimelineId {
fn new(tenant_shard_id: TenantShardId, timeline_id: TimelineId) -> Self {
Self {
tenant_shard_id,
timeline_id,
}
}
}
impl Display for TenantShardTimelineId {
    /// Renders the id as `<tenant_shard_id>/<timeline_id>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            tenant_shard_id,
            timeline_id,
        } = self;
        f.write_fmt(format_args!("{}/{}", tenant_shard_id, timeline_id))
    }
}
#[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraversingDepth {
Tenant,
@@ -142,19 +110,19 @@ impl RootTarget {
}
}
pub fn tenant_root(&self, tenant_id: &TenantShardId) -> S3Target {
pub fn tenant_root(&self, tenant_id: &TenantId) -> S3Target {
self.tenants_root().with_sub_segment(&tenant_id.to_string())
}
pub fn timelines_root(&self, tenant_id: &TenantShardId) -> S3Target {
pub fn timelines_root(&self, tenant_id: &TenantId) -> S3Target {
match self {
Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"),
Self::Safekeeper(_) => self.tenant_root(tenant_id),
}
}
pub fn timeline_root(&self, id: &TenantShardTimelineId) -> S3Target {
self.timelines_root(&id.tenant_shard_id)
pub fn timeline_root(&self, id: &TenantTimelineId) -> S3Target {
self.timelines_root(&id.tenant_id)
.with_sub_segment(&id.timeline_id.to_string())
}
@@ -279,13 +247,9 @@ pub fn init_s3_client(account_id: Option<String>, bucket_region: Region) -> Clie
)
};
let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
let mut builder = Config::builder()
.behavior_version(BehaviorVersion::v2023_11_09())
.region(bucket_region)
.retry_config(RetryConfig::adaptive().with_max_attempts(1))
.sleep_impl(SharedAsyncSleep::from(sleep_impl))
.credentials_provider(credentials_provider);
if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") {
@@ -340,7 +304,7 @@ async fn list_objects_with_retries(
{
Ok(response) => return Ok(response),
Err(e) => {
error!("list_objects_v2 query failed: {e:?}");
error!("list_objects_v2 query failed: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
@@ -378,7 +342,7 @@ async fn download_object_with_retries(
.await
{
Ok(bytes_read) => {
tracing::trace!("Downloaded {bytes_read} bytes for object object with key {key}");
tracing::info!("Downloaded {bytes_read} bytes for object object with key {key}");
return Ok(body_buf);
}
Err(e) => {

View File

@@ -1,4 +1,3 @@
use pageserver_api::shard::TenantShardId;
use s3_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
use s3_scrubber::scan_metadata::scan_metadata;
use s3_scrubber::{init_logging, BucketConfig, ConsoleConfig, NodeKind, TraversingDepth};
@@ -35,8 +34,6 @@ enum Command {
ScanMetadata {
#[arg(short, long, default_value_t = false)]
json: bool,
#[arg(long = "tenant-id", num_args = 0..)]
tenant_ids: Vec<TenantShardId>,
},
}
@@ -60,37 +57,35 @@ async fn main() -> anyhow::Result<()> {
));
match cli.command {
Command::ScanMetadata { json, tenant_ids } => {
match scan_metadata(bucket_config.clone(), tenant_ids).await {
Err(e) => {
tracing::error!("Failed: {e}");
Err(e)
Command::ScanMetadata { json } => match scan_metadata(bucket_config.clone()).await {
Err(e) => {
tracing::error!("Failed: {e}");
Err(e)
}
Ok(summary) => {
if json {
println!("{}", serde_json::to_string(&summary).unwrap())
} else {
println!("{}", summary.summary_string());
}
Ok(summary) => {
if json {
println!("{}", serde_json::to_string(&summary).unwrap())
} else {
println!("{}", summary.summary_string());
}
if summary.is_fatal() {
Err(anyhow::anyhow!("Fatal scrub errors detected"))
} else if summary.is_empty() {
// Strictly speaking an empty bucket is a valid bucket, but if someone ran the
// scrubber they were likely expecting to scan something, and if we see no timelines
// at all then it's likely due to some configuration issues like a bad prefix
Err(anyhow::anyhow!(
"No timelines found in bucket {} prefix {}",
bucket_config.bucket,
bucket_config
.prefix_in_bucket
.unwrap_or("<none>".to_string())
))
} else {
Ok(())
}
if summary.is_fatal() {
Err(anyhow::anyhow!("Fatal scrub errors detected"))
} else if summary.is_empty() {
// Strictly speaking an empty bucket is a valid bucket, but if someone ran the
// scrubber they were likely expecting to scan something, and if we see no timelines
// at all then it's likely due to some configuration issues like a bad prefix
Err(anyhow::anyhow!(
"No timelines found in bucket {} prefix {}",
bucket_config.bucket,
bucket_config
.prefix_in_bucket
.unwrap_or("<none>".to_string())
))
} else {
Ok(())
}
}
}
},
Command::FindGarbage {
node_kind,
depth,

View File

@@ -3,15 +3,14 @@ use async_stream::{stream, try_stream};
use aws_sdk_s3::{types::ObjectIdentifier, Client};
use tokio_stream::Stream;
use crate::{list_objects_with_retries, RootTarget, S3Target, TenantShardTimelineId};
use pageserver_api::shard::TenantShardId;
use utils::id::TimelineId;
use crate::{list_objects_with_retries, RootTarget, S3Target, TenantId};
use utils::id::{TenantTimelineId, TimelineId};
/// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
pub fn stream_tenants<'a>(
s3_client: &'a Client,
target: &'a RootTarget,
) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
) -> impl Stream<Item = anyhow::Result<TenantId>> + 'a {
try_stream! {
let mut continuation_token = None;
let tenants_target = target.tenants_root();
@@ -45,20 +44,20 @@ pub fn stream_tenants<'a>(
}
}
/// Given a TenantShardId, output a stream of the timelines within that tenant, discovered
/// Given a TenantId, output a stream of the timelines within that tenant, discovered
/// using ListObjectsv2. The listing is done before the stream is built, so that this
/// function can be used to generate concurrency on a stream using buffer_unordered.
pub async fn get_tenant_timelines(
s3_client: &Client,
target: &RootTarget,
tenant: TenantShardId,
) -> Vec<anyhow::Result<TimelineId>> {
let mut timeline_ids = Vec::new();
pub async fn stream_tenant_timelines<'a>(
s3_client: &'a Client,
target: &'a RootTarget,
tenant: TenantId,
) -> anyhow::Result<impl Stream<Item = Result<TenantTimelineId, anyhow::Error>> + 'a> {
let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
let mut continuation_token = None;
let timelines_target = target.timelines_root(&tenant);
loop {
tracing::trace!("Listing in {}", tenant);
tracing::info!("Listing in {}", tenant);
let fetch_response =
list_objects_with_retries(s3_client, &timelines_target, continuation_token.clone())
.await;
@@ -95,21 +94,11 @@ pub async fn get_tenant_timelines(
}
}
timeline_ids
}
pub async fn stream_tenant_timelines<'a>(
client: &'a Client,
target: &'a RootTarget,
tenant: TenantShardId,
) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> {
let timelines = get_tenant_timelines(client, target, tenant).await;
// FIXME: futures is not yet imported so have to keep doing it like this:
tracing::info!("Yielding for {}", tenant);
Ok(stream! {
for i in timelines {
for i in timeline_ids {
let id = i?;
yield Ok(TenantShardTimelineId::new(tenant, id));
yield Ok(TenantTimelineId::new(tenant, id));
}
})
}

View File

@@ -1,28 +1,24 @@
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use crate::checks::{
branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData,
TimelineAnalysis,
};
use crate::metadata_stream::stream_tenants;
use crate::{init_remote, BucketConfig, NodeKind, TenantShardTimelineId};
use futures_util::StreamExt;
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget};
use aws_sdk_s3::Client;
use futures_util::{pin_mut, StreamExt, TryStreamExt};
use histogram::Histogram;
use pageserver::tenant::IndexPart;
use pageserver_api::shard::TenantShardId;
use serde::Serialize;
use tracing::Instrument;
use utils::id::TenantTimelineId;
#[derive(Serialize)]
pub struct MetadataSummary {
count: usize,
with_errors: HashSet<TenantShardTimelineId>,
with_warnings: HashSet<TenantShardTimelineId>,
with_garbage: HashSet<TenantShardTimelineId>,
with_errors: HashSet<TenantTimelineId>,
with_warnings: HashSet<TenantTimelineId>,
with_garbage: HashSet<TenantTimelineId>,
indices_by_version: HashMap<usize, usize>,
layer_count: MinMaxHisto,
@@ -108,7 +104,7 @@ impl MetadataSummary {
total_size += meta.file_size;
self.layer_size_bytes.sample(meta.file_size)?;
}
self.timeline_size_bytes.sample(total_size / 1024)?;
self.timeline_size_bytes.sample(total_size)?;
Ok(())
}
@@ -136,7 +132,7 @@ impl MetadataSummary {
}
}
fn update_analysis(&mut self, id: &TenantShardTimelineId, analysis: &TimelineAnalysis) {
fn update_analysis(&mut self, id: &TenantTimelineId, analysis: &TimelineAnalysis) {
if !analysis.errors.is_empty() {
self.with_errors.insert(*id);
}
@@ -161,7 +157,7 @@ With errors: {1}
With warnings: {2}
With garbage: {3}
Index versions: {version_summary}
Timeline size KiB: {4}
Timeline size bytes: {4}
Layer size bytes: {5}
Timeline layer count: {6}
",
@@ -184,204 +180,44 @@ Timeline layer count: {6}
}
}
/// Minimal two-variant sum type local to this module (separate from
/// `futures::future::Either`, which `scan_metadata` also refers to by full path).
///
/// `scan_metadata` uses it in two places: to tag the next piece of work to spawn
/// (`Left` = a tenant whose timelines should be listed, `Right` = a tenant together
/// with its listed timelines), and as the output of spawned tasks (`Left` = a
/// finished timeline listing, `Right` = the optional result of a blob listing).
#[derive(Debug)]
enum Either<A, B> {
Left(A),
Right(B),
}
/// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
pub async fn scan_metadata(
bucket_config: BucketConfig,
tenant_ids: Vec<TenantShardId>,
) -> anyhow::Result<MetadataSummary> {
pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<MetadataSummary> {
let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver)?;
let target = Arc::new(target);
let tenants = if tenant_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&s3_client, &target))
} else {
futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
};
let tenants = stream_tenants(&s3_client, &target);
let tenants = tenants.fuse();
// How many tenants to process in parallel. We need to be mindful of pageservers
// accessing the same per tenant prefixes, so use a lower setting than pageservers.
const CONCURRENCY: usize = 32;
let mut tenants = std::pin::pin!(tenants);
// Generate a stream of TenantTimelineId
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
let timelines = timelines.try_buffer_unordered(CONCURRENCY);
let timelines = timelines.try_flatten();
let mut js = tokio::task::JoinSet::new();
let mut consumed_all = false;
// Generate a stream of S3TimelineBlobData
/// Lists the S3 blobs of a single timeline and returns them paired with the
/// timeline id they belong to. Listing errors are passed through unchanged.
async fn report_on_timeline(
    s3_client: &Client,
    target: &RootTarget,
    ttid: TenantTimelineId,
) -> anyhow::Result<(TenantTimelineId, S3TimelineBlobData)> {
    list_timeline_blobs(s3_client, ttid, target)
        .await
        .map(|blobs| (ttid, blobs))
}
let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid));
let timelines = timelines.try_buffer_unordered(CONCURRENCY);
let summary = MetadataSummary::new();
let mut summary = MetadataSummary::new();
pin_mut!(timelines);
while let Some(i) = timelines.next().await {
let (ttid, data) = i?;
summary.update_data(&data);
// have timeline and timeline blob listings fight over the same semaphore
let timeline_listings = Arc::new(tokio::sync::Semaphore::new(50));
let blob_listings = timeline_listings.clone();
let analysis =
branch_cleanup_and_check_errors(&ttid, &target, None, None, Some(data)).await;
let spawned_tenants = AtomicUsize::new(0);
let spawned_timelines = Arc::new(AtomicUsize::new(0));
let completed_tenants = AtomicUsize::new(0);
let completed_timelines = AtomicUsize::new(0);
let last_tenant_id = std::cell::RefCell::new(None);
let s3_client = s3_client.clone();
let target = target.clone();
let summary = std::sync::Mutex::new(summary);
let scan_tenants = async {
let timeline_listings = timeline_listings;
let blob_listings = blob_listings;
// used to control whether to receive more tenants
let mut more_tenants = true;
loop {
let next_start = tokio::select! {
next_tenant = tenants.next(), if !consumed_all && more_tenants => {
match next_tenant {
Some(Ok(tenant_id)) => Either::Left(tenant_id),
Some(Err(e)) => {
consumed_all = true;
tracing::error!("tenant streaming failed with: {e:?}");
continue;
}
None => {
consumed_all = true;
continue;
}
}
},
next = js.join_next(), if !js.is_empty() => {
more_tenants = js.len() < 10;
match next.unwrap() {
Ok(Either::Left((tenant_id, timelines))) => {
completed_tenants.fetch_add(1, Ordering::Relaxed);
Either::Right((tenant_id, timelines))
}
Ok(Either::Right(Some((ttid, data)))) => {
completed_timelines.fetch_add(1, Ordering::Relaxed);
let ttid: TenantShardTimelineId = ttid;
{
let _e = tracing::info_span!("analysis", tenant_shard_id=%ttid.tenant_shard_id, timeline_id=%ttid.timeline_id).entered();
let summary = &mut summary.lock().unwrap();
summary.update_data(&data);
let analysis = branch_cleanup_and_check_errors(&ttid, &target, None, None, Some(data));
summary.update_analysis(&ttid, &analysis);
}
continue;
}
Ok(Either::Right(None)) => {
completed_timelines.fetch_add(1, Ordering::Relaxed);
continue;
}
Err(je) if je.is_cancelled() => unreachable!("not used"),
Err(je) if je.is_panic() => {
continue;
},
Err(je) => {
tracing::error!("unknown join error: {je:?}");
continue;
}
}
},
else => break,
};
let s3_client = s3_client.clone();
let target = target.clone();
let timeline_listings = timeline_listings.clone();
let blob_listings = blob_listings.clone();
match next_start {
Either::Left(tenant_shard_id) => {
let span = tracing::info_span!("get_timelines", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug());
js.spawn(
async move {
let _permit = timeline_listings.acquire().await;
let timelines = crate::metadata_stream::get_tenant_timelines(
&s3_client,
&target,
tenant_shard_id,
)
.await;
Either::Left((tenant_shard_id, timelines))
}
.instrument(span),
);
more_tenants = js.len() < 1000;
spawned_tenants.fetch_add(1, Ordering::Relaxed);
*last_tenant_id.borrow_mut() = Some(tenant_shard_id);
}
Either::Right((tenant_shard_id, timelines)) => {
for timeline_id in timelines {
let timeline_id = match timeline_id {
Ok(timeline_id) => timeline_id,
Err(e) => {
tracing::error!("failed to fetch a timeline: {e:?}");
continue;
}
};
let s3_client = s3_client.clone();
let target = target.clone();
let blob_listings = blob_listings.clone();
let span = tracing::info_span!("list_timelines_blobs", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id);
js.spawn(
async move {
let _permit = blob_listings.acquire().await;
let ttid = TenantShardTimelineId::new(tenant_shard_id, timeline_id);
match list_timeline_blobs(&s3_client, ttid, &target).await {
Ok(data) => Either::Right(Some((ttid, data))),
Err(e) => {
tracing::error!("listing failed {e:?}");
Either::Right(None)
}
}
}
.instrument(span),
);
spawned_timelines.fetch_add(1, Ordering::Relaxed);
tokio::task::yield_now().await;
}
}
}
}
};
let started_at = std::time::Instant::now();
{
let mut scan_tenants = std::pin::pin!(scan_tenants);
loop {
let res =
tokio::time::timeout(std::time::Duration::from_secs(1), &mut scan_tenants).await;
let spawned_tenants = spawned_tenants.load(Ordering::Relaxed);
let completed_tenants = completed_tenants.load(Ordering::Relaxed);
let spawned_timelines = spawned_timelines.load(Ordering::Relaxed);
let completed_timelines = completed_timelines.load(Ordering::Relaxed);
match res {
Ok(()) => {
tracing::info!("progress tenants: {completed_tenants:>6} / {spawned_tenants:<6}, timelines: {completed_timelines:>6} / {spawned_timelines:<6} after {:?}", started_at.elapsed());
break;
}
Err(_timeout) => {
tracing::info!("progress tenants: {completed_tenants:>6} / {spawned_tenants:<6}, timelines: {completed_timelines:>6} / {spawned_timelines:<6}, last tenant: {:?}", &*last_tenant_id.borrow());
}
}
}
summary.update_analysis(&ttid, &analysis);
}
Ok(summary.into_inner().unwrap())
Ok(summary)
}

View File

@@ -52,16 +52,7 @@ def negative_env(neon_env_builder: NeonEnvBuilder) -> Generator[NegativeTests, N
TenantId(t["id"]) for t in ps_http.tenant_list()
], "tenant should not be attached after negative test"
env.pageserver.allowed_errors.extend(
[
# This fixture detaches the tenant, and tests using it will tend to re-attach it
# shortly after. There may be un-processed deletion_queue validations from the
# initial attachment
".*Dropped remote consistent LSN updates.*",
# This fixture is for tests that will intentionally generate 400 responses
".*Error processing HTTP request: Bad request",
]
)
env.pageserver.allowed_errors.append(".*Error processing HTTP request: Bad request")
def log_contains_bad_request():
env.pageserver.log_contains(".*Error processing HTTP request: Bad request")