From 0856fe6676e7cf8d928c0da5a6036e58b360b00b Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Fri, 2 Feb 2024 12:28:48 +0000 Subject: [PATCH 1/3] proxy: remove per client bytes (#5466) ## Problem Follow up to #5461 In my memory usage/fragmentation measurements, these metrics came up as a large source of small allocations. The replacement metric has been in use for a long time now so I think it's good to finally remove this. Per-endpoint data is still tracked elsewhere ## Summary of changes remove the per-client bytes metrics --- proxy/src/console/messages.rs | 25 ------------------------- proxy/src/metrics.rs | 9 --------- proxy/src/proxy/passthrough.rs | 6 +----- 3 files changed, 1 insertion(+), 39 deletions(-) diff --git a/proxy/src/console/messages.rs b/proxy/src/console/messages.rs index 6ef9bcf4eb..4e5920436f 100644 --- a/proxy/src/console/messages.rs +++ b/proxy/src/console/messages.rs @@ -100,31 +100,6 @@ pub struct MetricsAuxInfo { pub branch_id: BranchId, } -impl MetricsAuxInfo { - /// Definitions of labels for traffic metric. - pub const TRAFFIC_LABELS: &'static [&'static str] = &[ - // Received (rx) / sent (tx). - "direction", - // ID of a project. - "project_id", - // ID of an endpoint within a project. - "endpoint_id", - // ID of a branch within a project (snapshot). - "branch_id", - ]; - - /// Values of labels for traffic metric. - // TODO: add more type safety (validate arity & positions). - pub fn traffic_labels(&self, direction: &'static str) -> [&str; 4] { - [ - direction, - &self.project_id, - &self.endpoint_id, - &self.branch_id, - ] - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index c7d566f645..fa663d8ff6 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -208,15 +208,6 @@ pub static NUM_WAKEUP_FAILURES: Lazy = Lazy::new(|| { .unwrap() }); -pub static NUM_BYTES_PROXIED_PER_CLIENT_COUNTER: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "proxy_io_bytes_per_client", - "Number of bytes sent/received between client and backend.", - crate::console::messages::MetricsAuxInfo::TRAFFIC_LABELS, - ) - .unwrap() -}); - pub static NUM_BYTES_PROXIED_COUNTER: Lazy = Lazy::new(|| { register_int_counter_vec!( "proxy_io_bytes", diff --git a/proxy/src/proxy/passthrough.rs b/proxy/src/proxy/passthrough.rs index d6f097d72d..53e0c3c8f3 100644 --- a/proxy/src/proxy/passthrough.rs +++ b/proxy/src/proxy/passthrough.rs @@ -1,7 +1,7 @@ use crate::{ console::messages::MetricsAuxInfo, context::RequestMonitoring, - metrics::{NUM_BYTES_PROXIED_COUNTER, NUM_BYTES_PROXIED_PER_CLIENT_COUNTER}, + metrics::NUM_BYTES_PROXIED_COUNTER, usage_metrics::{Ids, USAGE_METRICS}, }; use tokio::io::{AsyncRead, AsyncWrite}; @@ -25,27 +25,23 @@ pub async fn proxy_pass( }); let m_sent = NUM_BYTES_PROXIED_COUNTER.with_label_values(&["tx"]); - let m_sent2 = NUM_BYTES_PROXIED_PER_CLIENT_COUNTER.with_label_values(&aux.traffic_labels("tx")); let mut client = MeasuredStream::new( client, |_| {}, |cnt| { // Number of bytes we sent to the client (outbound). m_sent.inc_by(cnt as u64); - m_sent2.inc_by(cnt as u64); usage.record_egress(cnt as u64); }, ); let m_recv = NUM_BYTES_PROXIED_COUNTER.with_label_values(&["rx"]); - let m_recv2 = NUM_BYTES_PROXIED_PER_CLIENT_COUNTER.with_label_values(&aux.traffic_labels("rx")); let mut compute = MeasuredStream::new( compute, |_| {}, |cnt| { // Number of bytes the client sent to the compute node (inbound). m_recv.inc_by(cnt as u64); - m_recv2.inc_by(cnt as u64); }, ); From 48b05b7c503e3871d34f413211695fc5a2250a54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 2 Feb 2024 14:52:12 +0100 Subject: [PATCH 2/3] Add a time_travel_remote_storage http endpoint (#6533) Adds an endpoint to the pageserver to S3-recover an entire tenant to a specific given timestamp. Required input parameters: * `travel_to`: the target timestamp to recover the S3 state to * `done_if_after`: a timestamp that marks the beginning of the recovery process. retries of the query should keep this value constant. it *must* be after `travel_to`, and also after any changes we want to revert, and must represent a point in time before the endpoint is being called, all of these time points in terms of the time source used by S3. these criteria need to hold even in the face of clock differences, so I recommend waiting a specific amount of time, then taking `done_if_after`, then waiting some amount of time again, and only then issuing the request. Also important to note: the timestamps in S3 work at second accuracy, so one needs to add generous waits before and after for the process to work smoothly (at least 2-3 seconds). We ignore the added test for the mocked S3 for now due to a limitation in moto: https://github.com/getmoto/moto/issues/7300 . Part of https://github.com/neondatabase/cloud/issues/8233 --- libs/remote_storage/src/azure_blob.rs | 7 +- libs/remote_storage/src/lib.rs | 43 ++++++- libs/remote_storage/src/local_fs.rs | 8 +- libs/remote_storage/src/s3_bucket.rs | 54 ++++---- libs/remote_storage/src/simulate_failures.rs | 7 +- pageserver/src/http/openapi_spec.yml | 58 +++++++++ pageserver/src/http/routes.rs | 79 ++++++++++++ pageserver/src/tenant/mgr.rs | 11 ++ .../src/tenant/remote_timeline_client.rs | 5 + .../tenant/remote_timeline_client/upload.rs | 46 ++++++- test_runner/fixtures/pageserver/http.py | 15 +++ test_runner/fixtures/pageserver/utils.py | 27 +++- test_runner/regress/test_s3_restore.py | 121 ++++++++++++++++++ 13 files changed, 445 insertions(+), 36 deletions(-) create mode 100644 test_runner/regress/test_s3_restore.py diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index abab32470b..57c57a2b70 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -28,6 +28,7 @@ use tokio_util::sync::CancellationToken; use tracing::debug; use crate::s3_bucket::RequestKind; +use crate::TimeTravelError; use crate::{ AzureConfig, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, StorageMetadata, @@ -379,12 +380,10 @@ impl RemoteStorage for AzureBlobStorage { _timestamp: SystemTime, _done_if_after: SystemTime, _cancel: CancellationToken, - ) -> anyhow::Result<()> { + ) -> Result<(), TimeTravelError> { // TODO use Azure point in time recovery feature for this // https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview - Err(anyhow::anyhow!( - "time travel recovery for azure blob storage is not implemented" - )) + Err(TimeTravelError::Unimplemented) } } diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 38a8784fe2..4aeaee70b1 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -219,7 +219,7 @@ pub trait RemoteStorage: Send + Sync + 'static { timestamp: SystemTime, done_if_after: SystemTime, cancel: CancellationToken, - ) -> anyhow::Result<()>; + ) -> Result<(), TimeTravelError>; } pub type DownloadStream = Pin> + Unpin + Send + Sync>>; @@ -269,6 +269,45 @@ impl std::fmt::Display for DownloadError { impl std::error::Error for DownloadError {} +#[derive(Debug)] +pub enum TimeTravelError { + /// Validation or other error happened due to user input. + BadInput(anyhow::Error), + /// The used remote storage does not have time travel recovery implemented + Unimplemented, + /// The number of versions/deletion markers is above our limit. + TooManyVersions, + /// A cancellation token aborted the process, typically during + /// request closure or process shutdown. + Cancelled, + /// Other errors + Other(anyhow::Error), +} + +impl std::fmt::Display for TimeTravelError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TimeTravelError::BadInput(e) => { + write!( + f, + "Failed to time travel recover a prefix due to user input: {e}" + ) + } + TimeTravelError::Unimplemented => write!( + f, + "time travel recovery is not implemented for the current storage backend" + ), + TimeTravelError::Cancelled => write!(f, "Cancelled, shutting down"), + TimeTravelError::TooManyVersions => { + write!(f, "Number of versions/delete markers above limit") + } + TimeTravelError::Other(e) => write!(f, "Failed to time travel recover a prefix: {e:?}"), + } + } +} + +impl std::error::Error for TimeTravelError {} + /// Every storage, currently supported. /// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics. #[derive(Clone)] @@ -404,7 +443,7 @@ impl GenericRemoteStorage> { timestamp: SystemTime, done_if_after: SystemTime, cancel: CancellationToken, - ) -> anyhow::Result<()> { + ) -> Result<(), TimeTravelError> { match self { Self::LocalFs(s) => { s.time_travel_recover(prefix, timestamp, done_if_after, cancel) diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 34a6658a69..d47fa75b37 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -18,7 +18,9 @@ use tokio_util::{io::ReaderStream, sync::CancellationToken}; use tracing::*; use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty}; -use crate::{Download, DownloadError, DownloadStream, Listing, ListingMode, RemotePath}; +use crate::{ + Download, DownloadError, DownloadStream, Listing, ListingMode, RemotePath, TimeTravelError, +}; use super::{RemoteStorage, StorageMetadata}; @@ -430,8 +432,8 @@ impl RemoteStorage for LocalFs { _timestamp: SystemTime, _done_if_after: SystemTime, _cancel: CancellationToken, - ) -> anyhow::Result<()> { - unimplemented!() + ) -> Result<(), TimeTravelError> { + Err(TimeTravelError::Unimplemented) } } diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index e615a1ce7e..4d6564cba6 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -46,7 +46,7 @@ use utils::backoff; use super::StorageMetadata; use crate::{ ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, - S3Config, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR, + S3Config, TimeTravelError, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR, }; pub(super) mod metrics; @@ -639,14 +639,14 @@ impl RemoteStorage for S3Bucket { timestamp: SystemTime, done_if_after: SystemTime, cancel: CancellationToken, - ) -> anyhow::Result<()> { + ) -> Result<(), TimeTravelError> { let kind = RequestKind::TimeTravel; let _guard = self.permit(kind).await; let timestamp = DateTime::from(timestamp); let done_if_after = DateTime::from(done_if_after); - tracing::info!("Target time: {timestamp:?}, done_if_after {done_if_after:?}"); + tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}"); // get the passed prefix or if it is not set use prefix_in_bucket value let prefix = prefix @@ -664,21 +664,21 @@ impl RemoteStorage for S3Bucket { loop { let response = backoff::retry( || async { - Ok(self - .client + self.client .list_object_versions() .bucket(self.bucket_name.clone()) .set_prefix(prefix.clone()) .set_key_marker(key_marker.clone()) .set_version_id_marker(version_id_marker.clone()) .send() - .await?) + .await + .map_err(|e| TimeTravelError::Other(e.into())) }, is_permanent, warn_threshold, max_retries, "listing object versions for time_travel_recover", - backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")), + backoff::Cancel::new(cancel.clone(), || TimeTravelError::Cancelled), ) .await?; @@ -699,7 +699,8 @@ impl RemoteStorage for S3Bucket { .map(VerOrDelete::from_delete_marker); itertools::process_results(versions.chain(deletes), |n_vds| { versions_and_deletes.extend(n_vds) - })?; + }) + .map_err(TimeTravelError::Other)?; fn none_if_empty(v: Option) -> Option { v.filter(|v| !v.is_empty()) } @@ -708,9 +709,9 @@ impl RemoteStorage for S3Bucket { if version_id_marker.is_none() { // The final response is not supposed to be truncated if response.is_truncated.unwrap_or_default() { - anyhow::bail!( + return Err(TimeTravelError::Other(anyhow::anyhow!( "Received truncated ListObjectVersions response for prefix={prefix:?}" - ); + ))); } break; } @@ -721,12 +722,15 @@ impl RemoteStorage for S3Bucket { // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size. const COMPLEXITY_LIMIT: usize = 100_000; if versions_and_deletes.len() >= COMPLEXITY_LIMIT { - anyhow::bail!( - "Limit for number of versions/deletions exceeded for prefix={prefix:?}" - ); + return Err(TimeTravelError::TooManyVersions); } } + tracing::info!( + "Built list for time travel with {} versions and deletions", + versions_and_deletes.len() + ); + // Work on the list of references instead of the objects directly, // otherwise we get lifetime errors in the sort_by_key call below. let mut versions_and_deletes = versions_and_deletes.iter().collect::>(); @@ -740,8 +744,8 @@ impl RemoteStorage for S3Bucket { version_id, key, .. } = &vd; if version_id == "null" { - anyhow::bail!("Received ListVersions response for key={key} with version_id='null', \ - indicating either disabled versioning, or legacy objects with null version id values"); + return Err(TimeTravelError::Other(anyhow!("Received ListVersions response for key={key} with version_id='null', \ + indicating either disabled versioning, or legacy objects with null version id values"))); } tracing::trace!( "Parsing version key={key} version_id={version_id} kind={:?}", @@ -788,22 +792,23 @@ impl RemoteStorage for S3Bucket { backoff::retry( || async { - Ok(self - .client + self.client .copy_object() .bucket(self.bucket_name.clone()) .key(key) .copy_source(&source_id) .send() - .await?) + .await + .map_err(|e| TimeTravelError::Other(e.into())) }, is_permanent, warn_threshold, max_retries, - "listing object versions for time_travel_recover", - backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")), + "copying object version for time_travel_recover", + backoff::Cancel::new(cancel.clone(), || TimeTravelError::Cancelled), ) .await?; + tracing::info!(%version_id, %key, "Copied old version in S3"); } VerOrDelete { kind: VerOrDeleteKind::DeleteMarker, @@ -820,8 +825,13 @@ impl RemoteStorage for S3Bucket { } else { tracing::trace!("Deleting {key}..."); - let oid = ObjectIdentifier::builder().key(key.to_owned()).build()?; - self.delete_oids(kind, &[oid]).await?; + let oid = ObjectIdentifier::builder() + .key(key.to_owned()) + .build() + .map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?; + self.delete_oids(kind, &[oid]) + .await + .map_err(TimeTravelError::Other)?; } } } diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index fc4c4b315b..ee9792232a 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -11,7 +11,7 @@ use tokio_util::sync::CancellationToken; use crate::{ Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage, - StorageMetadata, + StorageMetadata, TimeTravelError, }; pub struct UnreliableWrapper { @@ -191,8 +191,9 @@ impl RemoteStorage for UnreliableWrapper { timestamp: SystemTime, done_if_after: SystemTime, cancel: CancellationToken, - ) -> anyhow::Result<()> { - self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))?; + ) -> Result<(), TimeTravelError> { + self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned()))) + .map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?; self.inner .time_travel_recover(prefix, timestamp, done_if_after, cancel) .await diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 3694385cab..a6fe7c67e1 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -178,6 +178,64 @@ paths: schema: $ref: "#/components/schemas/ServiceUnavailableError" + /v1/tenant/{tenant_id}/time_travel_remote_storage: + parameters: + - name: tenant_id + in: path + required: true + schema: + type: string + - name: travel_to + in: query + required: true + schema: + type: string + format: date-time + - name: done_if_after + in: query + required: true + schema: + type: string + format: date-time + put: + description: Time travel the tenant's remote storage + responses: + "200": + description: OK + content: + application/json: + schema: + type: string + "400": + description: Error when no tenant id found in path or invalid timestamp + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + "401": + description: Unauthorized Error + content: + application/json: + schema: + $ref: "#/components/schemas/UnauthorizedError" + "403": + description: Forbidden Error + content: + application/json: + schema: + $ref: "#/components/schemas/ForbiddenError" + "500": + description: Generic operation error + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + "503": + description: Temporarily unavailable, please retry. + content: + application/json: + schema: + $ref: "#/components/schemas/ServiceUnavailableError" /v1/tenant/{tenant_id}/timeline: parameters: diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 9d062c50f2..88c36e8595 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -26,6 +26,7 @@ use pageserver_api::models::{ }; use pageserver_api::shard::TenantShardId; use remote_storage::GenericRemoteStorage; +use remote_storage::TimeTravelError; use tenant_size_model::{SizeResult, StorageModel}; use tokio_util::sync::CancellationToken; use tracing::*; @@ -47,6 +48,7 @@ use crate::tenant::mgr::{ TenantSlotError, TenantSlotUpsertError, TenantStateError, }; use crate::tenant::mgr::{TenantSlot, UpsertLocationError}; +use crate::tenant::remote_timeline_client; use crate::tenant::secondary::SecondaryController; use crate::tenant::size::ModelInputs; use crate::tenant::storage_layer::LayerAccessStatsReset; @@ -1424,6 +1426,79 @@ async fn list_location_config_handler( json_response(StatusCode::OK, result) } +// Do a time travel recovery on the given tenant/tenant shard. Tenant needs to be detached +// (from all pageservers) as it invalidates consistency assumptions. +async fn tenant_time_travel_remote_storage_handler( + request: Request, + cancel: CancellationToken, +) -> Result, ApiError> { + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + + check_permission(&request, Some(tenant_shard_id.tenant_id))?; + + let timestamp_raw = must_get_query_param(&request, "travel_to")?; + let timestamp = humantime::parse_rfc3339(×tamp_raw) + .with_context(|| format!("Invalid time for travel_to: {timestamp_raw:?}")) + .map_err(ApiError::BadRequest)?; + + let done_if_after_raw = must_get_query_param(&request, "done_if_after")?; + let done_if_after = humantime::parse_rfc3339(&done_if_after_raw) + .with_context(|| format!("Invalid time for done_if_after: {done_if_after_raw:?}")) + .map_err(ApiError::BadRequest)?; + + // This is just a sanity check to fend off naive wrong usages of the API: + // the tenant needs to be detached *everywhere* + let state = get_state(&request); + let we_manage_tenant = state.tenant_manager.manages_tenant_shard(tenant_shard_id); + if we_manage_tenant { + return Err(ApiError::BadRequest(anyhow!( + "Tenant {tenant_shard_id} is already attached at this pageserver" + ))); + } + + let Some(storage) = state.remote_storage.as_ref() else { + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "remote storage not configured, cannot run time travel" + ))); + }; + + if timestamp > done_if_after { + return Err(ApiError::BadRequest(anyhow!( + "The done_if_after timestamp comes before the timestamp to recover to" + ))); + } + + tracing::info!("Issuing time travel request internally. timestamp={timestamp_raw}, done_if_after={done_if_after_raw}"); + + remote_timeline_client::upload::time_travel_recover_tenant( + storage, + &tenant_shard_id, + timestamp, + done_if_after, + &cancel, + ) + .await + .map_err(|e| match e { + TimeTravelError::BadInput(e) => { + warn!("bad input error: {e}"); + ApiError::BadRequest(anyhow!("bad input error")) + } + TimeTravelError::Unimplemented => { + ApiError::BadRequest(anyhow!("unimplemented for the configured remote storage")) + } + TimeTravelError::Cancelled => ApiError::InternalServerError(anyhow!("cancelled")), + TimeTravelError::TooManyVersions => { + ApiError::InternalServerError(anyhow!("too many versions in remote storage")) + } + TimeTravelError::Other(e) => { + warn!("internal error: {e}"); + ApiError::InternalServerError(anyhow!("internal error")) + } + })?; + + json_response(StatusCode::OK, ()) +} + /// Testing helper to transition a tenant to [`crate::tenant::TenantState::Broken`]. async fn handle_tenant_break( r: Request, @@ -1969,6 +2044,10 @@ pub fn make_router( .get("/v1/location_config", |r| { api_handler(r, list_location_config_handler) }) + .put( + "/v1/tenant/:tenant_shard_id/time_travel_remote_storage", + |r| api_handler(r, tenant_time_travel_remote_storage_handler), + ) .get("/v1/tenant/:tenant_shard_id/timeline", |r| { api_handler(r, timeline_list_handler) }) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 949db3c543..64fd709386 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -898,6 +898,17 @@ impl TenantManager { } } + /// Whether the `TenantManager` is responsible for the tenant shard + pub(crate) fn manages_tenant_shard(&self, tenant_shard_id: TenantShardId) -> bool { + let locked = self.tenants.read().unwrap(); + + let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read) + .ok() + .flatten(); + + peek_slot.is_some() + } + #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))] pub(crate) async fn upsert_location( &self, diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 80ff5c9a2d..2e429ee9bc 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -1719,6 +1719,11 @@ pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath { RemotePath::from_string(&path).expect("Failed to construct path") } +fn remote_timelines_path_unsharded(tenant_id: &TenantId) -> RemotePath { + let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}"); + RemotePath::from_string(&path).expect("Failed to construct path") +} + pub fn remote_timeline_path( tenant_shard_id: &TenantShardId, timeline_id: &TimelineId, diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs index 58d95f75c2..76df9ba5c4 100644 --- a/pageserver/src/tenant/remote_timeline_client/upload.rs +++ b/pageserver/src/tenant/remote_timeline_client/upload.rs @@ -5,9 +5,11 @@ use camino::Utf8Path; use fail::fail_point; use pageserver_api::shard::TenantShardId; use std::io::{ErrorKind, SeekFrom}; +use std::time::SystemTime; use tokio::fs::{self, File}; use tokio::io::AsyncSeekExt; use tokio_util::sync::CancellationToken; +use utils::backoff; use super::Generation; use crate::{ @@ -17,7 +19,7 @@ use crate::{ remote_initdb_preserved_archive_path, remote_path, upload_cancellable, }, }; -use remote_storage::GenericRemoteStorage; +use remote_storage::{GenericRemoteStorage, TimeTravelError}; use utils::id::{TenantId, TimelineId}; use super::index::LayerFileMetadata; @@ -157,3 +159,45 @@ pub(crate) async fn preserve_initdb_archive( .await .with_context(|| format!("backing up initdb archive for '{tenant_id} / {timeline_id}'")) } + +pub(crate) async fn time_travel_recover_tenant( + storage: &GenericRemoteStorage, + tenant_shard_id: &TenantShardId, + timestamp: SystemTime, + done_if_after: SystemTime, + cancel: &CancellationToken, +) -> Result<(), TimeTravelError> { + let warn_after = 3; + let max_attempts = 10; + let mut prefixes = Vec::with_capacity(2); + if tenant_shard_id.is_zero() { + // Also recover the unsharded prefix for a shard of zero: + // - if the tenant is totally unsharded, the unsharded prefix contains all the data + // - if the tenant is sharded, we still want to recover the initdb data, but we only + // want to do it once, so let's do it on the 0 shard + let timelines_path_unsharded = + super::remote_timelines_path_unsharded(&tenant_shard_id.tenant_id); + prefixes.push(timelines_path_unsharded); + } + if !tenant_shard_id.is_unsharded() { + // If the tenant is sharded, we need to recover the sharded prefix + let timelines_path = super::remote_timelines_path(tenant_shard_id); + prefixes.push(timelines_path); + } + for prefix in &prefixes { + backoff::retry( + || async { + storage + .time_travel_recover(Some(prefix), timestamp, done_if_after, cancel.clone()) + .await + }, + |e| !matches!(e, TimeTravelError::Other(_)), + warn_after, + max_attempts, + "time travel recovery of tenant prefix", + backoff::Cancel::new(cancel.clone(), || TimeTravelError::Cancelled), + ) + .await?; + } + Ok(()) +} diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 65675aebe1..1a8765d830 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -4,6 +4,7 @@ import json import time from collections import defaultdict from dataclasses import dataclass +from datetime import datetime from typing import Any, Dict, List, Optional, Set, Tuple, Union import requests @@ -389,6 +390,20 @@ class PageserverHttpClient(requests.Session): ) return res.text + def tenant_time_travel_remote_storage( + self, + tenant_id: Union[TenantId, TenantShardId], + timestamp: datetime, + done_if_after: datetime, + ): + """ + Issues a request to perform time travel operations on the remote storage + """ + res = self.put( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/time_travel_remote_storage?travel_to={timestamp.isoformat()}Z&done_if_after={done_if_after.isoformat()}Z" + ) + self.verbose_error(res) + def timeline_list( self, tenant_id: Union[TenantId, TenantShardId], diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index 6b2651e447..4cfdee6e01 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -1,7 +1,11 @@ import time from typing import Any, Dict, List, Optional, Union -from mypy_boto3_s3.type_defs import ListObjectsV2OutputTypeDef, ObjectTypeDef +from mypy_boto3_s3.type_defs import ( + EmptyResponseMetadataTypeDef, + ListObjectsV2OutputTypeDef, + ObjectTypeDef, +) from fixtures.log_helper import log from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient @@ -346,6 +350,27 @@ def list_prefix( return response +def enable_remote_storage_versioning( + remote: RemoteStorage, +) -> EmptyResponseMetadataTypeDef: + """ + Enable S3 versioning for the remote storage + """ + # local_fs has no + assert isinstance(remote, S3Storage), "localfs is currently not supported" + assert remote.client is not None + + # Note that this doesnt use pagination, so list is not guaranteed to be exhaustive. + response = remote.client.put_bucket_versioning( + Bucket=remote.bucket_name, + VersioningConfiguration={ + "MFADelete": "Disabled", + "Status": "Enabled", + }, + ) + return response + + def wait_tenant_status_404( pageserver_http: PageserverHttpClient, tenant_id: TenantId, diff --git a/test_runner/regress/test_s3_restore.py b/test_runner/regress/test_s3_restore.py new file mode 100644 index 0000000000..188d8a3b33 --- /dev/null +++ b/test_runner/regress/test_s3_restore.py @@ -0,0 +1,121 @@ +import time +from datetime import datetime, timezone + +import pytest +from fixtures.neon_fixtures import ( + NeonEnvBuilder, + PgBin, +) +from fixtures.pageserver.utils import ( + MANY_SMALL_LAYERS_TENANT_CONFIG, + assert_prefix_empty, + enable_remote_storage_versioning, + poll_for_remote_storage_iterations, + tenant_delete_wait_completed, + wait_for_upload, +) +from fixtures.remote_storage import RemoteStorageKind, s3_storage +from fixtures.types import Lsn +from fixtures.utils import run_pg_bench_small + + +def test_tenant_s3_restore( + neon_env_builder: NeonEnvBuilder, + pg_bin: PgBin, +): + remote_storage_kind = s3_storage() + neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind) + + # Mock S3 doesn't have versioning enabled by default, enable it + # (also do it before there is any writes to the bucket) + if remote_storage_kind == RemoteStorageKind.MOCK_S3: + remote_storage = neon_env_builder.pageserver_remote_storage + assert remote_storage, "remote storage not configured" + enable_remote_storage_versioning(remote_storage) + pytest.skip("moto doesn't support self-copy: https://github.com/getmoto/moto/issues/7300") + + env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG) + env.pageserver.allowed_errors.extend( + [ + # The deletion queue will complain when it encounters simulated S3 errors + ".*deletion executor: DeleteObjects request failed.*", + # lucky race with stopping from flushing a layer we fail to schedule any uploads + ".*layer flush task.+: could not flush frozen layer: update_metadata_file", + ] + ) + + ps_http = env.pageserver.http_client() + + tenant_id = env.initial_tenant + + # Default tenant and the one we created + assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1 + + # create two timelines one being the parent of another, both with non-trivial data + parent = None + last_flush_lsns = [] + + for timeline in ["first", "second"]: + timeline_id = env.neon_cli.create_branch( + timeline, tenant_id=tenant_id, ancestor_branch_name=parent + ) + with env.endpoints.create_start(timeline, tenant_id=tenant_id) as endpoint: + run_pg_bench_small(pg_bin, endpoint.connstr()) + endpoint.safe_psql(f"CREATE TABLE created_{timeline}(id integer);") + last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) + last_flush_lsns.append(last_flush_lsn) + ps_http.timeline_checkpoint(tenant_id, timeline_id) + wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn) + parent = timeline + + # These sleeps are important because they fend off differences in clocks between us and S3 + time.sleep(4) + ts_before_deletion = datetime.now(tz=timezone.utc).replace(tzinfo=None) + time.sleep(4) + + assert ( + ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1 + ), "tenant removed before we deletion was issued" + iterations = poll_for_remote_storage_iterations(remote_storage_kind) + tenant_delete_wait_completed(ps_http, tenant_id, iterations) + ps_http.deletion_queue_flush(execute=True) + assert ( + ps_http.get_metric_value("pageserver_tenant_manager_slots") == 0 + ), "tenant removed before we deletion was issued" + env.attachment_service.attach_hook_drop(tenant_id) + + tenant_path = env.pageserver.tenant_dir(tenant_id) + assert not tenant_path.exists() + + assert_prefix_empty( + neon_env_builder.pageserver_remote_storage, + prefix="/".join( + ( + "tenants", + str(tenant_id), + ) + ), + ) + + time.sleep(4) + ts_after_deletion = datetime.now(tz=timezone.utc).replace(tzinfo=None) + time.sleep(4) + + ps_http.tenant_time_travel_remote_storage( + tenant_id, timestamp=ts_before_deletion, done_if_after=ts_after_deletion + ) + + generation = env.attachment_service.attach_hook_issue(tenant_id, env.pageserver.id) + + ps_http.tenant_attach(tenant_id, generation=generation) + env.pageserver.quiesce_tenants() + + for i, timeline in enumerate(["first", "second"]): + with env.endpoints.create_start(timeline, tenant_id=tenant_id) as endpoint: + endpoint.safe_psql(f"SELECT * FROM created_{timeline};") + last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) + expected_last_flush_lsn = last_flush_lsns[i] + # There might be some activity that advances the lsn so we can't use a strict equality check + assert last_flush_lsn >= expected_last_flush_lsn, "last_flush_lsn too old" + + assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1 From 56171cbe8c2b81ba2b949a5ec39c11991fb5e47a Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 2 Feb 2024 14:14:42 +0000 Subject: [PATCH 3/3] pageserver: more permissive activation timeout when testing (#6564) ## Problem The 5 second activation timeout is appropriate for production environments, where we want to give a prompt response to the cloud control plane, and if we fail it will retry the call. In tests however, we don't want every call to e.g. timeline create to have to come with a retry wrapper. This issue has always been there, but it is more apparent in sharding tests that concurrently attach several tenant shards. Closes: https://github.com/neondatabase/neon/issues/6563 ## Summary of changes When `testing` feature is enabled, make `ACTIVE_TENANT_TIMEOUT` 30 seconds instead of 5 seconds. --- pageserver/src/http/routes.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 88c36e8595..57ee746726 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -79,8 +79,14 @@ use utils::{ // For APIs that require an Active tenant, how long should we block waiting for that state? // This is not functionally necessary (clients will retry), but avoids generating a lot of // failed API calls while tenants are activating. +#[cfg(not(feature = "testing"))] const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000); +// Tests run on slow/oversubscribed nodes, and may need to wait much longer for tenants to +// finish attaching, if calls to remote storage are slow. +#[cfg(feature = "testing")] +const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000); + pub struct State { conf: &'static PageServerConf, tenant_manager: Arc,