diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index abab32470b..57c57a2b70 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -28,6 +28,7 @@ use tokio_util::sync::CancellationToken; use tracing::debug; use crate::s3_bucket::RequestKind; +use crate::TimeTravelError; use crate::{ AzureConfig, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, StorageMetadata, @@ -379,12 +380,10 @@ impl RemoteStorage for AzureBlobStorage { _timestamp: SystemTime, _done_if_after: SystemTime, _cancel: CancellationToken, - ) -> anyhow::Result<()> { + ) -> Result<(), TimeTravelError> { // TODO use Azure point in time recovery feature for this // https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview - Err(anyhow::anyhow!( - "time travel recovery for azure blob storage is not implemented" - )) + Err(TimeTravelError::Unimplemented) } } diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 38a8784fe2..4aeaee70b1 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -219,7 +219,7 @@ pub trait RemoteStorage: Send + Sync + 'static { timestamp: SystemTime, done_if_after: SystemTime, cancel: CancellationToken, - ) -> anyhow::Result<()>; + ) -> Result<(), TimeTravelError>; } pub type DownloadStream = Pin> + Unpin + Send + Sync>>; @@ -269,6 +269,45 @@ impl std::fmt::Display for DownloadError { impl std::error::Error for DownloadError {} +#[derive(Debug)] +pub enum TimeTravelError { + /// Validation or other error happened due to user input. + BadInput(anyhow::Error), + /// The used remote storage does not have time travel recovery implemented + Unimplemented, + /// The number of versions/deletion markers is above our limit. + TooManyVersions, + /// A cancellation token aborted the process, typically during + /// request closure or process shutdown. 
+ Cancelled, + /// Other errors + Other(anyhow::Error), +} + +impl std::fmt::Display for TimeTravelError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TimeTravelError::BadInput(e) => { + write!( + f, + "Failed to time travel recover a prefix due to user input: {e}" + ) + } + TimeTravelError::Unimplemented => write!( + f, + "time travel recovery is not implemented for the current storage backend" + ), + TimeTravelError::Cancelled => write!(f, "Cancelled, shutting down"), + TimeTravelError::TooManyVersions => { + write!(f, "Number of versions/delete markers above limit") + } + TimeTravelError::Other(e) => write!(f, "Failed to time travel recover a prefix: {e:?}"), + } + } +} + +impl std::error::Error for TimeTravelError {} + /// Every storage, currently supported. /// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics. #[derive(Clone)] @@ -404,7 +443,7 @@ impl GenericRemoteStorage> { timestamp: SystemTime, done_if_after: SystemTime, cancel: CancellationToken, - ) -> anyhow::Result<()> { + ) -> Result<(), TimeTravelError> { match self { Self::LocalFs(s) => { s.time_travel_recover(prefix, timestamp, done_if_after, cancel) diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 34a6658a69..d47fa75b37 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -18,7 +18,9 @@ use tokio_util::{io::ReaderStream, sync::CancellationToken}; use tracing::*; use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty}; -use crate::{Download, DownloadError, DownloadStream, Listing, ListingMode, RemotePath}; +use crate::{ + Download, DownloadError, DownloadStream, Listing, ListingMode, RemotePath, TimeTravelError, +}; use super::{RemoteStorage, StorageMetadata}; @@ -430,8 +432,8 @@ impl RemoteStorage for LocalFs { _timestamp: SystemTime, _done_if_after: SystemTime, _cancel: CancellationToken, - ) -> 
anyhow::Result<()> { - unimplemented!() + ) -> Result<(), TimeTravelError> { + Err(TimeTravelError::Unimplemented) } } diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index e615a1ce7e..4d6564cba6 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -46,7 +46,7 @@ use utils::backoff; use super::StorageMetadata; use crate::{ ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, - S3Config, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR, + S3Config, TimeTravelError, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR, }; pub(super) mod metrics; @@ -639,14 +639,14 @@ impl RemoteStorage for S3Bucket { timestamp: SystemTime, done_if_after: SystemTime, cancel: CancellationToken, - ) -> anyhow::Result<()> { + ) -> Result<(), TimeTravelError> { let kind = RequestKind::TimeTravel; let _guard = self.permit(kind).await; let timestamp = DateTime::from(timestamp); let done_if_after = DateTime::from(done_if_after); - tracing::info!("Target time: {timestamp:?}, done_if_after {done_if_after:?}"); + tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}"); // get the passed prefix or if it is not set use prefix_in_bucket value let prefix = prefix @@ -664,21 +664,21 @@ impl RemoteStorage for S3Bucket { loop { let response = backoff::retry( || async { - Ok(self - .client + self.client .list_object_versions() .bucket(self.bucket_name.clone()) .set_prefix(prefix.clone()) .set_key_marker(key_marker.clone()) .set_version_id_marker(version_id_marker.clone()) .send() - .await?) 
+ .await + .map_err(|e| TimeTravelError::Other(e.into())) }, is_permanent, warn_threshold, max_retries, "listing object versions for time_travel_recover", - backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")), + backoff::Cancel::new(cancel.clone(), || TimeTravelError::Cancelled), ) .await?; @@ -699,7 +699,8 @@ impl RemoteStorage for S3Bucket { .map(VerOrDelete::from_delete_marker); itertools::process_results(versions.chain(deletes), |n_vds| { versions_and_deletes.extend(n_vds) - })?; + }) + .map_err(TimeTravelError::Other)?; fn none_if_empty(v: Option) -> Option { v.filter(|v| !v.is_empty()) } @@ -708,9 +709,9 @@ impl RemoteStorage for S3Bucket { if version_id_marker.is_none() { // The final response is not supposed to be truncated if response.is_truncated.unwrap_or_default() { - anyhow::bail!( + return Err(TimeTravelError::Other(anyhow::anyhow!( "Received truncated ListObjectVersions response for prefix={prefix:?}" - ); + ))); } break; } @@ -721,12 +722,15 @@ impl RemoteStorage for S3Bucket { // 40 seconds, and roughly corresponds to tenants of 2 TiB physical size. const COMPLEXITY_LIMIT: usize = 100_000; if versions_and_deletes.len() >= COMPLEXITY_LIMIT { - anyhow::bail!( - "Limit for number of versions/deletions exceeded for prefix={prefix:?}" - ); + return Err(TimeTravelError::TooManyVersions); } } + tracing::info!( + "Built list for time travel with {} versions and deletions", + versions_and_deletes.len() + ); + // Work on the list of references instead of the objects directly, // otherwise we get lifetime errors in the sort_by_key call below. let mut versions_and_deletes = versions_and_deletes.iter().collect::>(); @@ -740,8 +744,8 @@ impl RemoteStorage for S3Bucket { version_id, key, .. 
} = &vd; if version_id == "null" { - anyhow::bail!("Received ListVersions response for key={key} with version_id='null', \ - indicating either disabled versioning, or legacy objects with null version id values"); + return Err(TimeTravelError::Other(anyhow!("Received ListVersions response for key={key} with version_id='null', \ + indicating either disabled versioning, or legacy objects with null version id values"))); } tracing::trace!( "Parsing version key={key} version_id={version_id} kind={:?}", @@ -788,22 +792,23 @@ impl RemoteStorage for S3Bucket { backoff::retry( || async { - Ok(self - .client + self.client .copy_object() .bucket(self.bucket_name.clone()) .key(key) .copy_source(&source_id) .send() - .await?) + .await + .map_err(|e| TimeTravelError::Other(e.into())) }, is_permanent, warn_threshold, max_retries, - "listing object versions for time_travel_recover", - backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")), + "copying object version for time_travel_recover", + backoff::Cancel::new(cancel.clone(), || TimeTravelError::Cancelled), ) .await?; + tracing::info!(%version_id, %key, "Copied old version in S3"); } VerOrDelete { kind: VerOrDeleteKind::DeleteMarker, @@ -820,8 +825,13 @@ impl RemoteStorage for S3Bucket { } else { tracing::trace!("Deleting {key}..."); - let oid = ObjectIdentifier::builder().key(key.to_owned()).build()?; - self.delete_oids(kind, &[oid]).await?; + let oid = ObjectIdentifier::builder() + .key(key.to_owned()) + .build() + .map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?; + self.delete_oids(kind, &[oid]) + .await + .map_err(TimeTravelError::Other)?; } } } diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index fc4c4b315b..ee9792232a 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -11,7 +11,7 @@ use tokio_util::sync::CancellationToken; use crate::{ Download, DownloadError, GenericRemoteStorage, 
Listing, ListingMode, RemotePath, RemoteStorage, - StorageMetadata, + StorageMetadata, TimeTravelError, }; pub struct UnreliableWrapper { @@ -191,8 +191,9 @@ impl RemoteStorage for UnreliableWrapper { timestamp: SystemTime, done_if_after: SystemTime, cancel: CancellationToken, - ) -> anyhow::Result<()> { - self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))?; + ) -> Result<(), TimeTravelError> { + self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned()))) + .map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?; self.inner .time_travel_recover(prefix, timestamp, done_if_after, cancel) .await diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 3694385cab..a6fe7c67e1 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -178,6 +178,64 @@ paths: schema: $ref: "#/components/schemas/ServiceUnavailableError" + /v1/tenant/{tenant_id}/time_travel_remote_storage: + parameters: + - name: tenant_id + in: path + required: true + schema: + type: string + - name: travel_to + in: query + required: true + schema: + type: string + format: date-time + - name: done_if_after + in: query + required: true + schema: + type: string + format: date-time + put: + description: Time travel the tenant's remote storage + responses: + "200": + description: OK + content: + application/json: + schema: + type: string + "400": + description: Error when no tenant id found in path or invalid timestamp + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + "401": + description: Unauthorized Error + content: + application/json: + schema: + $ref: "#/components/schemas/UnauthorizedError" + "403": + description: Forbidden Error + content: + application/json: + schema: + $ref: "#/components/schemas/ForbiddenError" + "500": + description: Generic operation error + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + "503": + description: 
Temporarily unavailable, please retry. + content: + application/json: + schema: + $ref: "#/components/schemas/ServiceUnavailableError" /v1/tenant/{tenant_id}/timeline: parameters: diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 9d062c50f2..57ee746726 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -26,6 +26,7 @@ use pageserver_api::models::{ }; use pageserver_api::shard::TenantShardId; use remote_storage::GenericRemoteStorage; +use remote_storage::TimeTravelError; use tenant_size_model::{SizeResult, StorageModel}; use tokio_util::sync::CancellationToken; use tracing::*; @@ -47,6 +48,7 @@ use crate::tenant::mgr::{ TenantSlotError, TenantSlotUpsertError, TenantStateError, }; use crate::tenant::mgr::{TenantSlot, UpsertLocationError}; +use crate::tenant::remote_timeline_client; use crate::tenant::secondary::SecondaryController; use crate::tenant::size::ModelInputs; use crate::tenant::storage_layer::LayerAccessStatsReset; @@ -77,8 +79,14 @@ use utils::{ // For APIs that require an Active tenant, how long should we block waiting for that state? // This is not functionally necessary (clients will retry), but avoids generating a lot of // failed API calls while tenants are activating. +#[cfg(not(feature = "testing"))] const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000); +// Tests run on slow/oversubscribed nodes, and may need to wait much longer for tenants to +// finish attaching, if calls to remote storage are slow. +#[cfg(feature = "testing")] +const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000); + pub struct State { conf: &'static PageServerConf, tenant_manager: Arc, @@ -1424,6 +1432,79 @@ async fn list_location_config_handler( json_response(StatusCode::OK, result) } +// Do a time travel recovery on the given tenant/tenant shard. Tenant needs to be detached +// (from all pageservers) as it invalidates consistency assumptions. 
+async fn tenant_time_travel_remote_storage_handler( + request: Request<Body>, + cancel: CancellationToken, +) -> Result<Response<Body>, ApiError> { + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + + check_permission(&request, Some(tenant_shard_id.tenant_id))?; + + let timestamp_raw = must_get_query_param(&request, "travel_to")?; + let timestamp = humantime::parse_rfc3339(&timestamp_raw) + .with_context(|| format!("Invalid time for travel_to: {timestamp_raw:?}")) + .map_err(ApiError::BadRequest)?; + + let done_if_after_raw = must_get_query_param(&request, "done_if_after")?; + let done_if_after = humantime::parse_rfc3339(&done_if_after_raw) + .with_context(|| format!("Invalid time for done_if_after: {done_if_after_raw:?}")) + .map_err(ApiError::BadRequest)?; + + // This is just a sanity check to fend off naive wrong usages of the API: + // the tenant needs to be detached *everywhere* + let state = get_state(&request); + let we_manage_tenant = state.tenant_manager.manages_tenant_shard(tenant_shard_id); + if we_manage_tenant { + return Err(ApiError::BadRequest(anyhow!( + "Tenant {tenant_shard_id} is already attached at this pageserver" + ))); + } + + let Some(storage) = state.remote_storage.as_ref() else { + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "remote storage not configured, cannot run time travel" + ))); + }; + + if timestamp > done_if_after { + return Err(ApiError::BadRequest(anyhow!( + "The done_if_after timestamp comes before the timestamp to recover to" + ))); + } + + tracing::info!("Issuing time travel request internally. 
timestamp={timestamp_raw}, done_if_after={done_if_after_raw}"); + + remote_timeline_client::upload::time_travel_recover_tenant( + storage, + &tenant_shard_id, + timestamp, + done_if_after, + &cancel, + ) + .await + .map_err(|e| match e { + TimeTravelError::BadInput(e) => { + warn!("bad input error: {e}"); + ApiError::BadRequest(anyhow!("bad input error")) + } + TimeTravelError::Unimplemented => { + ApiError::BadRequest(anyhow!("unimplemented for the configured remote storage")) + } + TimeTravelError::Cancelled => ApiError::InternalServerError(anyhow!("cancelled")), + TimeTravelError::TooManyVersions => { + ApiError::InternalServerError(anyhow!("too many versions in remote storage")) + } + TimeTravelError::Other(e) => { + warn!("internal error: {e}"); + ApiError::InternalServerError(anyhow!("internal error")) + } + })?; + + json_response(StatusCode::OK, ()) +} + /// Testing helper to transition a tenant to [`crate::tenant::TenantState::Broken`]. async fn handle_tenant_break( r: Request, @@ -1969,6 +2050,10 @@ pub fn make_router( .get("/v1/location_config", |r| { api_handler(r, list_location_config_handler) }) + .put( + "/v1/tenant/:tenant_shard_id/time_travel_remote_storage", + |r| api_handler(r, tenant_time_travel_remote_storage_handler), + ) .get("/v1/tenant/:tenant_shard_id/timeline", |r| { api_handler(r, timeline_list_handler) }) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 949db3c543..64fd709386 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -898,6 +898,17 @@ impl TenantManager { } } + /// Whether the `TenantManager` is responsible for the tenant shard + pub(crate) fn manages_tenant_shard(&self, tenant_shard_id: TenantShardId) -> bool { + let locked = self.tenants.read().unwrap(); + + let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read) + .ok() + .flatten(); + + peek_slot.is_some() + } + #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, 
shard_id=%tenant_shard_id.shard_slug()))] pub(crate) async fn upsert_location( &self, diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 80ff5c9a2d..2e429ee9bc 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -1719,6 +1719,11 @@ pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath { RemotePath::from_string(&path).expect("Failed to construct path") } +fn remote_timelines_path_unsharded(tenant_id: &TenantId) -> RemotePath { + let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}"); + RemotePath::from_string(&path).expect("Failed to construct path") +} + pub fn remote_timeline_path( tenant_shard_id: &TenantShardId, timeline_id: &TimelineId, diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs index 58d95f75c2..76df9ba5c4 100644 --- a/pageserver/src/tenant/remote_timeline_client/upload.rs +++ b/pageserver/src/tenant/remote_timeline_client/upload.rs @@ -5,9 +5,11 @@ use camino::Utf8Path; use fail::fail_point; use pageserver_api::shard::TenantShardId; use std::io::{ErrorKind, SeekFrom}; +use std::time::SystemTime; use tokio::fs::{self, File}; use tokio::io::AsyncSeekExt; use tokio_util::sync::CancellationToken; +use utils::backoff; use super::Generation; use crate::{ @@ -17,7 +19,7 @@ use crate::{ remote_initdb_preserved_archive_path, remote_path, upload_cancellable, }, }; -use remote_storage::GenericRemoteStorage; +use remote_storage::{GenericRemoteStorage, TimeTravelError}; use utils::id::{TenantId, TimelineId}; use super::index::LayerFileMetadata; @@ -157,3 +159,45 @@ pub(crate) async fn preserve_initdb_archive( .await .with_context(|| format!("backing up initdb archive for '{tenant_id} / {timeline_id}'")) } + +pub(crate) async fn time_travel_recover_tenant( + storage: &GenericRemoteStorage, + tenant_shard_id: &TenantShardId, + 
timestamp: SystemTime, + done_if_after: SystemTime, + cancel: &CancellationToken, +) -> Result<(), TimeTravelError> { + let warn_after = 3; + let max_attempts = 10; + let mut prefixes = Vec::with_capacity(2); + if tenant_shard_id.is_zero() { + // Also recover the unsharded prefix for a shard of zero: + // - if the tenant is totally unsharded, the unsharded prefix contains all the data + // - if the tenant is sharded, we still want to recover the initdb data, but we only + // want to do it once, so let's do it on the 0 shard + let timelines_path_unsharded = + super::remote_timelines_path_unsharded(&tenant_shard_id.tenant_id); + prefixes.push(timelines_path_unsharded); + } + if !tenant_shard_id.is_unsharded() { + // If the tenant is sharded, we need to recover the sharded prefix + let timelines_path = super::remote_timelines_path(tenant_shard_id); + prefixes.push(timelines_path); + } + for prefix in &prefixes { + backoff::retry( + || async { + storage + .time_travel_recover(Some(prefix), timestamp, done_if_after, cancel.clone()) + .await + }, + |e| !matches!(e, TimeTravelError::Other(_)), + warn_after, + max_attempts, + "time travel recovery of tenant prefix", + backoff::Cancel::new(cancel.clone(), || TimeTravelError::Cancelled), + ) + .await?; + } + Ok(()) +} diff --git a/proxy/src/console/messages.rs b/proxy/src/console/messages.rs index 6ef9bcf4eb..4e5920436f 100644 --- a/proxy/src/console/messages.rs +++ b/proxy/src/console/messages.rs @@ -100,31 +100,6 @@ pub struct MetricsAuxInfo { pub branch_id: BranchId, } -impl MetricsAuxInfo { - /// Definitions of labels for traffic metric. - pub const TRAFFIC_LABELS: &'static [&'static str] = &[ - // Received (rx) / sent (tx). - "direction", - // ID of a project. - "project_id", - // ID of an endpoint within a project. - "endpoint_id", - // ID of a branch within a project (snapshot). - "branch_id", - ]; - - /// Values of labels for traffic metric. - // TODO: add more type safety (validate arity & positions). 
- pub fn traffic_labels(&self, direction: &'static str) -> [&str; 4] { - [ - direction, - &self.project_id, - &self.endpoint_id, - &self.branch_id, - ] - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index c7d566f645..fa663d8ff6 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -208,15 +208,6 @@ pub static NUM_WAKEUP_FAILURES: Lazy = Lazy::new(|| { .unwrap() }); -pub static NUM_BYTES_PROXIED_PER_CLIENT_COUNTER: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "proxy_io_bytes_per_client", - "Number of bytes sent/received between client and backend.", - crate::console::messages::MetricsAuxInfo::TRAFFIC_LABELS, - ) - .unwrap() -}); - pub static NUM_BYTES_PROXIED_COUNTER: Lazy = Lazy::new(|| { register_int_counter_vec!( "proxy_io_bytes", diff --git a/proxy/src/proxy/passthrough.rs b/proxy/src/proxy/passthrough.rs index d6f097d72d..53e0c3c8f3 100644 --- a/proxy/src/proxy/passthrough.rs +++ b/proxy/src/proxy/passthrough.rs @@ -1,7 +1,7 @@ use crate::{ console::messages::MetricsAuxInfo, context::RequestMonitoring, - metrics::{NUM_BYTES_PROXIED_COUNTER, NUM_BYTES_PROXIED_PER_CLIENT_COUNTER}, + metrics::NUM_BYTES_PROXIED_COUNTER, usage_metrics::{Ids, USAGE_METRICS}, }; use tokio::io::{AsyncRead, AsyncWrite}; @@ -25,27 +25,23 @@ pub async fn proxy_pass( }); let m_sent = NUM_BYTES_PROXIED_COUNTER.with_label_values(&["tx"]); - let m_sent2 = NUM_BYTES_PROXIED_PER_CLIENT_COUNTER.with_label_values(&aux.traffic_labels("tx")); let mut client = MeasuredStream::new( client, |_| {}, |cnt| { // Number of bytes we sent to the client (outbound). 
m_sent.inc_by(cnt as u64); - m_sent2.inc_by(cnt as u64); usage.record_egress(cnt as u64); }, ); let m_recv = NUM_BYTES_PROXIED_COUNTER.with_label_values(&["rx"]); - let m_recv2 = NUM_BYTES_PROXIED_PER_CLIENT_COUNTER.with_label_values(&aux.traffic_labels("rx")); let mut compute = MeasuredStream::new( compute, |_| {}, |cnt| { // Number of bytes the client sent to the compute node (inbound). m_recv.inc_by(cnt as u64); - m_recv2.inc_by(cnt as u64); }, ); diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 65675aebe1..1a8765d830 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -4,6 +4,7 @@ import json import time from collections import defaultdict from dataclasses import dataclass +from datetime import datetime from typing import Any, Dict, List, Optional, Set, Tuple, Union import requests @@ -389,6 +390,20 @@ class PageserverHttpClient(requests.Session): ) return res.text + def tenant_time_travel_remote_storage( + self, + tenant_id: Union[TenantId, TenantShardId], + timestamp: datetime, + done_if_after: datetime, + ): + """ + Issues a request to perform time travel operations on the remote storage + """ + res = self.put( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/time_travel_remote_storage?travel_to={timestamp.isoformat()}Z&done_if_after={done_if_after.isoformat()}Z" + ) + self.verbose_error(res) + def timeline_list( self, tenant_id: Union[TenantId, TenantShardId], diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index 6b2651e447..4cfdee6e01 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -1,7 +1,11 @@ import time from typing import Any, Dict, List, Optional, Union -from mypy_boto3_s3.type_defs import ListObjectsV2OutputTypeDef, ObjectTypeDef +from mypy_boto3_s3.type_defs import ( + EmptyResponseMetadataTypeDef, + ListObjectsV2OutputTypeDef, + 
ObjectTypeDef, +) from fixtures.log_helper import log from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient @@ -346,6 +350,27 @@ def list_prefix( return response +def enable_remote_storage_versioning( + remote: RemoteStorage, +) -> EmptyResponseMetadataTypeDef: + """ + Enable S3 versioning for the remote storage + """ + # local_fs has no versioning support, so only S3-backed remote storage is accepted + assert isinstance(remote, S3Storage), "localfs is currently not supported" + assert remote.client is not None + + # A single PutBucketVersioning request turns versioning on for the whole bucket. + response = remote.client.put_bucket_versioning( + Bucket=remote.bucket_name, + VersioningConfiguration={ + "MFADelete": "Disabled", + "Status": "Enabled", + }, + ) + return response + + def wait_tenant_status_404( pageserver_http: PageserverHttpClient, tenant_id: TenantId, diff --git a/test_runner/regress/test_s3_restore.py b/test_runner/regress/test_s3_restore.py new file mode 100644 index 0000000000..188d8a3b33 --- /dev/null +++ b/test_runner/regress/test_s3_restore.py @@ -0,0 +1,121 @@ +import time +from datetime import datetime, timezone + +import pytest +from fixtures.neon_fixtures import ( + NeonEnvBuilder, + PgBin, +) +from fixtures.pageserver.utils import ( + MANY_SMALL_LAYERS_TENANT_CONFIG, + assert_prefix_empty, + enable_remote_storage_versioning, + poll_for_remote_storage_iterations, + tenant_delete_wait_completed, + wait_for_upload, +) +from fixtures.remote_storage import RemoteStorageKind, s3_storage +from fixtures.types import Lsn +from fixtures.utils import run_pg_bench_small + + +def test_tenant_s3_restore( + neon_env_builder: NeonEnvBuilder, + pg_bin: PgBin, +): + remote_storage_kind = s3_storage() + neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind) + + # Mock S3 doesn't have versioning enabled by default, enable it + # (also do it before there are any writes to the bucket) + if remote_storage_kind == RemoteStorageKind.MOCK_S3: + remote_storage = 
neon_env_builder.pageserver_remote_storage + assert remote_storage, "remote storage not configured" + enable_remote_storage_versioning(remote_storage) + pytest.skip("moto doesn't support self-copy: https://github.com/getmoto/moto/issues/7300") + + env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG) + env.pageserver.allowed_errors.extend( + [ + # The deletion queue will complain when it encounters simulated S3 errors + ".*deletion executor: DeleteObjects request failed.*", + # lucky race with stopping from flushing a layer we fail to schedule any uploads + ".*layer flush task.+: could not flush frozen layer: update_metadata_file", + ] + ) + + ps_http = env.pageserver.http_client() + + tenant_id = env.initial_tenant + + # Default tenant and the one we created + assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1 + + # create two timelines one being the parent of another, both with non-trivial data + parent = None + last_flush_lsns = [] + + for timeline in ["first", "second"]: + timeline_id = env.neon_cli.create_branch( + timeline, tenant_id=tenant_id, ancestor_branch_name=parent + ) + with env.endpoints.create_start(timeline, tenant_id=tenant_id) as endpoint: + run_pg_bench_small(pg_bin, endpoint.connstr()) + endpoint.safe_psql(f"CREATE TABLE created_{timeline}(id integer);") + last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) + last_flush_lsns.append(last_flush_lsn) + ps_http.timeline_checkpoint(tenant_id, timeline_id) + wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn) + parent = timeline + + # These sleeps are important because they fend off differences in clocks between us and S3 + time.sleep(4) + ts_before_deletion = datetime.now(tz=timezone.utc).replace(tzinfo=None) + time.sleep(4) + + assert ( + ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1 + ), "tenant removed before we deletion was issued" + iterations = 
poll_for_remote_storage_iterations(remote_storage_kind) + tenant_delete_wait_completed(ps_http, tenant_id, iterations) + ps_http.deletion_queue_flush(execute=True) + assert ( + ps_http.get_metric_value("pageserver_tenant_manager_slots") == 0 + ), "tenant removed before we deletion was issued" + env.attachment_service.attach_hook_drop(tenant_id) + + tenant_path = env.pageserver.tenant_dir(tenant_id) + assert not tenant_path.exists() + + assert_prefix_empty( + neon_env_builder.pageserver_remote_storage, + prefix="/".join( + ( + "tenants", + str(tenant_id), + ) + ), + ) + + time.sleep(4) + ts_after_deletion = datetime.now(tz=timezone.utc).replace(tzinfo=None) + time.sleep(4) + + ps_http.tenant_time_travel_remote_storage( + tenant_id, timestamp=ts_before_deletion, done_if_after=ts_after_deletion + ) + + generation = env.attachment_service.attach_hook_issue(tenant_id, env.pageserver.id) + + ps_http.tenant_attach(tenant_id, generation=generation) + env.pageserver.quiesce_tenants() + + for i, timeline in enumerate(["first", "second"]): + with env.endpoints.create_start(timeline, tenant_id=tenant_id) as endpoint: + endpoint.safe_psql(f"SELECT * FROM created_{timeline};") + last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) + expected_last_flush_lsn = last_flush_lsns[i] + # There might be some activity that advances the lsn so we can't use a strict equality check + assert last_flush_lsn >= expected_last_flush_lsn, "last_flush_lsn too old" + + assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1