storage_scrubber: migrate scan_safekeeper_metadata to remote_storage (#8595)

Migrates the safekeeper-specific parts of `ScanMetadata` to
`GenericRemoteStorage`, making it Azure-ready.
 
Part of https://github.com/neondatabase/neon/issues/7547
Author: Arpad Müller
Date: 2024-08-06 12:51:39 +02:00
Committed by: GitHub
Parent: dc7eb5ae5a
Commit: a31c95cb40
2 changed files with 42 additions and 10 deletions
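
Before the diffs, a note on the key abstraction: `remote_storage::ListingMode` controls whether a listing is flat or grouped by delimiter, and the new helper in the first file derives the mode from the target's delimiter. Here is that mapping pulled out as a standalone sketch (the free function `listing_mode_for` is illustrative and not part of the commit):

use remote_storage::ListingMode;

// Illustrative: mirrors the `listing_mode` expression in
// `stream_listing_generic` below. An empty delimiter requests every
// object under the prefix (recursive, like `find`); a non-empty one
// requests one level of common prefixes (like `ls`).
fn listing_mode_for(delimiter: &str) -> ListingMode {
    if delimiter.is_empty() {
        ListingMode::NoDelimiter
    } else {
        ListingMode::WithDelimiter
    }
}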

storage_scrubber/src/metadata_stream.rs

@@ -4,7 +4,7 @@ use anyhow::{anyhow, Context};
 use async_stream::{stream, try_stream};
 use aws_sdk_s3::{types::ObjectIdentifier, Client};
 use futures::StreamExt;
-use remote_storage::{GenericRemoteStorage, ListingMode};
+use remote_storage::{GenericRemoteStorage, ListingMode, ListingObject, RemotePath};
 use tokio_stream::Stream;
 
 use crate::{
@@ -276,3 +276,33 @@ pub(crate) fn stream_listing<'a>(
         }
     }
 }
+
+pub(crate) fn stream_listing_generic<'a>(
+    remote_client: &'a GenericRemoteStorage,
+    target: &'a S3Target,
+) -> impl Stream<Item = anyhow::Result<(RemotePath, Option<ListingObject>)>> + 'a {
+    let listing_mode = if target.delimiter.is_empty() {
+        ListingMode::NoDelimiter
+    } else {
+        ListingMode::WithDelimiter
+    };
+    try_stream! {
+        let mut objects_stream = std::pin::pin!(stream_objects_with_retries(
+            remote_client,
+            listing_mode,
+            target,
+        ));
+        while let Some(list) = objects_stream.next().await {
+            let list = list?;
+            if target.delimiter.is_empty() {
+                for key in list.keys {
+                    yield (key.key.clone(), Some(key));
+                }
+            } else {
+                for key in list.prefixes {
+                    yield (key, None);
+                }
+            }
+        }
+    }
+}
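
Since the helper returns an anonymous `impl Stream`, a caller has to pin it before polling; the updated `check_timeline` in the second file does exactly that. A minimal consumption sketch, assuming the scrubber's own `S3Target` type and a target whose delimiter has been cleared (`count_objects` is a hypothetical name):

use futures::StreamExt;
use remote_storage::GenericRemoteStorage;

// Hypothetical caller that counts listed objects, following the same
// pin-then-next pattern check_timeline uses below.
async fn count_objects(
    remote_client: &GenericRemoteStorage,
    target: &S3Target, // the scrubber's own target type
) -> anyhow::Result<usize> {
    // `impl Stream` has no nameable type, so pin it on the stack
    // before calling `next()`.
    let mut stream = std::pin::pin!(stream_listing_generic(remote_client, target));
    let mut count = 0;
    while let Some(item) = stream.next().await {
        // With an empty delimiter each item is (key, Some(metadata)).
        let (_key, _meta) = item?;
        count += 1;
    }
    Ok(count)
}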

storage_scrubber/src/scan_safekeeper_metadata.rs

@@ -1,10 +1,10 @@
 use std::{collections::HashSet, str::FromStr, sync::Arc};
 
-use aws_sdk_s3::Client;
 use futures::stream::{StreamExt, TryStreamExt};
 use once_cell::sync::OnceCell;
 use pageserver_api::shard::TenantShardId;
 use postgres_ffi::{XLogFileName, PG_TLI};
+use remote_storage::GenericRemoteStorage;
 use serde::Serialize;
 use tokio_postgres::types::PgLsn;
 use tracing::{error, info, trace};
@@ -14,8 +14,9 @@ use utils::{
 };
 
 use crate::{
-    cloud_admin_api::CloudAdminApiClient, init_remote, metadata_stream::stream_listing,
-    BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId,
+    cloud_admin_api::CloudAdminApiClient, init_remote_generic,
+    metadata_stream::stream_listing_generic, BucketConfig, ConsoleConfig, NodeKind, RootTarget,
+    TenantShardTimelineId,
 };
 
 /// Generally we should ask safekeepers, but so far we use everywhere default 16MB.
@@ -106,7 +107,7 @@ pub async fn scan_safekeeper_metadata(
     let timelines = client.query(&query, &[]).await?;
     info!("loaded {} timelines", timelines.len());
 
-    let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
+    let (remote_client, target) = init_remote_generic(bucket_config, NodeKind::Safekeeper).await?;
 
     let console_config = ConsoleConfig::from_env()?;
     let cloud_admin_api_client = CloudAdminApiClient::new(console_config);
@@ -119,7 +120,7 @@ pub async fn scan_safekeeper_metadata(
        let backup_lsn: Lsn = Lsn(u64::from(backup_lsn_pg));
        let ttid = TenantTimelineId::new(tenant_id, timeline_id);
        check_timeline(
-           &s3_client,
+           &remote_client,
            &target,
            &cloud_admin_api_client,
            ttid,
@@ -156,7 +157,7 @@ struct TimelineCheckResult {
 /// errors are logged to stderr; returns Ok(true) if timeline is consistent,
 /// Ok(false) if not, Err if failed to check.
 async fn check_timeline(
-    s3_client: &Client,
+    remote_client: &GenericRemoteStorage,
     root: &RootTarget,
     api_client: &CloudAdminApiClient,
    ttid: TenantTimelineId,
@@ -187,12 +188,13 @@ async fn check_timeline(
     // we need files, so unset it.
     timeline_dir_target.delimiter = String::new();
 
-    let mut stream = std::pin::pin!(stream_listing(s3_client, &timeline_dir_target));
+    let mut stream = std::pin::pin!(stream_listing_generic(remote_client, &timeline_dir_target));
 
     while let Some(obj) = stream.next().await {
-        let obj = obj?;
-        let key = obj.key();
+        let (key, _obj) = obj?;
         let seg_name = key
+            .get_path()
+            .as_str()
             .strip_prefix(&timeline_dir_target.prefix_in_bucket)
             .expect("failed to extract segment name");
         expected_segfiles.remove(seg_name);
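
This last hunk also changes the item type: the S3-specific stream yielded objects with plain string keys, while `stream_listing_generic` yields `RemotePath` values, which is why `.get_path().as_str()` now precedes `strip_prefix`. The conversion as a standalone sketch (the `segment_name` helper is illustrative):

use remote_storage::RemotePath;

// Illustrative: mirrors the strip_prefix chain above. RemotePath wraps
// a UTF-8 path, so borrow it as &str before stripping the timeline
// directory prefix to recover the bare segment file name.
fn segment_name<'a>(key: &'a RemotePath, prefix_in_bucket: &str) -> Option<&'a str> {
    key.get_path().as_str().strip_prefix(prefix_in_bucket)
}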