mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-16 04:30:38 +00:00
Compare commits
9 Commits
RFC_merged
...
2023-12-21
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e0aa2e2244 | ||
|
|
3432805e25 | ||
|
|
2c23ed4873 | ||
|
|
a8f7398518 | ||
|
|
9684fb2ba5 | ||
|
|
3ae40556f8 | ||
|
|
22a7b68b23 | ||
|
|
e98c48322d | ||
|
|
a2d05f8d94 |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -4281,12 +4281,14 @@ dependencies = [
|
||||
"async-stream",
|
||||
"aws-config",
|
||||
"aws-sdk-s3",
|
||||
"aws-smithy-async",
|
||||
"bincode",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"clap",
|
||||
"crc32c",
|
||||
"either",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"hex",
|
||||
"histogram",
|
||||
|
||||
@@ -6,6 +6,7 @@ license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
aws-sdk-s3.workspace = true
|
||||
aws-smithy-async.workspace = true
|
||||
either.workspace = true
|
||||
tokio-rustls.workspace = true
|
||||
anyhow.workspace = true
|
||||
@@ -39,3 +40,5 @@ tracing-subscriber.workspace = true
|
||||
clap.workspace = true
|
||||
tracing-appender = "0.2"
|
||||
histogram = "0.7"
|
||||
|
||||
futures.workspace = true
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::collections::HashSet;
|
||||
|
||||
use anyhow::Context;
|
||||
use aws_sdk_s3::{types::ObjectIdentifier, Client};
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use utils::generation::Generation;
|
||||
|
||||
use crate::cloud_admin_api::BranchData;
|
||||
@@ -47,12 +47,13 @@ pub(crate) fn branch_cleanup_and_check_errors(
|
||||
) -> TimelineAnalysis {
|
||||
let mut result = TimelineAnalysis::new();
|
||||
|
||||
info!("Checking timeline {id}");
|
||||
tracing::trace!("Checking timeline {id}");
|
||||
|
||||
if let Some(s3_active_branch) = s3_active_branch {
|
||||
info!(
|
||||
tracing::trace!(
|
||||
"Checking console status for timeline for branch {:?}/{:?}",
|
||||
s3_active_branch.project_id, s3_active_branch.id
|
||||
s3_active_branch.project_id,
|
||||
s3_active_branch.id
|
||||
);
|
||||
match console_branch {
|
||||
Some(_) => {result.errors.push(format!("Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
|
||||
@@ -83,10 +84,10 @@ pub(crate) fn branch_cleanup_and_check_errors(
|
||||
}
|
||||
|
||||
if &index_part.get_version() != IndexPart::KNOWN_VERSIONS.last().unwrap() {
|
||||
result.warnings.push(format!(
|
||||
"index_part.json version is not latest: {}",
|
||||
index_part.get_version()
|
||||
))
|
||||
// result.warnings.push(format!(
|
||||
// "index_part.json version is not latest: {}",
|
||||
// index_part.get_version()
|
||||
// ))
|
||||
}
|
||||
|
||||
if index_part.metadata.disk_consistent_lsn()
|
||||
@@ -101,7 +102,7 @@ pub(crate) fn branch_cleanup_and_check_errors(
|
||||
|
||||
if index_part.layer_metadata.is_empty() {
|
||||
// not an error, can happen for branches with zero writes, but notice that
|
||||
info!("index_part.json has no layers");
|
||||
tracing::trace!("index_part.json has no layers");
|
||||
}
|
||||
|
||||
for (layer, metadata) in index_part.layer_metadata {
|
||||
@@ -185,17 +186,17 @@ pub(crate) fn branch_cleanup_and_check_errors(
|
||||
}
|
||||
|
||||
if result.errors.is_empty() {
|
||||
info!("No check errors found");
|
||||
tracing::trace!("No check errors found");
|
||||
} else {
|
||||
warn!("Timeline metadata errors: {0:?}", result.errors);
|
||||
tracing::info!("Timeline metadata errors: {:?}", result.errors);
|
||||
}
|
||||
|
||||
if !result.warnings.is_empty() {
|
||||
warn!("Timeline metadata warnings: {0:?}", result.warnings);
|
||||
tracing::info!("Timeline metadata warnings: {:?}", result.warnings);
|
||||
}
|
||||
|
||||
if !result.garbage_keys.is_empty() {
|
||||
error!(
|
||||
tracing::info!(
|
||||
"The following keys should be removed from S3: {0:?}",
|
||||
result.garbage_keys
|
||||
)
|
||||
@@ -260,20 +261,20 @@ pub(crate) async fn list_timeline_blobs(
|
||||
let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket);
|
||||
match blob_name {
|
||||
Some(name) if name.starts_with("index_part.json") => {
|
||||
tracing::info!("Index key {key}");
|
||||
tracing::trace!("Index key {key}");
|
||||
index_parts.push(obj)
|
||||
}
|
||||
Some("initdb.tar.zst") => {
|
||||
tracing::info!("initdb archive {key}");
|
||||
tracing::trace!("initdb archive {key}");
|
||||
initdb_archive = true;
|
||||
}
|
||||
Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
|
||||
Ok((new_layer, gen)) => {
|
||||
tracing::info!("Parsed layer key: {} {:?}", new_layer, gen);
|
||||
tracing::trace!("Parsed layer key: {} {:?}", new_layer, gen);
|
||||
s3_layers.insert((new_layer, gen));
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::info!("Error parsing key {maybe_layer_name}");
|
||||
tracing::trace!("Error parsing key {maybe_layer_name}");
|
||||
errors.push(
|
||||
format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
|
||||
);
|
||||
@@ -281,7 +282,7 @@ pub(crate) async fn list_timeline_blobs(
|
||||
}
|
||||
},
|
||||
None => {
|
||||
tracing::info!("Peculiar key {}", key);
|
||||
tracing::trace!("Peculiar key {}", key);
|
||||
errors.push(format!("S3 list response got an object with odd key {key}"));
|
||||
keys_to_remove.push(key.to_string());
|
||||
}
|
||||
@@ -289,7 +290,7 @@ pub(crate) async fn list_timeline_blobs(
|
||||
}
|
||||
|
||||
if index_parts.is_empty() && s3_layers.is_empty() && initdb_archive {
|
||||
tracing::info!(
|
||||
tracing::trace!(
|
||||
"Timeline is empty apart from initdb archive: expected post-deletion state."
|
||||
);
|
||||
return Ok(S3TimelineBlobData {
|
||||
|
||||
@@ -15,10 +15,12 @@ use anyhow::Context;
|
||||
use aws_config::environment::EnvironmentVariableCredentialsProvider;
|
||||
use aws_config::imds::credentials::ImdsCredentialsProvider;
|
||||
use aws_config::meta::credentials::CredentialsProviderChain;
|
||||
use aws_config::retry::RetryConfig;
|
||||
use aws_config::sso::SsoCredentialsProvider;
|
||||
use aws_config::BehaviorVersion;
|
||||
use aws_sdk_s3::config::Region;
|
||||
use aws_sdk_s3::config::{AsyncSleep, Region, SharedAsyncSleep};
|
||||
use aws_sdk_s3::{Client, Config};
|
||||
use aws_smithy_async::rt::sleep::TokioSleep;
|
||||
|
||||
use clap::ValueEnum;
|
||||
use pageserver::tenant::TENANTS_SEGMENT_NAME;
|
||||
@@ -55,8 +57,8 @@ pub struct S3Target {
|
||||
/// TenantShardTimelineIds in one place.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)]
|
||||
pub struct TenantShardTimelineId {
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
pub tenant_shard_id: TenantShardId,
|
||||
pub timeline_id: TimelineId,
|
||||
}
|
||||
|
||||
impl TenantShardTimelineId {
|
||||
@@ -277,9 +279,13 @@ pub fn init_s3_client(account_id: Option<String>, bucket_region: Region) -> Clie
|
||||
)
|
||||
};
|
||||
|
||||
let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
|
||||
|
||||
let mut builder = Config::builder()
|
||||
.behavior_version(BehaviorVersion::v2023_11_09())
|
||||
.region(bucket_region)
|
||||
.retry_config(RetryConfig::adaptive().with_max_attempts(1))
|
||||
.sleep_impl(SharedAsyncSleep::from(sleep_impl))
|
||||
.credentials_provider(credentials_provider);
|
||||
|
||||
if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") {
|
||||
@@ -334,7 +340,7 @@ async fn list_objects_with_retries(
|
||||
{
|
||||
Ok(response) => return Ok(response),
|
||||
Err(e) => {
|
||||
error!("list_objects_v2 query failed: {e}");
|
||||
error!("list_objects_v2 query failed: {e:?}");
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
@@ -372,7 +378,7 @@ async fn download_object_with_retries(
|
||||
.await
|
||||
{
|
||||
Ok(bytes_read) => {
|
||||
tracing::info!("Downloaded {bytes_read} bytes for object object with key {key}");
|
||||
tracing::trace!("Downloaded {bytes_read} bytes for object object with key {key}");
|
||||
return Ok(body_buf);
|
||||
}
|
||||
Err(e) => {
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use s3_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
|
||||
use s3_scrubber::scan_metadata::scan_metadata;
|
||||
use s3_scrubber::{init_logging, BucketConfig, ConsoleConfig, NodeKind, TraversingDepth};
|
||||
@@ -34,6 +35,8 @@ enum Command {
|
||||
ScanMetadata {
|
||||
#[arg(short, long, default_value_t = false)]
|
||||
json: bool,
|
||||
#[arg(long = "tenant-id", num_args = 0..)]
|
||||
tenant_ids: Vec<TenantShardId>,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -57,35 +60,37 @@ async fn main() -> anyhow::Result<()> {
|
||||
));
|
||||
|
||||
match cli.command {
|
||||
Command::ScanMetadata { json } => match scan_metadata(bucket_config.clone()).await {
|
||||
Err(e) => {
|
||||
tracing::error!("Failed: {e}");
|
||||
Err(e)
|
||||
}
|
||||
Ok(summary) => {
|
||||
if json {
|
||||
println!("{}", serde_json::to_string(&summary).unwrap())
|
||||
} else {
|
||||
println!("{}", summary.summary_string());
|
||||
Command::ScanMetadata { json, tenant_ids } => {
|
||||
match scan_metadata(bucket_config.clone(), tenant_ids).await {
|
||||
Err(e) => {
|
||||
tracing::error!("Failed: {e}");
|
||||
Err(e)
|
||||
}
|
||||
if summary.is_fatal() {
|
||||
Err(anyhow::anyhow!("Fatal scrub errors detected"))
|
||||
} else if summary.is_empty() {
|
||||
// Strictly speaking an empty bucket is a valid bucket, but if someone ran the
|
||||
// scrubber they were likely expecting to scan something, and if we see no timelines
|
||||
// at all then it's likely due to some configuration issues like a bad prefix
|
||||
Err(anyhow::anyhow!(
|
||||
"No timelines found in bucket {} prefix {}",
|
||||
bucket_config.bucket,
|
||||
bucket_config
|
||||
.prefix_in_bucket
|
||||
.unwrap_or("<none>".to_string())
|
||||
))
|
||||
} else {
|
||||
Ok(())
|
||||
Ok(summary) => {
|
||||
if json {
|
||||
println!("{}", serde_json::to_string(&summary).unwrap())
|
||||
} else {
|
||||
println!("{}", summary.summary_string());
|
||||
}
|
||||
if summary.is_fatal() {
|
||||
Err(anyhow::anyhow!("Fatal scrub errors detected"))
|
||||
} else if summary.is_empty() {
|
||||
// Strictly speaking an empty bucket is a valid bucket, but if someone ran the
|
||||
// scrubber they were likely expecting to scan something, and if we see no timelines
|
||||
// at all then it's likely due to some configuration issues like a bad prefix
|
||||
Err(anyhow::anyhow!(
|
||||
"No timelines found in bucket {} prefix {}",
|
||||
bucket_config.bucket,
|
||||
bucket_config
|
||||
.prefix_in_bucket
|
||||
.unwrap_or("<none>".to_string())
|
||||
))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
Command::FindGarbage {
|
||||
node_kind,
|
||||
depth,
|
||||
|
||||
@@ -48,17 +48,17 @@ pub fn stream_tenants<'a>(
|
||||
/// Given a TenantShardId, output a stream of the timelines within that tenant, discovered
|
||||
/// using ListObjectsv2. The listing is done before the stream is built, so that this
|
||||
/// function can be used to generate concurrency on a stream using buffer_unordered.
|
||||
pub async fn stream_tenant_timelines<'a>(
|
||||
s3_client: &'a Client,
|
||||
target: &'a RootTarget,
|
||||
pub async fn get_tenant_timelines(
|
||||
s3_client: &Client,
|
||||
target: &RootTarget,
|
||||
tenant: TenantShardId,
|
||||
) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> {
|
||||
let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
|
||||
) -> Vec<anyhow::Result<TimelineId>> {
|
||||
let mut timeline_ids = Vec::new();
|
||||
let mut continuation_token = None;
|
||||
let timelines_target = target.timelines_root(&tenant);
|
||||
|
||||
loop {
|
||||
tracing::info!("Listing in {}", tenant);
|
||||
tracing::trace!("Listing in {}", tenant);
|
||||
let fetch_response =
|
||||
list_objects_with_retries(s3_client, &timelines_target, continuation_token.clone())
|
||||
.await;
|
||||
@@ -95,9 +95,19 @@ pub async fn stream_tenant_timelines<'a>(
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!("Yielding for {}", tenant);
|
||||
timeline_ids
|
||||
}
|
||||
|
||||
pub async fn stream_tenant_timelines<'a>(
|
||||
client: &'a Client,
|
||||
target: &'a RootTarget,
|
||||
tenant: TenantShardId,
|
||||
) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> {
|
||||
let timelines = get_tenant_timelines(client, target, tenant).await;
|
||||
|
||||
// FIXME: futures is not yet imported so have to keep doing it like this:
|
||||
Ok(stream! {
|
||||
for i in timeline_ids {
|
||||
for i in timelines {
|
||||
let id = i?;
|
||||
yield Ok(TenantShardTimelineId::new(tenant, id));
|
||||
}
|
||||
|
||||
@@ -1,16 +1,21 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::checks::{
|
||||
branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData,
|
||||
TimelineAnalysis,
|
||||
};
|
||||
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
|
||||
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
|
||||
use aws_sdk_s3::Client;
|
||||
use futures_util::{pin_mut, StreamExt, TryStreamExt};
|
||||
use crate::metadata_stream::stream_tenants;
|
||||
use crate::{init_remote, BucketConfig, NodeKind, TenantShardTimelineId};
|
||||
|
||||
use futures_util::StreamExt;
|
||||
use histogram::Histogram;
|
||||
use pageserver::tenant::IndexPart;
|
||||
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use serde::Serialize;
|
||||
use tracing::Instrument;
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct MetadataSummary {
|
||||
@@ -103,7 +108,7 @@ impl MetadataSummary {
|
||||
total_size += meta.file_size;
|
||||
self.layer_size_bytes.sample(meta.file_size)?;
|
||||
}
|
||||
self.timeline_size_bytes.sample(total_size)?;
|
||||
self.timeline_size_bytes.sample(total_size / 1024)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -156,7 +161,7 @@ With errors: {1}
|
||||
With warnings: {2}
|
||||
With garbage: {3}
|
||||
Index versions: {version_summary}
|
||||
Timeline size bytes: {4}
|
||||
Timeline size KiB: {4}
|
||||
Layer size bytes: {5}
|
||||
Timeline layer count: {6}
|
||||
",
|
||||
@@ -179,43 +184,204 @@ Timeline layer count: {6}
|
||||
}
|
||||
}
|
||||
|
||||
/// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
|
||||
pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<MetadataSummary> {
|
||||
let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver)?;
|
||||
|
||||
let tenants = stream_tenants(&s3_client, &target);
|
||||
|
||||
// How many tenants to process in parallel. We need to be mindful of pageservers
|
||||
// accessing the same per tenant prefixes, so use a lower setting than pageservers.
|
||||
const CONCURRENCY: usize = 32;
|
||||
|
||||
// Generate a stream of TenantTimelineId
|
||||
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
|
||||
let timelines = timelines.try_buffer_unordered(CONCURRENCY);
|
||||
let timelines = timelines.try_flatten();
|
||||
|
||||
// Generate a stream of S3TimelineBlobData
|
||||
async fn report_on_timeline(
|
||||
s3_client: &Client,
|
||||
target: &RootTarget,
|
||||
ttid: TenantShardTimelineId,
|
||||
) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> {
|
||||
let data = list_timeline_blobs(s3_client, ttid, target).await?;
|
||||
Ok((ttid, data))
|
||||
}
|
||||
let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid));
|
||||
let timelines = timelines.try_buffer_unordered(CONCURRENCY);
|
||||
|
||||
let mut summary = MetadataSummary::new();
|
||||
pin_mut!(timelines);
|
||||
while let Some(i) = timelines.next().await {
|
||||
let (ttid, data) = i?;
|
||||
summary.update_data(&data);
|
||||
|
||||
let analysis = branch_cleanup_and_check_errors(&ttid, &target, None, None, Some(data));
|
||||
|
||||
summary.update_analysis(&ttid, &analysis);
|
||||
}
|
||||
|
||||
Ok(summary)
|
||||
#[derive(Debug)]
|
||||
enum Either<A, B> {
|
||||
Left(A),
|
||||
Right(B),
|
||||
}
|
||||
|
||||
/// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
|
||||
pub async fn scan_metadata(
|
||||
bucket_config: BucketConfig,
|
||||
tenant_ids: Vec<TenantShardId>,
|
||||
) -> anyhow::Result<MetadataSummary> {
|
||||
let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver)?;
|
||||
let target = Arc::new(target);
|
||||
|
||||
let tenants = if tenant_ids.is_empty() {
|
||||
futures::future::Either::Left(stream_tenants(&s3_client, &target))
|
||||
} else {
|
||||
futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
|
||||
};
|
||||
|
||||
let tenants = tenants.fuse();
|
||||
|
||||
let mut tenants = std::pin::pin!(tenants);
|
||||
|
||||
let mut js = tokio::task::JoinSet::new();
|
||||
let mut consumed_all = false;
|
||||
|
||||
let summary = MetadataSummary::new();
|
||||
|
||||
// have timeline and timeline blob listings fight over the same semaphore
|
||||
let timeline_listings = Arc::new(tokio::sync::Semaphore::new(50));
|
||||
let blob_listings = timeline_listings.clone();
|
||||
|
||||
let spawned_tenants = AtomicUsize::new(0);
|
||||
let spawned_timelines = Arc::new(AtomicUsize::new(0));
|
||||
let completed_tenants = AtomicUsize::new(0);
|
||||
let completed_timelines = AtomicUsize::new(0);
|
||||
let last_tenant_id = std::cell::RefCell::new(None);
|
||||
|
||||
let s3_client = s3_client.clone();
|
||||
let target = target.clone();
|
||||
|
||||
let summary = std::sync::Mutex::new(summary);
|
||||
|
||||
let scan_tenants = async {
|
||||
let timeline_listings = timeline_listings;
|
||||
let blob_listings = blob_listings;
|
||||
|
||||
// used to control whether to receive more tenants
|
||||
let mut more_tenants = true;
|
||||
|
||||
loop {
|
||||
let next_start = tokio::select! {
|
||||
next_tenant = tenants.next(), if !consumed_all && more_tenants => {
|
||||
match next_tenant {
|
||||
Some(Ok(tenant_id)) => Either::Left(tenant_id),
|
||||
Some(Err(e)) => {
|
||||
consumed_all = true;
|
||||
tracing::error!("tenant streaming failed with: {e:?}");
|
||||
continue;
|
||||
}
|
||||
None => {
|
||||
consumed_all = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
next = js.join_next(), if !js.is_empty() => {
|
||||
more_tenants = js.len() < 10;
|
||||
|
||||
match next.unwrap() {
|
||||
Ok(Either::Left((tenant_id, timelines))) => {
|
||||
completed_tenants.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
Either::Right((tenant_id, timelines))
|
||||
}
|
||||
Ok(Either::Right(Some((ttid, data)))) => {
|
||||
completed_timelines.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let ttid: TenantShardTimelineId = ttid;
|
||||
{
|
||||
let _e = tracing::info_span!("analysis", tenant_shard_id=%ttid.tenant_shard_id, timeline_id=%ttid.timeline_id).entered();
|
||||
let summary = &mut summary.lock().unwrap();
|
||||
summary.update_data(&data);
|
||||
let analysis = branch_cleanup_and_check_errors(&ttid, &target, None, None, Some(data));
|
||||
summary.update_analysis(&ttid, &analysis);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
Ok(Either::Right(None)) => {
|
||||
completed_timelines.fetch_add(1, Ordering::Relaxed);
|
||||
continue;
|
||||
}
|
||||
Err(je) if je.is_cancelled() => unreachable!("not used"),
|
||||
Err(je) if je.is_panic() => {
|
||||
continue;
|
||||
},
|
||||
Err(je) => {
|
||||
tracing::error!("unknown join error: {je:?}");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
else => break,
|
||||
};
|
||||
|
||||
let s3_client = s3_client.clone();
|
||||
let target = target.clone();
|
||||
let timeline_listings = timeline_listings.clone();
|
||||
let blob_listings = blob_listings.clone();
|
||||
|
||||
match next_start {
|
||||
Either::Left(tenant_shard_id) => {
|
||||
let span = tracing::info_span!("get_timelines", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug());
|
||||
js.spawn(
|
||||
async move {
|
||||
let _permit = timeline_listings.acquire().await;
|
||||
let timelines = crate::metadata_stream::get_tenant_timelines(
|
||||
&s3_client,
|
||||
&target,
|
||||
tenant_shard_id,
|
||||
)
|
||||
.await;
|
||||
|
||||
Either::Left((tenant_shard_id, timelines))
|
||||
}
|
||||
.instrument(span),
|
||||
);
|
||||
|
||||
more_tenants = js.len() < 1000;
|
||||
|
||||
spawned_tenants.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
*last_tenant_id.borrow_mut() = Some(tenant_shard_id);
|
||||
}
|
||||
Either::Right((tenant_shard_id, timelines)) => {
|
||||
for timeline_id in timelines {
|
||||
let timeline_id = match timeline_id {
|
||||
Ok(timeline_id) => timeline_id,
|
||||
Err(e) => {
|
||||
tracing::error!("failed to fetch a timeline: {e:?}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let s3_client = s3_client.clone();
|
||||
let target = target.clone();
|
||||
let blob_listings = blob_listings.clone();
|
||||
let span = tracing::info_span!("list_timelines_blobs", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id);
|
||||
js.spawn(
|
||||
async move {
|
||||
let _permit = blob_listings.acquire().await;
|
||||
let ttid = TenantShardTimelineId::new(tenant_shard_id, timeline_id);
|
||||
match list_timeline_blobs(&s3_client, ttid, &target).await {
|
||||
Ok(data) => Either::Right(Some((ttid, data))),
|
||||
Err(e) => {
|
||||
tracing::error!("listing failed {e:?}");
|
||||
Either::Right(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
.instrument(span),
|
||||
);
|
||||
|
||||
spawned_timelines.fetch_add(1, Ordering::Relaxed);
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
{
|
||||
let mut scan_tenants = std::pin::pin!(scan_tenants);
|
||||
|
||||
loop {
|
||||
let res =
|
||||
tokio::time::timeout(std::time::Duration::from_secs(1), &mut scan_tenants).await;
|
||||
|
||||
let spawned_tenants = spawned_tenants.load(Ordering::Relaxed);
|
||||
let completed_tenants = completed_tenants.load(Ordering::Relaxed);
|
||||
let spawned_timelines = spawned_timelines.load(Ordering::Relaxed);
|
||||
let completed_timelines = completed_timelines.load(Ordering::Relaxed);
|
||||
|
||||
match res {
|
||||
Ok(()) => {
|
||||
tracing::info!("progress tenants: {completed_tenants:>6} / {spawned_tenants:<6}, timelines: {completed_timelines:>6} / {spawned_timelines:<6} after {:?}", started_at.elapsed());
|
||||
break;
|
||||
}
|
||||
Err(_timeout) => {
|
||||
tracing::info!("progress tenants: {completed_tenants:>6} / {spawned_tenants:<6}, timelines: {completed_timelines:>6} / {spawned_timelines:<6}, last tenant: {:?}", &*last_tenant_id.borrow());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(summary.into_inner().unwrap())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user