Merge branch 'main' into problame/2024-02-walredo-work/prespawn/split-code

@@ -28,6 +28,7 @@ use tokio_util::sync::CancellationToken;
 use tracing::debug;
 
 use crate::s3_bucket::RequestKind;
+use crate::TimeTravelError;
 use crate::{
     AzureConfig, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath,
     RemoteStorage, StorageMetadata,

@@ -379,12 +380,10 @@ impl RemoteStorage for AzureBlobStorage {
         _timestamp: SystemTime,
         _done_if_after: SystemTime,
         _cancel: CancellationToken,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), TimeTravelError> {
         // TODO use Azure point in time recovery feature for this
         // https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview
-        Err(anyhow::anyhow!(
-            "time travel recovery for azure blob storage is not implemented"
-        ))
+        Err(TimeTravelError::Unimplemented)
     }
 }

@@ -219,7 +219,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
         timestamp: SystemTime,
         done_if_after: SystemTime,
         cancel: CancellationToken,
-    ) -> anyhow::Result<()>;
+    ) -> Result<(), TimeTravelError>;
 }
 
 pub type DownloadStream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Unpin + Send + Sync>>;

@@ -269,6 +269,45 @@ impl std::fmt::Display for DownloadError {
 
 impl std::error::Error for DownloadError {}
 
+#[derive(Debug)]
+pub enum TimeTravelError {
+    /// Validation or other error happened due to user input.
+    BadInput(anyhow::Error),
+    /// The used remote storage does not have time travel recovery implemented
+    Unimplemented,
+    /// The number of versions/deletion markers is above our limit.
+    TooManyVersions,
+    /// A cancellation token aborted the process, typically during
+    /// request closure or process shutdown.
+    Cancelled,
+    /// Other errors
+    Other(anyhow::Error),
+}
+
+impl std::fmt::Display for TimeTravelError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            TimeTravelError::BadInput(e) => {
+                write!(
+                    f,
+                    "Failed to time travel recover a prefix due to user input: {e}"
+                )
+            }
+            TimeTravelError::Unimplemented => write!(
+                f,
+                "time travel recovery is not implemented for the current storage backend"
+            ),
+            TimeTravelError::Cancelled => write!(f, "Cancelled, shutting down"),
+            TimeTravelError::TooManyVersions => {
+                write!(f, "Number of versions/delete markers above limit")
+            }
+            TimeTravelError::Other(e) => write!(f, "Failed to time travel recover a prefix: {e:?}"),
+        }
+    }
+}
+
+impl std::error::Error for TimeTravelError {}
+
 /// Every storage, currently supported.
 /// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
 #[derive(Clone)]

@@ -404,7 +443,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
         timestamp: SystemTime,
         done_if_after: SystemTime,
         cancel: CancellationToken,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), TimeTravelError> {
         match self {
             Self::LocalFs(s) => {
                 s.time_travel_recover(prefix, timestamp, done_if_after, cancel)

@@ -18,7 +18,9 @@ use tokio_util::{io::ReaderStream, sync::CancellationToken};
 use tracing::*;
 use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};
 
-use crate::{Download, DownloadError, DownloadStream, Listing, ListingMode, RemotePath};
+use crate::{
+    Download, DownloadError, DownloadStream, Listing, ListingMode, RemotePath, TimeTravelError,
+};
 
 use super::{RemoteStorage, StorageMetadata};

@@ -430,8 +432,8 @@ impl RemoteStorage for LocalFs {
         _timestamp: SystemTime,
         _done_if_after: SystemTime,
         _cancel: CancellationToken,
-    ) -> anyhow::Result<()> {
-        unimplemented!()
+    ) -> Result<(), TimeTravelError> {
+        Err(TimeTravelError::Unimplemented)
     }
 }

@@ -46,7 +46,7 @@ use utils::backoff;
 use super::StorageMetadata;
 use crate::{
     ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage,
-    S3Config, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
+    S3Config, TimeTravelError, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
 };
 
 pub(super) mod metrics;

@@ -639,14 +639,14 @@ impl RemoteStorage for S3Bucket {
         timestamp: SystemTime,
         done_if_after: SystemTime,
         cancel: CancellationToken,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), TimeTravelError> {
         let kind = RequestKind::TimeTravel;
         let _guard = self.permit(kind).await;
 
         let timestamp = DateTime::from(timestamp);
         let done_if_after = DateTime::from(done_if_after);
 
-        tracing::info!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
+        tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
 
         // get the passed prefix or if it is not set use prefix_in_bucket value
         let prefix = prefix

@@ -664,21 +664,21 @@ impl RemoteStorage for S3Bucket {
         loop {
             let response = backoff::retry(
                 || async {
-                    Ok(self
-                        .client
+                    self.client
                         .list_object_versions()
                         .bucket(self.bucket_name.clone())
                         .set_prefix(prefix.clone())
                         .set_key_marker(key_marker.clone())
                         .set_version_id_marker(version_id_marker.clone())
                         .send()
-                        .await?)
+                        .await
+                        .map_err(|e| TimeTravelError::Other(e.into()))
                 },
                 is_permanent,
                 warn_threshold,
                 max_retries,
                 "listing object versions for time_travel_recover",
-                backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")),
+                backoff::Cancel::new(cancel.clone(), || TimeTravelError::Cancelled),
             )
             .await?;

@@ -699,7 +699,8 @@ impl RemoteStorage for S3Bucket {
                 .map(VerOrDelete::from_delete_marker);
             itertools::process_results(versions.chain(deletes), |n_vds| {
                 versions_and_deletes.extend(n_vds)
-            })?;
+            })
+            .map_err(TimeTravelError::Other)?;
 
             fn none_if_empty(v: Option<String>) -> Option<String> {
                 v.filter(|v| !v.is_empty())
             }

@@ -708,9 +709,9 @@ impl RemoteStorage for S3Bucket {
             if version_id_marker.is_none() {
                 // The final response is not supposed to be truncated
                 if response.is_truncated.unwrap_or_default() {
-                    anyhow::bail!(
+                    return Err(TimeTravelError::Other(anyhow::anyhow!(
                         "Received truncated ListObjectVersions response for prefix={prefix:?}"
-                    );
+                    )));
                 }
                 break;
             }
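
For readers who know the S3 API better than the Rust SDK: the listing loop above pages through ListObjectVersions with a key marker and a version-id marker, and only trusts a page to be final once the version-id marker is exhausted and the response is not truncated. A sketch of the same pagination scheme in boto3, with placeholder bucket and prefix names:

import boto3

client = boto3.client("s3")
bucket, prefix = "my-bucket", "tenants/some-tenant/"  # placeholders

versions_and_deletes = []
key_marker, version_id_marker = None, None
while True:
    kwargs = {"Bucket": bucket, "Prefix": prefix}
    if key_marker:
        kwargs["KeyMarker"] = key_marker
    if version_id_marker:
        kwargs["VersionIdMarker"] = version_id_marker
    response = client.list_object_versions(**kwargs)
    versions_and_deletes.extend(response.get("Versions", []))
    versions_and_deletes.extend(response.get("DeleteMarkers", []))
    key_marker = response.get("NextKeyMarker")
    version_id_marker = response.get("NextVersionIdMarker")
    if not version_id_marker:
        # The final response is not supposed to be truncated.
        assert not response.get("IsTruncated", False)
        break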

@@ -721,12 +722,15 @@ impl RemoteStorage for S3Bucket {
             // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
             const COMPLEXITY_LIMIT: usize = 100_000;
             if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
-                anyhow::bail!(
-                    "Limit for number of versions/deletions exceeded for prefix={prefix:?}"
-                );
+                return Err(TimeTravelError::TooManyVersions);
             }
         }
 
+        tracing::info!(
+            "Built list for time travel with {} versions and deletions",
+            versions_and_deletes.len()
+        );
+
         // Work on the list of references instead of the objects directly,
         // otherwise we get lifetime errors in the sort_by_key call below.
         let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();

@@ -740,8 +744,8 @@ impl RemoteStorage for S3Bucket {
             version_id, key, ..
         } = &vd;
         if version_id == "null" {
-            anyhow::bail!("Received ListVersions response for key={key} with version_id='null', \
-                indicating either disabled versioning, or legacy objects with null version id values");
+            return Err(TimeTravelError::Other(anyhow!("Received ListVersions response for key={key} with version_id='null', \
+                indicating either disabled versioning, or legacy objects with null version id values")));
         }
         tracing::trace!(
             "Parsing version key={key} version_id={version_id} kind={:?}",

@@ -788,22 +792,23 @@ impl RemoteStorage for S3Bucket {
 
                 backoff::retry(
                     || async {
-                        Ok(self
-                            .client
+                        self.client
                             .copy_object()
                             .bucket(self.bucket_name.clone())
                             .key(key)
                             .copy_source(&source_id)
                             .send()
-                            .await?)
+                            .await
+                            .map_err(|e| TimeTravelError::Other(e.into()))
                     },
                     is_permanent,
                     warn_threshold,
                     max_retries,
-                    "listing object versions for time_travel_recover",
-                    backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")),
+                    "copying object version for time_travel_recover",
+                    backoff::Cancel::new(cancel.clone(), || TimeTravelError::Cancelled),
                 )
                 .await?;
                 tracing::info!(%version_id, %key, "Copied old version in S3");
             }
             VerOrDelete {
                 kind: VerOrDeleteKind::DeleteMarker,

@@ -820,8 +825,13 @@ impl RemoteStorage for S3Bucket {
             } else {
                 tracing::trace!("Deleting {key}...");
 
-                let oid = ObjectIdentifier::builder().key(key.to_owned()).build()?;
-                self.delete_oids(kind, &[oid]).await?;
+                let oid = ObjectIdentifier::builder()
+                    .key(key.to_owned())
+                    .build()
+                    .map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?;
+                self.delete_oids(kind, &[oid])
+                    .await
+                    .map_err(TimeTravelError::Other)?;
             }
         }
     }

@@ -11,7 +11,7 @@ use tokio_util::sync::CancellationToken;
 
 use crate::{
     Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage,
-    StorageMetadata,
+    StorageMetadata, TimeTravelError,
 };
 
 pub struct UnreliableWrapper {

@@ -191,8 +191,9 @@ impl RemoteStorage for UnreliableWrapper {
         timestamp: SystemTime,
         done_if_after: SystemTime,
         cancel: CancellationToken,
-    ) -> anyhow::Result<()> {
-        self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))?;
+    ) -> Result<(), TimeTravelError> {
+        self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))
+            .map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?;
         self.inner
             .time_travel_recover(prefix, timestamp, done_if_after, cancel)
             .await

@@ -178,6 +178,64 @@ paths:
             schema:
               $ref: "#/components/schemas/ServiceUnavailableError"
 
+  /v1/tenant/{tenant_id}/time_travel_remote_storage:
+    parameters:
+      - name: tenant_id
+        in: path
+        required: true
+        schema:
+          type: string
+      - name: travel_to
+        in: query
+        required: true
+        schema:
+          type: string
+          format: date-time
+      - name: done_if_after
+        in: query
+        required: true
+        schema:
+          type: string
+          format: date-time
+    put:
+      description: Time travel the tenant's remote storage
+      responses:
+        "200":
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: string
+        "400":
+          description: Error when no tenant id found in path or invalid timestamp
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/Error"
+        "401":
+          description: Unauthorized Error
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/UnauthorizedError"
+        "403":
+          description: Forbidden Error
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/ForbiddenError"
+        "500":
+          description: Generic operation error
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/Error"
+        "503":
+          description: Temporarily unavailable, please retry.
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/ServiceUnavailableError"
+
   /v1/tenant/{tenant_id}/timeline:
     parameters:
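
Both query parameters are RFC 3339 timestamps, and the handler below rejects requests where done_if_after precedes travel_to. A minimal sketch of driving the new endpoint by hand with requests; the pageserver address, tenant id, and timestamps are placeholders, not values from this commit:

import requests

pageserver = "http://localhost:9898"  # placeholder: the pageserver's HTTP management address
tenant_id = "0123456789abcdef0123456789abcdef"  # placeholder tenant (or tenant shard) id

res = requests.put(
    f"{pageserver}/v1/tenant/{tenant_id}/time_travel_remote_storage",
    params={
        # Recover the remote storage prefix to its state at this time...
        "travel_to": "2024-02-01T00:00:00Z",
        # ...and treat anything written after this time as already recovered.
        "done_if_after": "2024-02-01T01:00:00Z",
    },
)
res.raise_for_status()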

@@ -26,6 +26,7 @@ use pageserver_api::models::{
 };
 use pageserver_api::shard::TenantShardId;
 use remote_storage::GenericRemoteStorage;
+use remote_storage::TimeTravelError;
 use tenant_size_model::{SizeResult, StorageModel};
 use tokio_util::sync::CancellationToken;
 use tracing::*;

@@ -47,6 +48,7 @@ use crate::tenant::mgr::{
     TenantSlotError, TenantSlotUpsertError, TenantStateError,
 };
 use crate::tenant::mgr::{TenantSlot, UpsertLocationError};
+use crate::tenant::remote_timeline_client;
 use crate::tenant::secondary::SecondaryController;
 use crate::tenant::size::ModelInputs;
 use crate::tenant::storage_layer::LayerAccessStatsReset;

@@ -77,8 +79,14 @@ use utils::{
 // For APIs that require an Active tenant, how long should we block waiting for that state?
 // This is not functionally necessary (clients will retry), but avoids generating a lot of
 // failed API calls while tenants are activating.
+#[cfg(not(feature = "testing"))]
 const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000);
 
+// Tests run on slow/oversubscribed nodes, and may need to wait much longer for tenants to
+// finish attaching, if calls to remote storage are slow.
+#[cfg(feature = "testing")]
+const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
+
 pub struct State {
     conf: &'static PageServerConf,
     tenant_manager: Arc<TenantManager>,

@@ -1424,6 +1432,79 @@ async fn list_location_config_handler(
     json_response(StatusCode::OK, result)
 }
 
+// Do a time travel recovery on the given tenant/tenant shard. Tenant needs to be detached
+// (from all pageservers) as it invalidates consistency assumptions.
+async fn tenant_time_travel_remote_storage_handler(
+    request: Request<Body>,
+    cancel: CancellationToken,
+) -> Result<Response<Body>, ApiError> {
+    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
+
+    check_permission(&request, Some(tenant_shard_id.tenant_id))?;
+
+    let timestamp_raw = must_get_query_param(&request, "travel_to")?;
+    let timestamp = humantime::parse_rfc3339(&timestamp_raw)
+        .with_context(|| format!("Invalid time for travel_to: {timestamp_raw:?}"))
+        .map_err(ApiError::BadRequest)?;
+
+    let done_if_after_raw = must_get_query_param(&request, "done_if_after")?;
+    let done_if_after = humantime::parse_rfc3339(&done_if_after_raw)
+        .with_context(|| format!("Invalid time for done_if_after: {done_if_after_raw:?}"))
+        .map_err(ApiError::BadRequest)?;
+
+    // This is just a sanity check to fend off naive wrong usages of the API:
+    // the tenant needs to be detached *everywhere*
+    let state = get_state(&request);
+    let we_manage_tenant = state.tenant_manager.manages_tenant_shard(tenant_shard_id);
+    if we_manage_tenant {
+        return Err(ApiError::BadRequest(anyhow!(
+            "Tenant {tenant_shard_id} is already attached at this pageserver"
+        )));
+    }
+
+    let Some(storage) = state.remote_storage.as_ref() else {
+        return Err(ApiError::InternalServerError(anyhow::anyhow!(
+            "remote storage not configured, cannot run time travel"
+        )));
+    };
+
+    if timestamp > done_if_after {
+        return Err(ApiError::BadRequest(anyhow!(
+            "The done_if_after timestamp comes before the timestamp to recover to"
+        )));
+    }
+
+    tracing::info!("Issuing time travel request internally. timestamp={timestamp_raw}, done_if_after={done_if_after_raw}");
+
+    remote_timeline_client::upload::time_travel_recover_tenant(
+        storage,
+        &tenant_shard_id,
+        timestamp,
+        done_if_after,
+        &cancel,
+    )
+    .await
+    .map_err(|e| match e {
+        TimeTravelError::BadInput(e) => {
+            warn!("bad input error: {e}");
+            ApiError::BadRequest(anyhow!("bad input error"))
+        }
+        TimeTravelError::Unimplemented => {
+            ApiError::BadRequest(anyhow!("unimplemented for the configured remote storage"))
+        }
+        TimeTravelError::Cancelled => ApiError::InternalServerError(anyhow!("cancelled")),
+        TimeTravelError::TooManyVersions => {
+            ApiError::InternalServerError(anyhow!("too many versions in remote storage"))
+        }
+        TimeTravelError::Other(e) => {
+            warn!("internal error: {e}");
+            ApiError::InternalServerError(anyhow!("internal error"))
+        }
+    })?;
+
+    json_response(StatusCode::OK, ())
+}
+
 /// Testing helper to transition a tenant to [`crate::tenant::TenantState::Broken`].
 async fn handle_tenant_break(
     r: Request<Body>,

@@ -1969,6 +2050,10 @@ pub fn make_router(
         .get("/v1/location_config", |r| {
             api_handler(r, list_location_config_handler)
         })
+        .put(
+            "/v1/tenant/:tenant_shard_id/time_travel_remote_storage",
+            |r| api_handler(r, tenant_time_travel_remote_storage_handler),
+        )
        .get("/v1/tenant/:tenant_shard_id/timeline", |r| {
             api_handler(r, timeline_list_handler)
         })

@@ -898,6 +898,17 @@ impl TenantManager {
         }
     }
 
+    /// Whether the `TenantManager` is responsible for the tenant shard
+    pub(crate) fn manages_tenant_shard(&self, tenant_shard_id: TenantShardId) -> bool {
+        let locked = self.tenants.read().unwrap();
+
+        let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
+            .ok()
+            .flatten();
+
+        peek_slot.is_some()
+    }
+
     #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
     pub(crate) async fn upsert_location(
         &self,

@@ -1719,6 +1719,11 @@ pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath {
     RemotePath::from_string(&path).expect("Failed to construct path")
 }
 
+fn remote_timelines_path_unsharded(tenant_id: &TenantId) -> RemotePath {
+    let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}");
+    RemotePath::from_string(&path).expect("Failed to construct path")
+}
+
 pub fn remote_timeline_path(
     tenant_shard_id: &TenantShardId,
     timeline_id: &TimelineId,

@@ -5,9 +5,11 @@ use camino::Utf8Path;
 use fail::fail_point;
 use pageserver_api::shard::TenantShardId;
 use std::io::{ErrorKind, SeekFrom};
+use std::time::SystemTime;
 use tokio::fs::{self, File};
 use tokio::io::AsyncSeekExt;
+use tokio_util::sync::CancellationToken;
 use utils::backoff;
 
 use super::Generation;
 use crate::{

@@ -17,7 +19,7 @@ use crate::{
         remote_initdb_preserved_archive_path, remote_path, upload_cancellable,
     },
 };
-use remote_storage::GenericRemoteStorage;
+use remote_storage::{GenericRemoteStorage, TimeTravelError};
 use utils::id::{TenantId, TimelineId};
 
 use super::index::LayerFileMetadata;

@@ -157,3 +159,45 @@ pub(crate) async fn preserve_initdb_archive(
     .await
     .with_context(|| format!("backing up initdb archive for '{tenant_id} / {timeline_id}'"))
 }
+
+pub(crate) async fn time_travel_recover_tenant(
+    storage: &GenericRemoteStorage,
+    tenant_shard_id: &TenantShardId,
+    timestamp: SystemTime,
+    done_if_after: SystemTime,
+    cancel: &CancellationToken,
+) -> Result<(), TimeTravelError> {
+    let warn_after = 3;
+    let max_attempts = 10;
+    let mut prefixes = Vec::with_capacity(2);
+    if tenant_shard_id.is_zero() {
+        // Also recover the unsharded prefix for a shard of zero:
+        // - if the tenant is totally unsharded, the unsharded prefix contains all the data
+        // - if the tenant is sharded, we still want to recover the initdb data, but we only
+        //   want to do it once, so let's do it on the 0 shard
+        let timelines_path_unsharded =
+            super::remote_timelines_path_unsharded(&tenant_shard_id.tenant_id);
+        prefixes.push(timelines_path_unsharded);
+    }
+    if !tenant_shard_id.is_unsharded() {
+        // If the tenant is sharded, we need to recover the sharded prefix
+        let timelines_path = super::remote_timelines_path(tenant_shard_id);
+        prefixes.push(timelines_path);
+    }
+    for prefix in &prefixes {
+        backoff::retry(
+            || async {
+                storage
+                    .time_travel_recover(Some(prefix), timestamp, done_if_after, cancel.clone())
+                    .await
+            },
+            |e| !matches!(e, TimeTravelError::Other(_)),
+            warn_after,
+            max_attempts,
+            "time travel recovery of tenant prefix",
+            backoff::Cancel::new(cancel.clone(), || TimeTravelError::Cancelled),
+        )
+        .await?;
+    }
+    Ok(())
+}

@@ -100,31 +100,6 @@ pub struct MetricsAuxInfo {
     pub branch_id: BranchId,
 }
 
-impl MetricsAuxInfo {
-    /// Definitions of labels for traffic metric.
-    pub const TRAFFIC_LABELS: &'static [&'static str] = &[
-        // Received (rx) / sent (tx).
-        "direction",
-        // ID of a project.
-        "project_id",
-        // ID of an endpoint within a project.
-        "endpoint_id",
-        // ID of a branch within a project (snapshot).
-        "branch_id",
-    ];
-
-    /// Values of labels for traffic metric.
-    // TODO: add more type safety (validate arity & positions).
-    pub fn traffic_labels(&self, direction: &'static str) -> [&str; 4] {
-        [
-            direction,
-            &self.project_id,
-            &self.endpoint_id,
-            &self.branch_id,
-        ]
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;

@@ -208,15 +208,6 @@ pub static NUM_WAKEUP_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
     .unwrap()
 });
 
-pub static NUM_BYTES_PROXIED_PER_CLIENT_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
-    register_int_counter_vec!(
-        "proxy_io_bytes_per_client",
-        "Number of bytes sent/received between client and backend.",
-        crate::console::messages::MetricsAuxInfo::TRAFFIC_LABELS,
-    )
-    .unwrap()
-});
-
 pub static NUM_BYTES_PROXIED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
     register_int_counter_vec!(
         "proxy_io_bytes",

@@ -1,7 +1,7 @@
 use crate::{
     console::messages::MetricsAuxInfo,
     context::RequestMonitoring,
-    metrics::{NUM_BYTES_PROXIED_COUNTER, NUM_BYTES_PROXIED_PER_CLIENT_COUNTER},
+    metrics::NUM_BYTES_PROXIED_COUNTER,
     usage_metrics::{Ids, USAGE_METRICS},
 };
 use tokio::io::{AsyncRead, AsyncWrite};

@@ -25,27 +25,23 @@ pub async fn proxy_pass(
     });
 
     let m_sent = NUM_BYTES_PROXIED_COUNTER.with_label_values(&["tx"]);
-    let m_sent2 = NUM_BYTES_PROXIED_PER_CLIENT_COUNTER.with_label_values(&aux.traffic_labels("tx"));
     let mut client = MeasuredStream::new(
         client,
         |_| {},
         |cnt| {
             // Number of bytes we sent to the client (outbound).
             m_sent.inc_by(cnt as u64);
-            m_sent2.inc_by(cnt as u64);
             usage.record_egress(cnt as u64);
         },
     );
 
     let m_recv = NUM_BYTES_PROXIED_COUNTER.with_label_values(&["rx"]);
-    let m_recv2 = NUM_BYTES_PROXIED_PER_CLIENT_COUNTER.with_label_values(&aux.traffic_labels("rx"));
     let mut compute = MeasuredStream::new(
         compute,
         |_| {},
         |cnt| {
             // Number of bytes the client sent to the compute node (inbound).
             m_recv.inc_by(cnt as u64);
-            m_recv2.inc_by(cnt as u64);
         },
     );

@@ -4,6 +4,7 @@ import json
 import time
 from collections import defaultdict
 from dataclasses import dataclass
+from datetime import datetime
 from typing import Any, Dict, List, Optional, Set, Tuple, Union
 
 import requests

@@ -389,6 +390,20 @@ class PageserverHttpClient(requests.Session):
         )
         return res.text
 
+    def tenant_time_travel_remote_storage(
+        self,
+        tenant_id: Union[TenantId, TenantShardId],
+        timestamp: datetime,
+        done_if_after: datetime,
+    ):
+        """
+        Issues a request to perform time travel operations on the remote storage
+        """
+        res = self.put(
+            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/time_travel_remote_storage?travel_to={timestamp.isoformat()}Z&done_if_after={done_if_after.isoformat()}Z"
+        )
+        self.verbose_error(res)
+
     def timeline_list(
         self,
         tenant_id: Union[TenantId, TenantShardId],
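
Note that the fixture appends a literal Z to isoformat(), so callers must pass naive UTC datetimes; an offset-aware datetime would serialize with its own offset and then gain a spurious trailing Z. A usage sketch mirroring what the new test below does (ps_http and tenant_id are the usual fixture objects):

from datetime import datetime, timezone

# Naive UTC timestamp: take UTC time, then drop the tzinfo so that
# isoformat() + "Z" forms a valid RFC 3339 string.
ts_before = datetime.now(tz=timezone.utc).replace(tzinfo=None)
# ... perform the work that should later be rolled back, then:
ts_after = datetime.now(tz=timezone.utc).replace(tzinfo=None)

# ps_http: a PageserverHttpClient, as used throughout the fixtures.
ps_http.tenant_time_travel_remote_storage(
    tenant_id, timestamp=ts_before, done_if_after=ts_after
)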

@@ -1,7 +1,11 @@
 import time
 from typing import Any, Dict, List, Optional, Union
 
-from mypy_boto3_s3.type_defs import ListObjectsV2OutputTypeDef, ObjectTypeDef
+from mypy_boto3_s3.type_defs import (
+    EmptyResponseMetadataTypeDef,
+    ListObjectsV2OutputTypeDef,
+    ObjectTypeDef,
+)
 
 from fixtures.log_helper import log
 from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient

@@ -346,6 +350,27 @@ def list_prefix(
     return response
 
 
+def enable_remote_storage_versioning(
+    remote: RemoteStorage,
+) -> EmptyResponseMetadataTypeDef:
+    """
+    Enable S3 versioning for the remote storage
+    """
+    # local_fs has no versioning
+    assert isinstance(remote, S3Storage), "localfs is currently not supported"
+    assert remote.client is not None
+
+    # Note that this doesn't use pagination, so the list is not guaranteed to be exhaustive.
+    response = remote.client.put_bucket_versioning(
+        Bucket=remote.bucket_name,
+        VersioningConfiguration={
+            "MFADelete": "Disabled",
+            "Status": "Enabled",
+        },
+    )
+    return response
+
+
 def wait_tenant_status_404(
     pageserver_http: PageserverHttpClient,
     tenant_id: TenantId,
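
put_bucket_versioning returns only response metadata, so a test that wants to fail fast when versioning did not actually take effect could read the setting back. A hypothetical helper along these lines, not part of this commit:

from fixtures.remote_storage import S3Storage  # assumed import path for S3Storage


def assert_versioning_enabled(remote: S3Storage):
    # Hypothetical check: read the bucket's versioning state back and
    # require it to be "Enabled" before the test writes any objects.
    assert remote.client is not None
    state = remote.client.get_bucket_versioning(Bucket=remote.bucket_name)
    assert state.get("Status") == "Enabled", f"unexpected versioning state: {state}"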

test_runner/regress/test_s3_restore.py (new file, 121 lines)
@@ -0,0 +1,121 @@
+import time
+from datetime import datetime, timezone
+
+import pytest
+from fixtures.neon_fixtures import (
+    NeonEnvBuilder,
+    PgBin,
+)
+from fixtures.pageserver.utils import (
+    MANY_SMALL_LAYERS_TENANT_CONFIG,
+    assert_prefix_empty,
+    enable_remote_storage_versioning,
+    poll_for_remote_storage_iterations,
+    tenant_delete_wait_completed,
+    wait_for_upload,
+)
+from fixtures.remote_storage import RemoteStorageKind, s3_storage
+from fixtures.types import Lsn
+from fixtures.utils import run_pg_bench_small
+
+
+def test_tenant_s3_restore(
+    neon_env_builder: NeonEnvBuilder,
+    pg_bin: PgBin,
+):
+    remote_storage_kind = s3_storage()
+    neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
+
+    # Mock S3 doesn't have versioning enabled by default, enable it
+    # (also do it before there are any writes to the bucket)
+    if remote_storage_kind == RemoteStorageKind.MOCK_S3:
+        remote_storage = neon_env_builder.pageserver_remote_storage
+        assert remote_storage, "remote storage not configured"
+        enable_remote_storage_versioning(remote_storage)
+        pytest.skip("moto doesn't support self-copy: https://github.com/getmoto/moto/issues/7300")
+
+    env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)
+    env.pageserver.allowed_errors.extend(
+        [
+            # The deletion queue will complain when it encounters simulated S3 errors
+            ".*deletion executor: DeleteObjects request failed.*",
+            # In a lucky race with stopping, a frozen layer flush can fail to schedule any uploads
+            ".*layer flush task.+: could not flush frozen layer: update_metadata_file",
+        ]
+    )
+
+    ps_http = env.pageserver.http_client()
+
+    tenant_id = env.initial_tenant
+
+    # Only the initial tenant is attached at this point
+    assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1
+
+    # create two timelines, one being the parent of the other, both with non-trivial data
+    parent = None
+    last_flush_lsns = []
+
+    for timeline in ["first", "second"]:
+        timeline_id = env.neon_cli.create_branch(
+            timeline, tenant_id=tenant_id, ancestor_branch_name=parent
+        )
+        with env.endpoints.create_start(timeline, tenant_id=tenant_id) as endpoint:
+            run_pg_bench_small(pg_bin, endpoint.connstr())
+            endpoint.safe_psql(f"CREATE TABLE created_{timeline}(id integer);")
+            last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
+            last_flush_lsns.append(last_flush_lsn)
+        ps_http.timeline_checkpoint(tenant_id, timeline_id)
+        wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
+        parent = timeline
+
+    # These sleeps are important because they fend off differences in clocks between us and S3
+    time.sleep(4)
+    ts_before_deletion = datetime.now(tz=timezone.utc).replace(tzinfo=None)
+    time.sleep(4)
+
+    assert (
+        ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1
+    ), "tenant removed before deletion was issued"
+    iterations = poll_for_remote_storage_iterations(remote_storage_kind)
+    tenant_delete_wait_completed(ps_http, tenant_id, iterations)
+    ps_http.deletion_queue_flush(execute=True)
+    assert (
+        ps_http.get_metric_value("pageserver_tenant_manager_slots") == 0
+    ), "tenant still present after deletion"
+    env.attachment_service.attach_hook_drop(tenant_id)
+
+    tenant_path = env.pageserver.tenant_dir(tenant_id)
+    assert not tenant_path.exists()
+
+    assert_prefix_empty(
+        neon_env_builder.pageserver_remote_storage,
+        prefix="/".join(
+            (
+                "tenants",
+                str(tenant_id),
+            )
+        ),
+    )
+
+    time.sleep(4)
+    ts_after_deletion = datetime.now(tz=timezone.utc).replace(tzinfo=None)
+    time.sleep(4)
+
+    ps_http.tenant_time_travel_remote_storage(
+        tenant_id, timestamp=ts_before_deletion, done_if_after=ts_after_deletion
+    )
+
+    generation = env.attachment_service.attach_hook_issue(tenant_id, env.pageserver.id)
+
+    ps_http.tenant_attach(tenant_id, generation=generation)
+    env.pageserver.quiesce_tenants()
+
+    for i, timeline in enumerate(["first", "second"]):
+        with env.endpoints.create_start(timeline, tenant_id=tenant_id) as endpoint:
+            endpoint.safe_psql(f"SELECT * FROM created_{timeline};")
+            last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
+            expected_last_flush_lsn = last_flush_lsns[i]
+            # There might be some activity that advances the lsn so we can't use a strict equality check
+            assert last_flush_lsn >= expected_last_flush_lsn, "last_flush_lsn too old"
+
+    assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1