Migrate physical GC and scan_metadata to remote_storage (#8673)

Migrates most of the remaining parts of the scrubber to remote_storage:

* `pageserver_physical_gc`
* `scan_metadata` for pageservers (safekeepers were done in #8595)
* `download()` in `tenant_snapshot`. The main `tenant_snapshot` is not
migrated as it uses version history to be able to work in the face of
ongoing changes.
 
Part of #7547
This commit is contained in:
Arpad Müller
2024-08-19 16:39:44 +02:00
committed by GitHub
parent eb7241c798
commit 3b8ca477ab
14 changed files with 366 additions and 341 deletions

View File

@@ -383,6 +383,48 @@ impl RemoteStorage for AzureBlobStorage {
}
}
async fn head_object(
&self,
key: &RemotePath,
cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError> {
let kind = RequestKind::Head;
let _permit = self.permit(kind, cancel).await?;
let started_at = start_measuring_requests(kind);
let blob_client = self.client.blob_client(self.relative_path_to_name(key));
let properties_future = blob_client.get_properties().into_future();
let properties_future = tokio::time::timeout(self.timeout, properties_future);
let res = tokio::select! {
res = properties_future => res,
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
};
if let Ok(inner) = &res {
// do not incl. timeouts as errors in metrics but cancellations
let started_at = ScopeGuard::into_inner(started_at);
crate::metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, inner, started_at);
}
let data = match res {
Ok(Ok(data)) => Ok(data),
Ok(Err(sdk)) => Err(to_download_error(sdk)),
Err(_timeout) => Err(DownloadError::Timeout),
}?;
let properties = data.blob.properties;
Ok(ListingObject {
key: key.to_owned(),
last_modified: SystemTime::from(properties.last_modified),
size: properties.content_length,
})
}
async fn upload(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,

View File

@@ -150,7 +150,7 @@ pub enum ListingMode {
NoDelimiter,
}
#[derive(PartialEq, Eq, Debug)]
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct ListingObject {
pub key: RemotePath,
pub last_modified: SystemTime,
@@ -215,6 +215,13 @@ pub trait RemoteStorage: Send + Sync + 'static {
Ok(combined)
}
/// Obtain metadata information about an object.
async fn head_object(
&self,
key: &RemotePath,
cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError>;
/// Streams the local file contents into remote into the remote storage entry.
///
/// If the operation fails because of timeout or cancellation, the root cause of the error will be
@@ -363,6 +370,20 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}
}
// See [`RemoteStorage::head_object`].
pub async fn head_object(
&self,
key: &RemotePath,
cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError> {
match self {
Self::LocalFs(s) => s.head_object(key, cancel).await,
Self::AwsS3(s) => s.head_object(key, cancel).await,
Self::AzureBlob(s) => s.head_object(key, cancel).await,
Self::Unreliable(s) => s.head_object(key, cancel).await,
}
}
/// See [`RemoteStorage::upload`]
pub async fn upload(
&self,
@@ -598,6 +619,7 @@ impl ConcurrencyLimiter {
RequestKind::Delete => &self.write,
RequestKind::Copy => &self.write,
RequestKind::TimeTravel => &self.write,
RequestKind::Head => &self.read,
}
}

View File

@@ -445,6 +445,20 @@ impl RemoteStorage for LocalFs {
}
}
async fn head_object(
&self,
key: &RemotePath,
_cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError> {
let target_file_path = key.with_base(&self.storage_root);
let metadata = file_metadata(&target_file_path).await?;
Ok(ListingObject {
key: key.clone(),
last_modified: metadata.modified()?,
size: metadata.len(),
})
}
async fn upload(
&self,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,

View File

@@ -13,6 +13,7 @@ pub(crate) enum RequestKind {
List = 3,
Copy = 4,
TimeTravel = 5,
Head = 6,
}
use scopeguard::ScopeGuard;
@@ -27,6 +28,7 @@ impl RequestKind {
List => "list_objects",
Copy => "copy_object",
TimeTravel => "time_travel_recover",
Head => "head_object",
}
}
const fn as_index(&self) -> usize {
@@ -34,7 +36,8 @@ impl RequestKind {
}
}
pub(crate) struct RequestTyped<C>([C; 6]);
const REQUEST_KIND_COUNT: usize = 7;
pub(crate) struct RequestTyped<C>([C; REQUEST_KIND_COUNT]);
impl<C> RequestTyped<C> {
pub(crate) fn get(&self, kind: RequestKind) -> &C {
@@ -43,8 +46,8 @@ impl<C> RequestTyped<C> {
fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
use RequestKind::*;
let mut it = [Get, Put, Delete, List, Copy, TimeTravel].into_iter();
let arr = std::array::from_fn::<C, 6, _>(|index| {
let mut it = [Get, Put, Delete, List, Copy, TimeTravel, Head].into_iter();
let arr = std::array::from_fn::<C, REQUEST_KIND_COUNT, _>(|index| {
let next = it.next().unwrap();
assert_eq!(index, next.as_index());
f(next)

View File

@@ -23,7 +23,7 @@ use aws_config::{
use aws_sdk_s3::{
config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep},
error::SdkError,
operation::get_object::GetObjectError,
operation::{get_object::GetObjectError, head_object::HeadObjectError},
types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass},
Client,
};
@@ -604,6 +604,78 @@ impl RemoteStorage for S3Bucket {
}
}
async fn head_object(
&self,
key: &RemotePath,
cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError> {
let kind = RequestKind::Head;
let _permit = self.permit(kind, cancel).await?;
let started_at = start_measuring_requests(kind);
let head_future = self
.client
.head_object()
.bucket(self.bucket_name())
.key(self.relative_path_to_s3_object(key))
.send();
let head_future = tokio::time::timeout(self.timeout, head_future);
let res = tokio::select! {
res = head_future => res,
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
};
let res = res.map_err(|_e| DownloadError::Timeout)?;
// do not incl. timeouts as errors in metrics but cancellations
let started_at = ScopeGuard::into_inner(started_at);
crate::metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &res, started_at);
let data = match res {
Ok(object_output) => object_output,
Err(SdkError::ServiceError(e)) if matches!(e.err(), HeadObjectError::NotFound(_)) => {
// Count this in the AttemptOutcome::Ok bucket, because 404 is not
// an error: we expect to sometimes fetch an object and find it missing,
// e.g. when probing for timeline indices.
crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
kind,
AttemptOutcome::Ok,
started_at,
);
return Err(DownloadError::NotFound);
}
Err(e) => {
crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
kind,
AttemptOutcome::Err,
started_at,
);
return Err(DownloadError::Other(
anyhow::Error::new(e).context("s3 head object"),
));
}
};
let (Some(last_modified), Some(size)) = (data.last_modified, data.content_length) else {
return Err(DownloadError::Other(anyhow!(
"head_object doesn't contain last_modified or content_length"
)))?;
};
Ok(ListingObject {
key: key.to_owned(),
last_modified: SystemTime::try_from(last_modified).map_err(|e| {
DownloadError::Other(anyhow!("can't convert time '{last_modified}': {e}"))
})?,
size: size as u64,
})
}
async fn upload(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,

View File

@@ -30,6 +30,7 @@ pub struct UnreliableWrapper {
#[derive(Debug, Hash, Eq, PartialEq)]
enum RemoteOp {
ListPrefixes(Option<RemotePath>),
HeadObject(RemotePath),
Upload(RemotePath),
Download(RemotePath),
Delete(RemotePath),
@@ -137,6 +138,16 @@ impl RemoteStorage for UnreliableWrapper {
self.inner.list(prefix, mode, max_keys, cancel).await
}
async fn head_object(
&self,
key: &RemotePath,
cancel: &CancellationToken,
) -> Result<crate::ListingObject, DownloadError> {
self.attempt(RemoteOp::HeadObject(key.clone()))
.map_err(DownloadError::Other)?;
self.inner.head_object(key, cancel).await
}
async fn upload(
&self,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,

View File

@@ -1,22 +1,22 @@
use std::collections::{HashMap, HashSet};
use anyhow::Context;
use aws_sdk_s3::Client;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver_api::shard::ShardIndex;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use utils::generation::Generation;
use utils::id::TimelineId;
use crate::cloud_admin_api::BranchData;
use crate::metadata_stream::stream_listing;
use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId};
use crate::metadata_stream::stream_listing_generic;
use crate::{download_object_with_retries_generic, RootTarget, TenantShardTimelineId};
use futures_util::StreamExt;
use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::IndexPart;
use remote_storage::RemotePath;
use remote_storage::{GenericRemoteStorage, ListingObject, RemotePath};
pub(crate) struct TimelineAnalysis {
/// Anomalies detected
@@ -48,13 +48,12 @@ impl TimelineAnalysis {
}
pub(crate) async fn branch_cleanup_and_check_errors(
s3_client: &Client,
target: &RootTarget,
remote_client: &GenericRemoteStorage,
id: &TenantShardTimelineId,
tenant_objects: &mut TenantObjectListing,
s3_active_branch: Option<&BranchData>,
console_branch: Option<BranchData>,
s3_data: Option<S3TimelineBlobData>,
s3_data: Option<RemoteTimelineBlobData>,
) -> TimelineAnalysis {
let mut result = TimelineAnalysis::new();
@@ -78,7 +77,9 @@ pub(crate) async fn branch_cleanup_and_check_errors(
match s3_data {
Some(s3_data) => {
result.garbage_keys.extend(s3_data.unknown_keys);
result
.garbage_keys
.extend(s3_data.unknown_keys.into_iter().map(|k| k.key.to_string()));
match s3_data.blob_data {
BlobDataParseResult::Parsed {
@@ -143,11 +144,8 @@ pub(crate) async fn branch_cleanup_and_check_errors(
// HEAD request used here to address a race condition when an index was uploaded concurrently
// with our scan. We check if the object is uploaded to S3 after taking the listing snapshot.
let response = s3_client
.head_object()
.bucket(target.bucket_name())
.key(path.get_path().as_str())
.send()
let response = remote_client
.head_object(&path, &CancellationToken::new())
.await;
if response.is_err() {
@@ -284,14 +282,14 @@ impl TenantObjectListing {
}
#[derive(Debug)]
pub(crate) struct S3TimelineBlobData {
pub(crate) struct RemoteTimelineBlobData {
pub(crate) blob_data: BlobDataParseResult,
// Index objects that were not used when loading `blob_data`, e.g. those from old generations
pub(crate) unused_index_keys: Vec<String>,
pub(crate) unused_index_keys: Vec<ListingObject>,
// Objects whose keys were not recognized at all, i.e. not layer files, not indices
pub(crate) unknown_keys: Vec<String>,
pub(crate) unknown_keys: Vec<ListingObject>,
}
#[derive(Debug)]
@@ -322,11 +320,11 @@ pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generati
}
}
pub(crate) async fn list_timeline_blobs(
s3_client: &Client,
pub(crate) async fn list_timeline_blobs_generic(
remote_client: &GenericRemoteStorage,
id: TenantShardTimelineId,
s3_root: &RootTarget,
) -> anyhow::Result<S3TimelineBlobData> {
) -> anyhow::Result<RemoteTimelineBlobData> {
let mut s3_layers = HashSet::new();
let mut errors = Vec::new();
@@ -335,19 +333,25 @@ pub(crate) async fn list_timeline_blobs(
let mut timeline_dir_target = s3_root.timeline_root(&id);
timeline_dir_target.delimiter = String::new();
let mut index_part_keys: Vec<String> = Vec::new();
let mut index_part_keys: Vec<ListingObject> = Vec::new();
let mut initdb_archive: bool = false;
let mut stream = std::pin::pin!(stream_listing(s3_client, &timeline_dir_target));
while let Some(obj) = stream.next().await {
let obj = obj?;
let key = obj.key();
let prefix_str = &timeline_dir_target
.prefix_in_bucket
.strip_prefix("/")
.unwrap_or(&timeline_dir_target.prefix_in_bucket);
let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket);
let mut stream = std::pin::pin!(stream_listing_generic(remote_client, &timeline_dir_target));
while let Some(obj) = stream.next().await {
let (key, Some(obj)) = obj? else {
panic!("ListingObject not specified");
};
let blob_name = key.get_path().as_str().strip_prefix(prefix_str);
match blob_name {
Some(name) if name.starts_with("index_part.json") => {
tracing::debug!("Index key {key}");
index_part_keys.push(key.to_owned())
index_part_keys.push(obj)
}
Some("initdb.tar.zst") => {
tracing::debug!("initdb archive {key}");
@@ -358,7 +362,7 @@ pub(crate) async fn list_timeline_blobs(
}
Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
Ok((new_layer, gen)) => {
tracing::debug!("Parsed layer key: {} {:?}", new_layer, gen);
tracing::debug!("Parsed layer key: {new_layer} {gen:?}");
s3_layers.insert((new_layer, gen));
}
Err(e) => {
@@ -366,13 +370,13 @@ pub(crate) async fn list_timeline_blobs(
errors.push(
format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
);
unknown_keys.push(key.to_string());
unknown_keys.push(obj);
}
},
None => {
tracing::warn!("Unknown key {}", key);
tracing::warn!("Unknown key {key}");
errors.push(format!("S3 list response got an object with odd key {key}"));
unknown_keys.push(key.to_string());
unknown_keys.push(obj);
}
}
}
@@ -381,7 +385,7 @@ pub(crate) async fn list_timeline_blobs(
tracing::debug!(
"Timeline is empty apart from initdb archive: expected post-deletion state."
);
return Ok(S3TimelineBlobData {
return Ok(RemoteTimelineBlobData {
blob_data: BlobDataParseResult::Relic,
unused_index_keys: index_part_keys,
unknown_keys: Vec::new(),
@@ -395,13 +399,13 @@ pub(crate) async fn list_timeline_blobs(
// Stripping the index key to the last part, because RemotePath doesn't
// like absolute paths, and depending on prefix_in_bucket it's possible
// for the keys we read back to start with a slash.
let basename = key.rsplit_once('/').unwrap().1;
let basename = key.key.get_path().as_str().rsplit_once('/').unwrap().1;
parse_remote_index_path(RemotePath::from_string(basename).unwrap()).map(|g| (key, g))
})
.max_by_key(|i| i.1)
.map(|(k, g)| (k.clone(), g))
{
Some((key, gen)) => (Some(key), gen),
Some((key, gen)) => (Some::<ListingObject>(key.to_owned()), gen),
None => {
// Legacy/missing case: one or zero index parts, which did not have a generation
(index_part_keys.pop(), Generation::none())
@@ -416,17 +420,14 @@ pub(crate) async fn list_timeline_blobs(
}
if let Some(index_part_object_key) = index_part_object.as_ref() {
let index_part_bytes = download_object_with_retries(
s3_client,
&timeline_dir_target.bucket_name,
index_part_object_key,
)
.await
.context("index_part.json download")?;
let index_part_bytes =
download_object_with_retries_generic(remote_client, &index_part_object_key.key)
.await
.context("index_part.json download")?;
match serde_json::from_slice(&index_part_bytes) {
Ok(index_part) => {
return Ok(S3TimelineBlobData {
return Ok(RemoteTimelineBlobData {
blob_data: BlobDataParseResult::Parsed {
index_part: Box::new(index_part),
index_part_generation,
@@ -448,7 +449,7 @@ pub(crate) async fn list_timeline_blobs(
);
}
Ok(S3TimelineBlobData {
Ok(RemoteTimelineBlobData {
blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
unused_index_keys: index_part_keys,
unknown_keys,

View File

@@ -452,23 +452,26 @@ fn stream_objects_with_retries<'a>(
let mut list_stream =
storage_client.list_streaming(Some(&prefix), listing_mode, None, &cancel);
while let Some(res) = list_stream.next().await {
if let Err(err) = res {
let yield_err = if err.is_permanent() {
true
} else {
let backoff_time = 1 << trial.max(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
trial += 1;
trial == MAX_RETRIES - 1
};
if yield_err {
yield Err(err)
.with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
break;
match res {
Err(err) => {
let yield_err = if err.is_permanent() {
true
} else {
let backoff_time = 1 << trial.max(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
trial += 1;
trial == MAX_RETRIES - 1
};
if yield_err {
yield Err(err)
.with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
break;
}
}
Ok(res) => {
trial = 0;
yield Ok(res);
}
} else {
trial = 0;
yield res.map_err(anyhow::Error::from);
}
}
}
@@ -513,41 +516,35 @@ async fn list_objects_with_retries_generic(
panic!("MAX_RETRIES is not allowed to be 0");
}
async fn download_object_with_retries(
s3_client: &Client,
bucket_name: &str,
key: &str,
async fn download_object_with_retries_generic(
remote_client: &GenericRemoteStorage,
key: &RemotePath,
) -> anyhow::Result<Vec<u8>> {
for _ in 0..MAX_RETRIES {
let mut body_buf = Vec::new();
let response_stream = match s3_client
.get_object()
.bucket(bucket_name)
.key(key)
.send()
.await
{
let cancel = CancellationToken::new();
for trial in 0..MAX_RETRIES {
let mut buf = Vec::new();
let download = match remote_client.download(key, &cancel).await {
Ok(response) => response,
Err(e) => {
error!("Failed to download object for key {key}: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
let backoff_time = 1 << trial.max(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
continue;
}
};
match response_stream
.body
.into_async_read()
.read_to_end(&mut body_buf)
match tokio_util::io::StreamReader::new(download.download_stream)
.read_to_end(&mut buf)
.await
{
Ok(bytes_read) => {
tracing::debug!("Downloaded {bytes_read} bytes for object {key}");
return Ok(body_buf);
return Ok(buf);
}
Err(e) => {
error!("Failed to stream object body for key {key}: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
let backoff_time = 1 << trial.max(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
}
}
}

View File

@@ -2,14 +2,14 @@ use std::str::FromStr;
use anyhow::{anyhow, Context};
use async_stream::{stream, try_stream};
use aws_sdk_s3::{types::ObjectIdentifier, Client};
use aws_sdk_s3::Client;
use futures::StreamExt;
use remote_storage::{GenericRemoteStorage, ListingMode, ListingObject, RemotePath};
use tokio_stream::Stream;
use crate::{
list_objects_with_retries, stream_objects_with_retries, RootTarget, S3Target,
TenantShardTimelineId,
list_objects_with_retries, list_objects_with_retries_generic, stream_objects_with_retries,
RootTarget, S3Target, TenantShardTimelineId,
};
use pageserver_api::shard::TenantShardId;
use utils::id::{TenantId, TimelineId};
@@ -75,53 +75,38 @@ pub fn stream_tenants<'a>(
}
pub async fn stream_tenant_shards<'a>(
s3_client: &'a Client,
remote_client: &'a GenericRemoteStorage,
target: &'a RootTarget,
tenant_id: TenantId,
) -> anyhow::Result<impl Stream<Item = Result<TenantShardId, anyhow::Error>> + 'a> {
let mut tenant_shard_ids: Vec<Result<TenantShardId, anyhow::Error>> = Vec::new();
let mut continuation_token = None;
let shards_target = target.tenant_shards_prefix(&tenant_id);
loop {
tracing::info!("Listing in {}", shards_target.prefix_in_bucket);
let fetch_response =
list_objects_with_retries(s3_client, &shards_target, continuation_token.clone()).await;
let fetch_response = match fetch_response {
Err(e) => {
tenant_shard_ids.push(Err(e));
break;
}
Ok(r) => r,
};
let strip_prefix = target.tenants_root().prefix_in_bucket;
let prefix_str = &strip_prefix.strip_prefix("/").unwrap_or(&strip_prefix);
let new_entry_ids = fetch_response
.common_prefixes()
.iter()
.filter_map(|prefix| prefix.prefix())
.filter_map(|prefix| -> Option<&str> {
prefix
.strip_prefix(&target.tenants_root().prefix_in_bucket)?
.strip_suffix('/')
})
.map(|entry_id_str| {
let first_part = entry_id_str.split('/').next().unwrap();
tracing::info!("Listing shards in {}", shards_target.prefix_in_bucket);
let listing = list_objects_with_retries_generic(
remote_client,
ListingMode::WithDelimiter,
&shards_target,
)
.await?;
first_part
.parse::<TenantShardId>()
.with_context(|| format!("Incorrect entry id str: {first_part}"))
});
let tenant_shard_ids = listing
.prefixes
.iter()
.map(|prefix| prefix.get_path().as_str())
.filter_map(|prefix| -> Option<&str> { prefix.strip_prefix(prefix_str) })
.map(|entry_id_str| {
let first_part = entry_id_str.split('/').next().unwrap();
for i in new_entry_ids {
tenant_shard_ids.push(i);
}
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
first_part
.parse::<TenantShardId>()
.with_context(|| format!("Incorrect entry id str: {first_part}"))
})
.collect::<Vec<_>>();
tracing::debug!("Yielding {} shards for {tenant_id}", tenant_shard_ids.len());
Ok(stream! {
for i in tenant_shard_ids {
let id = i?;
@@ -130,65 +115,6 @@ pub async fn stream_tenant_shards<'a>(
})
}
/// Given a TenantShardId, output a stream of the timelines within that tenant, discovered
/// using ListObjectsv2. The listing is done before the stream is built, so that this
/// function can be used to generate concurrency on a stream using buffer_unordered.
pub async fn stream_tenant_timelines<'a>(
s3_client: &'a Client,
target: &'a RootTarget,
tenant: TenantShardId,
) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> {
let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
let mut continuation_token = None;
let timelines_target = target.timelines_root(&tenant);
loop {
tracing::debug!("Listing in {}", tenant);
let fetch_response =
list_objects_with_retries(s3_client, &timelines_target, continuation_token.clone())
.await;
let fetch_response = match fetch_response {
Err(e) => {
timeline_ids.push(Err(e));
break;
}
Ok(r) => r,
};
let new_entry_ids = fetch_response
.common_prefixes()
.iter()
.filter_map(|prefix| prefix.prefix())
.filter_map(|prefix| -> Option<&str> {
prefix
.strip_prefix(&timelines_target.prefix_in_bucket)?
.strip_suffix('/')
})
.map(|entry_id_str| {
entry_id_str
.parse::<TimelineId>()
.with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
});
for i in new_entry_ids {
timeline_ids.push(i);
}
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
tracing::debug!("Yielding for {}", tenant);
Ok(stream! {
for i in timeline_ids {
let id = i?;
yield Ok(TenantShardTimelineId::new(tenant, id));
}
})
}
/// Given a `TenantShardId`, output a stream of the timelines within that tenant, discovered
/// using a listing. The listing is done before the stream is built, so that this
/// function can be used to generate concurrency on a stream using buffer_unordered.
@@ -200,6 +126,11 @@ pub async fn stream_tenant_timelines_generic<'a>(
let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
let timelines_target = target.timelines_root(&tenant);
let prefix_str = &timelines_target
.prefix_in_bucket
.strip_prefix("/")
.unwrap_or(&timelines_target.prefix_in_bucket);
let mut objects_stream = std::pin::pin!(stream_objects_with_retries(
remote_client,
ListingMode::WithDelimiter,
@@ -220,11 +151,7 @@ pub async fn stream_tenant_timelines_generic<'a>(
.prefixes
.iter()
.filter_map(|prefix| -> Option<&str> {
prefix
.get_path()
.as_str()
.strip_prefix(&timelines_target.prefix_in_bucket)?
.strip_suffix('/')
prefix.get_path().as_str().strip_prefix(prefix_str)
})
.map(|entry_id_str| {
entry_id_str
@@ -237,7 +164,7 @@ pub async fn stream_tenant_timelines_generic<'a>(
}
}
tracing::debug!("Yielding for {}", tenant);
tracing::debug!("Yielding {} timelines for {}", timeline_ids.len(), tenant);
Ok(stream! {
for i in timeline_ids {
let id = i?;
@@ -246,37 +173,6 @@ pub async fn stream_tenant_timelines_generic<'a>(
})
}
pub(crate) fn stream_listing<'a>(
s3_client: &'a Client,
target: &'a S3Target,
) -> impl Stream<Item = anyhow::Result<ObjectIdentifier>> + 'a {
try_stream! {
let mut continuation_token = None;
loop {
let fetch_response =
list_objects_with_retries(s3_client, target, continuation_token.clone()).await?;
if target.delimiter.is_empty() {
for object_key in fetch_response.contents().iter().filter_map(|object| object.key())
{
let object_id = ObjectIdentifier::builder().key(object_key).build()?;
yield object_id;
}
} else {
for prefix in fetch_response.common_prefixes().iter().filter_map(|p| p.prefix()) {
let object_id = ObjectIdentifier::builder().key(prefix).build()?;
yield object_id;
}
}
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
}
}
pub(crate) fn stream_listing_generic<'a>(
remote_client: &'a GenericRemoteStorage,
target: &'a S3Target,

View File

@@ -1,11 +1,10 @@
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use std::time::Duration;
use crate::checks::{list_timeline_blobs, BlobDataParseResult};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
use aws_sdk_s3::Client;
use crate::checks::{list_timeline_blobs_generic, BlobDataParseResult};
use crate::metadata_stream::{stream_tenant_timelines_generic, stream_tenants_generic};
use crate::{init_remote_generic, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
use futures_util::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
@@ -13,10 +12,11 @@ use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::IndexPart;
use pageserver_api::controller_api::TenantDescribeResponse;
use pageserver_api::shard::{ShardIndex, TenantShardId};
use remote_storage::RemotePath;
use remote_storage::{GenericRemoteStorage, ListingObject, RemotePath};
use reqwest::Method;
use serde::Serialize;
use storage_controller_client::control_api;
use tokio_util::sync::CancellationToken;
use tracing::{info_span, Instrument};
use utils::generation::Generation;
use utils::id::{TenantId, TenantTimelineId};
@@ -240,38 +240,13 @@ impl TenantRefAccumulator {
}
}
async fn is_old_enough(
s3_client: &Client,
bucket_config: &BucketConfig,
min_age: &Duration,
key: &str,
summary: &mut GcSummary,
) -> bool {
fn is_old_enough(min_age: &Duration, key: &ListingObject, summary: &mut GcSummary) -> bool {
// Validation: we will only GC indices & layers after a time threshold (e.g. one week) so that during an incident
// it is easier to read old data for analysis, and easier to roll back shard splits without having to un-delete any objects.
let age: Duration = match s3_client
.head_object()
.bucket(&bucket_config.bucket)
.key(key)
.send()
.await
{
Ok(response) => match response.last_modified {
None => {
tracing::warn!("Missing last_modified");
summary.remote_storage_errors += 1;
return false;
}
Some(last_modified) => match SystemTime::try_from(last_modified).map(|t| t.elapsed()) {
Ok(Ok(e)) => e,
Err(_) | Ok(Err(_)) => {
tracing::warn!("Bad last_modified time: {last_modified:?}");
return false;
}
},
},
Err(e) => {
tracing::warn!("Failed to HEAD {key}: {e}");
let age = match key.last_modified.elapsed() {
Ok(e) => e,
Err(_) => {
tracing::warn!("Bad last_modified time: {:?}", key.last_modified);
summary.remote_storage_errors += 1;
return false;
}
@@ -289,17 +264,30 @@ async fn is_old_enough(
old_enough
}
/// Same as [`is_old_enough`], but doesn't require a [`ListingObject`] passed to it.
async fn check_is_old_enough(
remote_client: &GenericRemoteStorage,
key: &RemotePath,
min_age: &Duration,
summary: &mut GcSummary,
) -> Option<bool> {
let listing_object = remote_client
.head_object(key, &CancellationToken::new())
.await
.ok()?;
Some(is_old_enough(min_age, &listing_object, summary))
}
async fn maybe_delete_index(
s3_client: &Client,
bucket_config: &BucketConfig,
remote_client: &GenericRemoteStorage,
min_age: &Duration,
latest_gen: Generation,
key: &str,
obj: &ListingObject,
mode: GcMode,
summary: &mut GcSummary,
) {
// Validation: we will only delete things that parse cleanly
let basename = key.rsplit_once('/').unwrap().1;
let basename = obj.key.get_path().file_name().unwrap();
let candidate_generation =
match parse_remote_index_path(RemotePath::from_string(basename).unwrap()) {
Some(g) => g,
@@ -328,7 +316,7 @@ async fn maybe_delete_index(
return;
}
if !is_old_enough(s3_client, bucket_config, min_age, key, summary).await {
if !is_old_enough(min_age, obj, summary) {
return;
}
@@ -338,11 +326,8 @@ async fn maybe_delete_index(
}
// All validations passed: erase the object
match s3_client
.delete_object()
.bucket(&bucket_config.bucket)
.key(key)
.send()
match remote_client
.delete(&obj.key, &CancellationToken::new())
.await
{
Ok(_) => {
@@ -358,8 +343,7 @@ async fn maybe_delete_index(
#[allow(clippy::too_many_arguments)]
async fn gc_ancestor(
s3_client: &Client,
bucket_config: &BucketConfig,
remote_client: &GenericRemoteStorage,
root_target: &RootTarget,
min_age: &Duration,
ancestor: TenantShardId,
@@ -368,7 +352,7 @@ async fn gc_ancestor(
summary: &mut GcSummary,
) -> anyhow::Result<()> {
// Scan timelines in the ancestor
let timelines = stream_tenant_timelines(s3_client, root_target, ancestor).await?;
let timelines = stream_tenant_timelines_generic(remote_client, root_target, ancestor).await?;
let mut timelines = std::pin::pin!(timelines);
// Build a list of keys to retain
@@ -376,7 +360,7 @@ async fn gc_ancestor(
while let Some(ttid) = timelines.next().await {
let ttid = ttid?;
let data = list_timeline_blobs(s3_client, ttid, root_target).await?;
let data = list_timeline_blobs_generic(remote_client, ttid, root_target).await?;
let s3_layers = match data.blob_data {
BlobDataParseResult::Parsed {
@@ -427,7 +411,8 @@ async fn gc_ancestor(
// We apply a time threshold to GCing objects that are un-referenced: this preserves our ability
// to roll back a shard split if we have to, by avoiding deleting ancestor layers right away
if !is_old_enough(s3_client, bucket_config, min_age, &key, summary).await {
let path = RemotePath::from_string(key.strip_prefix("/").unwrap_or(&key)).unwrap();
if check_is_old_enough(remote_client, &path, min_age, summary).await != Some(true) {
continue;
}
@@ -437,13 +422,7 @@ async fn gc_ancestor(
}
// All validations passed: erase the object
match s3_client
.delete_object()
.bucket(&bucket_config.bucket)
.key(&key)
.send()
.await
{
match remote_client.delete(&path, &CancellationToken::new()).await {
Ok(_) => {
tracing::info!("Successfully deleted unreferenced ancestor layer {key}");
summary.ancestor_layers_deleted += 1;
@@ -477,10 +456,11 @@ pub async fn pageserver_physical_gc(
min_age: Duration,
mode: GcMode,
) -> anyhow::Result<GcSummary> {
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
let (remote_client, target) =
init_remote_generic(bucket_config.clone(), NodeKind::Pageserver).await?;
let tenants = if tenant_shard_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&s3_client, &target))
futures::future::Either::Left(stream_tenants_generic(&remote_client, &target))
} else {
futures::future::Either::Right(futures::stream::iter(tenant_shard_ids.into_iter().map(Ok)))
};
@@ -493,14 +473,13 @@ pub async fn pageserver_physical_gc(
let accumulator = Arc::new(std::sync::Mutex::new(TenantRefAccumulator::default()));
// Generate a stream of TenantTimelineId
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
let timelines = tenants.map_ok(|t| stream_tenant_timelines_generic(&remote_client, &target, t));
let timelines = timelines.try_buffered(CONCURRENCY);
let timelines = timelines.try_flatten();
// Generate a stream of S3TimelineBlobData
async fn gc_timeline(
s3_client: &Client,
bucket_config: &BucketConfig,
remote_client: &GenericRemoteStorage,
min_age: &Duration,
target: &RootTarget,
mode: GcMode,
@@ -508,7 +487,7 @@ pub async fn pageserver_physical_gc(
accumulator: &Arc<std::sync::Mutex<TenantRefAccumulator>>,
) -> anyhow::Result<GcSummary> {
let mut summary = GcSummary::default();
let data = list_timeline_blobs(s3_client, ttid, target).await?;
let data = list_timeline_blobs_generic(remote_client, ttid, target).await?;
let (index_part, latest_gen, candidates) = match &data.blob_data {
BlobDataParseResult::Parsed {
@@ -533,17 +512,9 @@ pub async fn pageserver_physical_gc(
accumulator.lock().unwrap().update(ttid, index_part);
for key in candidates {
maybe_delete_index(
s3_client,
bucket_config,
min_age,
latest_gen,
&key,
mode,
&mut summary,
)
.instrument(info_span!("maybe_delete_index", %ttid, ?latest_gen, key))
.await;
maybe_delete_index(remote_client, min_age, latest_gen, &key, mode, &mut summary)
.instrument(info_span!("maybe_delete_index", %ttid, ?latest_gen, %key.key))
.await;
}
Ok(summary)
@@ -554,15 +525,7 @@ pub async fn pageserver_physical_gc(
// Drain futures for per-shard GC, populating accumulator as a side effect
{
let timelines = timelines.map_ok(|ttid| {
gc_timeline(
&s3_client,
bucket_config,
&min_age,
&target,
mode,
ttid,
&accumulator,
)
gc_timeline(&remote_client, &min_age, &target, mode, ttid, &accumulator)
});
let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
@@ -586,8 +549,7 @@ pub async fn pageserver_physical_gc(
for ancestor_shard in ancestor_shards {
gc_ancestor(
&s3_client,
bucket_config,
&remote_client,
&target,
&min_age,
ancestor_shard,

View File

@@ -1,16 +1,16 @@
use std::collections::{HashMap, HashSet};
use crate::checks::{
branch_cleanup_and_check_errors, list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData,
TenantObjectListing, TimelineAnalysis,
branch_cleanup_and_check_errors, list_timeline_blobs_generic, BlobDataParseResult,
RemoteTimelineBlobData, TenantObjectListing, TimelineAnalysis,
};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
use aws_sdk_s3::Client;
use crate::metadata_stream::{stream_tenant_timelines_generic, stream_tenants_generic};
use crate::{init_remote_generic, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
use futures_util::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::remote_layer_path;
use pageserver_api::controller_api::MetadataHealthUpdateRequest;
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use serde::Serialize;
use utils::id::TenantId;
use utils::shard::ShardCount;
@@ -36,7 +36,7 @@ impl MetadataSummary {
Self::default()
}
fn update_data(&mut self, data: &S3TimelineBlobData) {
fn update_data(&mut self, data: &RemoteTimelineBlobData) {
self.timeline_shard_count += 1;
if let BlobDataParseResult::Parsed {
index_part,
@@ -120,10 +120,10 @@ pub async fn scan_pageserver_metadata(
bucket_config: BucketConfig,
tenant_ids: Vec<TenantShardId>,
) -> anyhow::Result<MetadataSummary> {
let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
let (remote_client, target) = init_remote_generic(bucket_config, NodeKind::Pageserver).await?;
let tenants = if tenant_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&s3_client, &target))
futures::future::Either::Left(stream_tenants_generic(&remote_client, &target))
} else {
futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
};
@@ -133,20 +133,20 @@ pub async fn scan_pageserver_metadata(
const CONCURRENCY: usize = 32;
// Generate a stream of TenantTimelineId
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
let timelines = tenants.map_ok(|t| stream_tenant_timelines_generic(&remote_client, &target, t));
let timelines = timelines.try_buffered(CONCURRENCY);
let timelines = timelines.try_flatten();
// Generate a stream of S3TimelineBlobData
async fn report_on_timeline(
s3_client: &Client,
remote_client: &GenericRemoteStorage,
target: &RootTarget,
ttid: TenantShardTimelineId,
) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> {
let data = list_timeline_blobs(s3_client, ttid, target).await?;
) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
let data = list_timeline_blobs_generic(remote_client, ttid, target).await?;
Ok((ttid, data))
}
let timelines = timelines.map_ok(|ttid| report_on_timeline(&s3_client, &target, ttid));
let timelines = timelines.map_ok(|ttid| report_on_timeline(&remote_client, &target, ttid));
let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
// We must gather all the TenantShardTimelineId->S3TimelineBlobData for each tenant, because different
@@ -157,12 +157,11 @@ pub async fn scan_pageserver_metadata(
let mut tenant_timeline_results = Vec::new();
async fn analyze_tenant(
s3_client: &Client,
target: &RootTarget,
remote_client: &GenericRemoteStorage,
tenant_id: TenantId,
summary: &mut MetadataSummary,
mut tenant_objects: TenantObjectListing,
timelines: Vec<(TenantShardTimelineId, S3TimelineBlobData)>,
timelines: Vec<(TenantShardTimelineId, RemoteTimelineBlobData)>,
highest_shard_count: ShardCount,
) {
summary.tenant_count += 1;
@@ -191,8 +190,7 @@ pub async fn scan_pageserver_metadata(
// Apply checks to this timeline shard's metadata, and in the process update `tenant_objects`
// reference counts for layers across the tenant.
let analysis = branch_cleanup_and_check_errors(
s3_client,
target,
remote_client,
&ttid,
&mut tenant_objects,
None,
@@ -273,8 +271,7 @@ pub async fn scan_pageserver_metadata(
let tenant_objects = std::mem::take(&mut tenant_objects);
let timelines = std::mem::take(&mut tenant_timeline_results);
analyze_tenant(
&s3_client,
&target,
&remote_client,
prev_tenant_id,
&mut summary,
tenant_objects,
@@ -311,8 +308,7 @@ pub async fn scan_pageserver_metadata(
if !tenant_timeline_results.is_empty() {
analyze_tenant(
&s3_client,
&target,
&remote_client,
tenant_id.expect("Must be set if results are present"),
&mut summary,
tenant_objects,

View File

@@ -188,6 +188,11 @@ async fn check_timeline(
// we need files, so unset it.
timeline_dir_target.delimiter = String::new();
let prefix_str = &timeline_dir_target
.prefix_in_bucket
.strip_prefix("/")
.unwrap_or(&timeline_dir_target.prefix_in_bucket);
let mut stream = std::pin::pin!(stream_listing_generic(remote_client, &timeline_dir_target));
while let Some(obj) = stream.next().await {
let (key, _obj) = obj?;
@@ -195,7 +200,7 @@ async fn check_timeline(
let seg_name = key
.get_path()
.as_str()
.strip_prefix(&timeline_dir_target.prefix_in_bucket)
.strip_prefix(prefix_str)
.expect("failed to extract segment name");
expected_segfiles.remove(seg_name);
}

View File

@@ -1,10 +1,11 @@
use std::collections::HashMap;
use std::sync::Arc;
use crate::checks::{list_timeline_blobs, BlobDataParseResult, S3TimelineBlobData};
use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines};
use crate::checks::{list_timeline_blobs_generic, BlobDataParseResult, RemoteTimelineBlobData};
use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines_generic};
use crate::{
download_object_to_file, init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId,
download_object_to_file, init_remote, init_remote_generic, BucketConfig, NodeKind, RootTarget,
TenantShardTimelineId,
};
use anyhow::Context;
use async_stream::stream;
@@ -15,6 +16,7 @@ use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::IndexPart;
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use utils::generation::Generation;
use utils::id::TenantId;
@@ -215,11 +217,11 @@ impl SnapshotDownloader {
}
pub async fn download(&self) -> anyhow::Result<()> {
let (s3_client, target) =
init_remote(self.bucket_config.clone(), NodeKind::Pageserver).await?;
let (remote_client, target) =
init_remote_generic(self.bucket_config.clone(), NodeKind::Pageserver).await?;
// Generate a stream of TenantShardId
let shards = stream_tenant_shards(&s3_client, &target, self.tenant_id).await?;
let shards = stream_tenant_shards(&remote_client, &target, self.tenant_id).await?;
let shards: Vec<TenantShardId> = shards.try_collect().await?;
// Only read from shards that have the highest count: avoids redundantly downloading
@@ -237,18 +239,19 @@ impl SnapshotDownloader {
for shard in shards.into_iter().filter(|s| s.shard_count == shard_count) {
// Generate a stream of TenantTimelineId
let timelines = stream_tenant_timelines(&s3_client, &self.s3_root, shard).await?;
let timelines = stream_tenant_timelines_generic(&remote_client, &target, shard).await?;
// Generate a stream of S3TimelineBlobData
async fn load_timeline_index(
s3_client: &Client,
remote_client: &GenericRemoteStorage,
target: &RootTarget,
ttid: TenantShardTimelineId,
) -> anyhow::Result<(TenantShardTimelineId, S3TimelineBlobData)> {
let data = list_timeline_blobs(s3_client, ttid, target).await?;
) -> anyhow::Result<(TenantShardTimelineId, RemoteTimelineBlobData)> {
let data = list_timeline_blobs_generic(remote_client, ttid, target).await?;
Ok((ttid, data))
}
let timelines = timelines.map_ok(|ttid| load_timeline_index(&s3_client, &target, ttid));
let timelines =
timelines.map_ok(|ttid| load_timeline_index(&remote_client, &target, ttid));
let mut timelines = std::pin::pin!(timelines.try_buffered(8));
while let Some(i) = timelines.next().await {
@@ -278,7 +281,7 @@ impl SnapshotDownloader {
for (ttid, layers) in ancestor_layers.into_iter() {
tracing::info!(
"Downloading {} layers from ancvestor timeline {ttid}...",
"Downloading {} layers from ancestor timeline {ttid}...",
layers.len()
);

View File

@@ -4643,6 +4643,7 @@ class StorageScrubber:
]
args = base_args + args
log.info(f"Invoking scrubber command {args} with env: {env}")
(output_path, stdout, status_code) = subprocess_capture(
self.log_dir,
args,