From 8dee9908f83fdebea1dfd36304272bdbe684ad5c Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Thu, 25 Jan 2024 18:45:17 +0200 Subject: [PATCH 1/5] fix(compaction_task): wrong log levels (#6442) Filter what we log on compaction task. Per discussion in last triage call, fixing these by introducing and inspecting the root cause within anyhow::Error instead of rolling out proper conversions. Fixes: #6365 Fixes: #6367 --- pageserver/src/tenant/tasks.rs | 60 ++++++++++++++++++++++++++- pageserver/src/tenant/timeline.rs | 16 ++++++- pageserver/src/tenant/upload_queue.rs | 35 ++++++++++++---- 3 files changed, 100 insertions(+), 11 deletions(-) diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 2b2fcc7711..5f39c46a84 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -9,6 +9,7 @@ use crate::context::{DownloadBehavior, RequestContext}; use crate::metrics::TENANT_TASK_EVENTS; use crate::task_mgr; use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME}; +use crate::tenant::timeline::CompactionError; use crate::tenant::{Tenant, TenantState}; use tokio_util::sync::CancellationToken; use tracing::*; @@ -181,8 +182,11 @@ async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { ); error_run_count += 1; let wait_duration = Duration::from_secs_f64(wait_duration); - error!( - "Compaction failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}", + log_compaction_error( + &e, + error_run_count, + &wait_duration, + cancel.is_cancelled(), ); wait_duration } else { @@ -210,6 +214,58 @@ async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc(); } +fn log_compaction_error( + e: &CompactionError, + error_run_count: u32, + sleep_duration: &std::time::Duration, + task_cancelled: bool, +) { + use crate::tenant::upload_queue::NotInitialized; + use crate::tenant::PageReconstructError; + use CompactionError::*; + + enum LooksLike { + Info, + Error, + } + + let decision = match e { + ShuttingDown => None, + _ if task_cancelled => Some(LooksLike::Info), + Other(e) => { + let root_cause = e.root_cause(); + + let is_stopping = { + let upload_queue = root_cause + .downcast_ref::() + .is_some_and(|e| e.is_stopping()); + + let timeline = root_cause + .downcast_ref::() + .is_some_and(|e| e.is_stopping()); + + upload_queue || timeline + }; + + if is_stopping { + Some(LooksLike::Info) + } else { + Some(LooksLike::Error) + } + } + }; + + match decision { + Some(LooksLike::Info) => info!( + "Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:#}", + ), + Some(LooksLike::Error) => error!( + "Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:?}", + ), + None => {} + } +} + /// /// GC task's main loop /// diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 666dae94e2..c21fe94d01 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -392,8 +392,7 @@ pub(crate) enum PageReconstructError { #[error("Ancestor LSN wait error: {0}")] AncestorLsnTimeout(#[from] WaitLsnError), - /// The operation was cancelled - #[error("Cancelled")] + #[error("timeline shutting down")] Cancelled, /// The ancestor of this is being stopped @@ -405,6 +404,19 @@ pub(crate) enum PageReconstructError { WalRedo(anyhow::Error), } +impl PageReconstructError { + /// Returns true if this error indicates a tenant/timeline shutdown alike situation + pub(crate) fn is_stopping(&self) -> bool { + use 
PageReconstructError::*; + match self { + Other(_) => false, + AncestorLsnTimeout(_) => false, + Cancelled | AncestorStopping(_) => true, + WalRedo(_) => false, + } + } +} + #[derive(thiserror::Error, Debug)] enum CreateImageLayersError { #[error("timeline shutting down")] diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index 32f14f40c5..0b61bc0a10 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -126,6 +126,27 @@ pub(super) struct UploadQueueStopped { pub(super) deleted_at: SetDeletedFlagProgress, } +#[derive(thiserror::Error, Debug)] +pub(crate) enum NotInitialized { + #[error("queue is in state Uninitialized")] + Uninitialized, + #[error("queue is in state Stopping")] + Stopped, + #[error("queue is shutting down")] + ShuttingDown, +} + +impl NotInitialized { + pub(crate) fn is_stopping(&self) -> bool { + use NotInitialized::*; + match self { + Uninitialized => false, + Stopped => true, + ShuttingDown => true, + } + } +} + impl UploadQueue { pub(crate) fn initialize_empty_remote( &mut self, @@ -214,17 +235,17 @@ impl UploadQueue { } pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> { + use UploadQueue::*; match self { - UploadQueue::Uninitialized | UploadQueue::Stopped(_) => { - anyhow::bail!("queue is in state {}", self.as_str()) - } - UploadQueue::Initialized(x) => { - if !x.shutting_down { - Ok(x) + Uninitialized => Err(NotInitialized::Uninitialized.into()), + Initialized(x) => { + if x.shutting_down { + Err(NotInitialized::ShuttingDown.into()) } else { - anyhow::bail!("queue is shutting down") + Ok(x) } } + Stopped(_) => Err(NotInitialized::Stopped.into()), } } From d52b81340f77f37a2cad3237cfb553d5257d634e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 25 Jan 2024 18:23:18 +0100 Subject: [PATCH 2/5] S3 based recovery (#6155) Adds a new `time_travel_recover` function to the `RemoteStorage` trait that allows time travel like functionality for S3 buckets, regardless of their content (it is not even pageserver related). It takes a different approach from [this post](https://aws.amazon.com/blogs/storage/point-in-time-restore-for-amazon-s3-buckets/) that is more complicated. It takes as input a prefix a target timestamp, and a limit timestamp: * executes [`ListObjectVersions`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html) * obtains the latest version that comes before the target timestamp * copies that latest version to the same prefix * if there is versions newer than the limit timestamp, it doesn't do anything for the file The limit timestamp is meant to be some timestamp before the start of the recovery operation and after any changes that one wants to revert. For example, it might be the time point after a tenant was detached from all involved pageservers. The limiting mechanism ensures that the operation is idempotent and can be retried without causing additional writes/copies. The approach fulfills all the requirements laid out in 8233, and is a recoverable operation. Nothing is deleted permanently, only new entries added to the version log. I also enable [nextest retries](https://nexte.st/book/retries.html) to help with some general S3 flakiness (on top of low level retries). 
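For illustration, here is a minimal, self-contained sketch of the per-key decision logic described above. It is not the code from the patch: `VersionEntry`, `Action`, and `decide` are illustrative stand-ins for the AWS SDK's `ObjectVersion`/`DeleteMarkerEntry` handling, and timestamps are plain `SystemTime` values.

```rust
use std::time::{Duration, SystemTime};

#[derive(Debug, Clone)]
struct VersionEntry {
    last_modified: SystemTime,
    version_id: String,
    is_delete_marker: bool,
}

#[derive(Debug, PartialEq)]
enum Action {
    Skip,                   // changed after `done_if_after`: leave untouched (idempotency guard)
    NoChange,               // nothing changed since `timestamp`
    Delete,                 // key did not exist (or was deleted) at `timestamp`
    RestoreVersion(String), // copy this old version back on top of the key
}

fn decide(versions: &[VersionEntry], timestamp: SystemTime, done_if_after: SystemTime) -> Action {
    let last = versions.last().expect("listed keys have at least one version");
    // Idempotency guard: the key was written after `done_if_after`, so a previous
    // recovery run (or something else) already touched it; leave it alone.
    if last.last_modified > done_if_after {
        return Action::Skip;
    }
    // Index of the first version strictly newer than the target timestamp.
    let split = versions.partition_point(|v| v.last_modified <= timestamp);
    if split == versions.len() {
        // No changes after the target timestamp: nothing to restore.
        return Action::NoChange;
    }
    let deleted_at_target = match split.checked_sub(1) {
        None => true, // every version is newer: the key did not exist at `timestamp`
        Some(i) => versions[i].is_delete_marker,
    };
    if deleted_at_target {
        if last.is_delete_marker {
            Action::NoChange // already deleted now, nothing to do
        } else {
            Action::Delete
        }
    } else {
        Action::RestoreVersion(versions[split - 1].version_id.clone())
    }
}

fn main() {
    let at = |secs| SystemTime::UNIX_EPOCH + Duration::from_secs(secs);
    // Versions are sorted oldest-first, as after the sort in the patch.
    let versions = vec![
        VersionEntry { last_modified: at(10), version_id: "v1".into(), is_delete_marker: false },
        VersionEntry { last_modified: at(20), version_id: "v2".into(), is_delete_marker: false },
    ];
    assert_eq!(decide(&versions, at(15), at(30)), Action::RestoreVersion("v1".into()));
    assert_eq!(decide(&versions, at(5), at(30)), Action::Delete);
    assert_eq!(decide(&versions, at(25), at(30)), Action::NoChange);
    assert_eq!(decide(&versions, at(15), at(15)), Action::Skip);
    println!("per-key decisions match the commit message");
}
```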
Part of https://github.com/neondatabase/cloud/issues/8233 --- .github/workflows/build_and_test.yml | 1 + libs/remote_storage/src/azure_blob.rs | 16 + libs/remote_storage/src/lib.rs | 38 +++ libs/remote_storage/src/local_fs.rs | 15 +- libs/remote_storage/src/s3_bucket.rs | 297 +++++++++++++++---- libs/remote_storage/src/s3_bucket/metrics.rs | 8 +- libs/remote_storage/src/simulate_failures.rs | 16 + libs/remote_storage/tests/test_real_s3.rs | 123 +++++++- 8 files changed, 452 insertions(+), 62 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 2b88f09b3d..fb88d4da96 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -21,6 +21,7 @@ env: COPT: '-Werror' AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }} AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }} + NEXTEST_RETRIES: 3 jobs: check-permissions: diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index e3edfd08a5..abab32470b 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -8,6 +8,7 @@ use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use std::time::SystemTime; use super::REMOTE_STORAGE_PREFIX_SEPARATOR; use anyhow::Result; @@ -23,6 +24,7 @@ use futures::stream::Stream; use futures_util::StreamExt; use http_types::{StatusCode, Url}; use tokio::time::Instant; +use tokio_util::sync::CancellationToken; use tracing::debug; use crate::s3_bucket::RequestKind; @@ -370,6 +372,20 @@ impl RemoteStorage for AzureBlobStorage { copy_status = status; } } + + async fn time_travel_recover( + &self, + _prefix: Option<&RemotePath>, + _timestamp: SystemTime, + _done_if_after: SystemTime, + _cancel: CancellationToken, + ) -> anyhow::Result<()> { + // TODO use Azure point in time recovery feature for this + // https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview + Err(anyhow::anyhow!( + "time travel recovery for azure blob storage is not implemented" + )) + } } pin_project_lite::pin_project! { diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index f247fcdc38..bf9c51ad1a 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -25,6 +25,7 @@ use bytes::Bytes; use futures::stream::Stream; use serde::{Deserialize, Serialize}; use tokio::sync::Semaphore; +use tokio_util::sync::CancellationToken; use toml_edit::Item; use tracing::info; @@ -210,6 +211,15 @@ pub trait RemoteStorage: Send + Sync + 'static { /// Copy a remote object inside a bucket from one path to another. 
async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()>; + + /// Resets the content of everything with the given prefix to the given state + async fn time_travel_recover( + &self, + prefix: Option<&RemotePath>, + timestamp: SystemTime, + done_if_after: SystemTime, + cancel: CancellationToken, + ) -> anyhow::Result<()>; } pub type DownloadStream = Pin> + Unpin + Send + Sync>>; @@ -387,6 +397,33 @@ impl GenericRemoteStorage> { Self::Unreliable(s) => s.copy(from, to).await, } } + + pub async fn time_travel_recover( + &self, + prefix: Option<&RemotePath>, + timestamp: SystemTime, + done_if_after: SystemTime, + cancel: CancellationToken, + ) -> anyhow::Result<()> { + match self { + Self::LocalFs(s) => { + s.time_travel_recover(prefix, timestamp, done_if_after, cancel) + .await + } + Self::AwsS3(s) => { + s.time_travel_recover(prefix, timestamp, done_if_after, cancel) + .await + } + Self::AzureBlob(s) => { + s.time_travel_recover(prefix, timestamp, done_if_after, cancel) + .await + } + Self::Unreliable(s) => { + s.time_travel_recover(prefix, timestamp, done_if_after, cancel) + .await + } + } + } } impl GenericRemoteStorage { @@ -674,6 +711,7 @@ impl ConcurrencyLimiter { RequestKind::List => &self.read, RequestKind::Delete => &self.write, RequestKind::Copy => &self.write, + RequestKind::TimeTravel => &self.write, } } diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 5f5bc1a9fa..34a6658a69 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -4,7 +4,7 @@ //! This storage used in tests, but can also be used in cases when a certain persistent //! volume is mounted to the local FS. -use std::{borrow::Cow, future::Future, io::ErrorKind, pin::Pin}; +use std::{borrow::Cow, future::Future, io::ErrorKind, pin::Pin, time::SystemTime}; use anyhow::{bail, ensure, Context}; use bytes::Bytes; @@ -14,7 +14,7 @@ use tokio::{ fs, io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, }; -use tokio_util::io::ReaderStream; +use tokio_util::{io::ReaderStream, sync::CancellationToken}; use tracing::*; use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty}; @@ -422,6 +422,17 @@ impl RemoteStorage for LocalFs { })?; Ok(()) } + + #[allow(clippy::diverging_sub_expression)] + async fn time_travel_recover( + &self, + _prefix: Option<&RemotePath>, + _timestamp: SystemTime, + _done_if_after: SystemTime, + _cancel: CancellationToken, + ) -> anyhow::Result<()> { + unimplemented!() + } } fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf { diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index e72cf28228..4909b8522b 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -6,12 +6,14 @@ use std::{ borrow::Cow, + collections::HashMap, pin::Pin, sync::Arc, task::{Context, Poll}, + time::SystemTime, }; -use anyhow::Context as _; +use anyhow::{anyhow, Context as _}; use aws_config::{ environment::credentials::EnvironmentVariableCredentialsProvider, imds::credentials::ImdsCredentialsProvider, @@ -27,17 +29,19 @@ use aws_sdk_s3::{ config::{AsyncSleep, Builder, IdentityCache, Region, SharedAsyncSleep}, error::SdkError, operation::get_object::GetObjectError, - types::{Delete, ObjectIdentifier}, + types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion}, Client, }; use aws_smithy_async::rt::sleep::TokioSleep; -use aws_smithy_types::body::SdkBody; use aws_smithy_types::byte_stream::ByteStream; +use 
aws_smithy_types::{body::SdkBody, DateTime}; use bytes::Bytes; use futures::stream::Stream; use hyper::Body; use scopeguard::ScopeGuard; +use tokio_util::sync::CancellationToken; +use utils::backoff; use super::StorageMetadata; use crate::{ @@ -270,6 +274,59 @@ impl S3Bucket { } } } + + async fn delete_oids( + &self, + kind: RequestKind, + delete_objects: &[ObjectIdentifier], + ) -> anyhow::Result<()> { + for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) { + let started_at = start_measuring_requests(kind); + + let resp = self + .client + .delete_objects() + .bucket(self.bucket_name.clone()) + .delete( + Delete::builder() + .set_objects(Some(chunk.to_vec())) + .build()?, + ) + .send() + .await; + + let started_at = ScopeGuard::into_inner(started_at); + metrics::BUCKET_METRICS + .req_seconds + .observe_elapsed(kind, &resp, started_at); + + let resp = resp?; + metrics::BUCKET_METRICS + .deleted_objects_total + .inc_by(chunk.len() as u64); + if let Some(errors) = resp.errors { + // Log a bounded number of the errors within the response: + // these requests can carry 1000 keys so logging each one + // would be too verbose, especially as errors may lead us + // to retry repeatedly. + const LOG_UP_TO_N_ERRORS: usize = 10; + for e in errors.iter().take(LOG_UP_TO_N_ERRORS) { + tracing::warn!( + "DeleteObjects key {} failed: {}: {}", + e.key.as_ref().map(Cow::from).unwrap_or("".into()), + e.code.as_ref().map(Cow::from).unwrap_or("".into()), + e.message.as_ref().map(Cow::from).unwrap_or("".into()) + ); + } + + return Err(anyhow::format_err!( + "Failed to delete {} objects", + errors.len() + )); + } + } + Ok(()) + } } pin_project_lite::pin_project! { @@ -568,64 +625,168 @@ impl RemoteStorage for S3Bucket { delete_objects.push(obj_id); } - for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) { - let started_at = start_measuring_requests(kind); - - let resp = self - .client - .delete_objects() - .bucket(self.bucket_name.clone()) - .delete( - Delete::builder() - .set_objects(Some(chunk.to_vec())) - .build()?, - ) - .send() - .await; - - let started_at = ScopeGuard::into_inner(started_at); - metrics::BUCKET_METRICS - .req_seconds - .observe_elapsed(kind, &resp, started_at); - - match resp { - Ok(resp) => { - metrics::BUCKET_METRICS - .deleted_objects_total - .inc_by(chunk.len() as u64); - if let Some(errors) = resp.errors { - // Log a bounded number of the errors within the response: - // these requests can carry 1000 keys so logging each one - // would be too verbose, especially as errors may lead us - // to retry repeatedly. 
- const LOG_UP_TO_N_ERRORS: usize = 10; - for e in errors.iter().take(LOG_UP_TO_N_ERRORS) { - tracing::warn!( - "DeleteObjects key {} failed: {}: {}", - e.key.as_ref().map(Cow::from).unwrap_or("".into()), - e.code.as_ref().map(Cow::from).unwrap_or("".into()), - e.message.as_ref().map(Cow::from).unwrap_or("".into()) - ); - } - - return Err(anyhow::format_err!( - "Failed to delete {} objects", - errors.len() - )); - } - } - Err(e) => { - return Err(e.into()); - } - } - } - Ok(()) + self.delete_oids(kind, &delete_objects).await } async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { let paths = std::array::from_ref(path); self.delete_objects(paths).await } + + async fn time_travel_recover( + &self, + prefix: Option<&RemotePath>, + timestamp: SystemTime, + done_if_after: SystemTime, + cancel: CancellationToken, + ) -> anyhow::Result<()> { + let kind = RequestKind::TimeTravel; + let _guard = self.permit(kind).await; + + let timestamp = DateTime::from(timestamp); + let done_if_after = DateTime::from(done_if_after); + + tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}"); + + // get the passed prefix or if it is not set use prefix_in_bucket value + let prefix = prefix + .map(|p| self.relative_path_to_s3_object(p)) + .or_else(|| self.prefix_in_bucket.clone()); + + let warn_threshold = 3; + let max_retries = 10; + let is_permanent = |_e: &_| false; + + let list = backoff::retry( + || async { + Ok(self + .client + .list_object_versions() + .bucket(self.bucket_name.clone()) + .set_prefix(prefix.clone()) + .send() + .await?) + }, + is_permanent, + warn_threshold, + max_retries, + "listing object versions for time_travel_recover", + backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")), + ) + .await?; + + if list.is_truncated().unwrap_or_default() { + anyhow::bail!("Received truncated ListObjectVersions response for prefix={prefix:?}"); + } + + let mut versions_deletes = list + .versions() + .iter() + .map(VerOrDelete::Version) + .chain(list.delete_markers().iter().map(VerOrDelete::DeleteMarker)) + .collect::>(); + + versions_deletes.sort_by_key(|vd| (vd.key(), vd.last_modified())); + + let mut vds_for_key = HashMap::<_, Vec<_>>::new(); + + for vd in versions_deletes { + let last_modified = vd.last_modified(); + let version_id = vd.version_id(); + let key = vd.key(); + let (Some(last_modified), Some(version_id), Some(key)) = + (last_modified, version_id, key) + else { + anyhow::bail!( + "One (or more) of last_modified, key, and id is None. \ + Is versioning enabled in the bucket? last_modified={:?} key={:?} version_id={:?}", + last_modified, key, version_id, + ); + }; + if version_id == "null" { + anyhow::bail!("Received ListVersions response for key={key} with version_id='null', \ + indicating either disabled versioning, or legacy objects with null version id values"); + } + tracing::trace!( + "Parsing version key={key} version_id={version_id} is_delete={}", + matches!(vd, VerOrDelete::DeleteMarker(_)) + ); + + vds_for_key + .entry(key) + .or_default() + .push((vd, last_modified, version_id)); + } + for (key, versions) in vds_for_key { + let (last_vd, last_last_modified, _version_id) = versions.last().unwrap(); + if last_last_modified > &&done_if_after { + tracing::trace!("Key {key} has version later than done_if_after, skipping"); + continue; + } + // the version we want to restore to. 
+ let version_to_restore_to = + match versions.binary_search_by_key(×tamp, |tpl| *tpl.1) { + Ok(v) => v, + Err(e) => e, + }; + if version_to_restore_to == versions.len() { + tracing::trace!("Key {key} has no changes since timestamp, skipping"); + continue; + } + let mut do_delete = false; + if version_to_restore_to == 0 { + // All versions more recent, so the key didn't exist at the specified time point. + tracing::trace!( + "All {} versions more recent for {key}, deleting", + versions.len() + ); + do_delete = true; + } else { + match &versions[version_to_restore_to - 1] { + (VerOrDelete::Version(_), _last_modified, version_id) => { + tracing::trace!("Copying old version {version_id} for {key}..."); + // Restore the state to the last version by copying + let source_id = + format!("{}/{key}?versionId={version_id}", self.bucket_name); + + backoff::retry( + || async { + Ok(self + .client + .copy_object() + .bucket(self.bucket_name.clone()) + .key(key) + .copy_source(&source_id) + .send() + .await?) + }, + is_permanent, + warn_threshold, + max_retries, + "listing object versions for time_travel_recover", + backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")), + ) + .await?; + } + (VerOrDelete::DeleteMarker(_), _last_modified, _version_id) => { + do_delete = true; + } + } + }; + if do_delete { + if matches!(last_vd, VerOrDelete::DeleteMarker(_)) { + // Key has since been deleted (but there was some history), no need to do anything + tracing::trace!("Key {key} already deleted, skipping."); + } else { + tracing::trace!("Deleting {key}..."); + + let oid = ObjectIdentifier::builder().key(key.to_owned()).build()?; + self.delete_oids(kind, &[oid]).await?; + } + } + } + Ok(()) + } } /// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`]. 
@@ -650,6 +811,32 @@ fn start_measuring_requests( }) } +enum VerOrDelete<'a> { + Version(&'a ObjectVersion), + DeleteMarker(&'a DeleteMarkerEntry), +} + +impl<'a> VerOrDelete<'a> { + fn last_modified(&self) -> Option<&'a DateTime> { + match self { + VerOrDelete::Version(v) => v.last_modified(), + VerOrDelete::DeleteMarker(v) => v.last_modified(), + } + } + fn version_id(&self) -> Option<&'a str> { + match self { + VerOrDelete::Version(v) => v.version_id(), + VerOrDelete::DeleteMarker(v) => v.version_id(), + } + } + fn key(&self) -> Option<&'a str> { + match self { + VerOrDelete::Version(v) => v.key(), + VerOrDelete::DeleteMarker(v) => v.key(), + } + } +} + #[cfg(test)] mod tests { use camino::Utf8Path; diff --git a/libs/remote_storage/src/s3_bucket/metrics.rs b/libs/remote_storage/src/s3_bucket/metrics.rs index 21dde14906..beca755920 100644 --- a/libs/remote_storage/src/s3_bucket/metrics.rs +++ b/libs/remote_storage/src/s3_bucket/metrics.rs @@ -12,6 +12,7 @@ pub(crate) enum RequestKind { Delete = 2, List = 3, Copy = 4, + TimeTravel = 5, } use RequestKind::*; @@ -24,6 +25,7 @@ impl RequestKind { Delete => "delete_object", List => "list_objects", Copy => "copy_object", + TimeTravel => "time_travel_recover", } } const fn as_index(&self) -> usize { @@ -31,7 +33,7 @@ impl RequestKind { } } -pub(super) struct RequestTyped([C; 5]); +pub(super) struct RequestTyped([C; 6]); impl RequestTyped { pub(super) fn get(&self, kind: RequestKind) -> &C { @@ -40,8 +42,8 @@ impl RequestTyped { fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self { use RequestKind::*; - let mut it = [Get, Put, Delete, List, Copy].into_iter(); - let arr = std::array::from_fn::(|index| { + let mut it = [Get, Put, Delete, List, Copy, TimeTravel].into_iter(); + let arr = std::array::from_fn::(|index| { let next = it.next().unwrap(); assert_eq!(index, next.as_index()); f(next) diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index 90d90163a7..fc4c4b315b 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -5,7 +5,9 @@ use bytes::Bytes; use futures::stream::Stream; use std::collections::HashMap; use std::sync::Mutex; +use std::time::SystemTime; use std::{collections::hash_map::Entry, sync::Arc}; +use tokio_util::sync::CancellationToken; use crate::{ Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage, @@ -30,6 +32,7 @@ enum RemoteOp { Download(RemotePath), Delete(RemotePath), DeleteObjects(Vec), + TimeTravelRecover(Option), } impl UnreliableWrapper { @@ -181,4 +184,17 @@ impl RemoteStorage for UnreliableWrapper { self.attempt(RemoteOp::Upload(to.clone()))?; self.inner.copy_object(from, to).await } + + async fn time_travel_recover( + &self, + prefix: Option<&RemotePath>, + timestamp: SystemTime, + done_if_after: SystemTime, + cancel: CancellationToken, + ) -> anyhow::Result<()> { + self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))?; + self.inner + .time_travel_recover(prefix, timestamp, done_if_after, cancel) + .await + } } diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs index 4a999d115e..9e1b989e4d 100644 --- a/libs/remote_storage/tests/test_real_s3.rs +++ b/libs/remote_storage/tests/test_real_s3.rs @@ -1,15 +1,19 @@ -use std::collections::HashSet; use std::env; use std::num::NonZeroUsize; use std::ops::ControlFlow; use std::sync::Arc; -use std::time::UNIX_EPOCH; +use std::time::{Duration, 
UNIX_EPOCH}; +use std::{collections::HashSet, time::SystemTime}; +use crate::common::{download_to_vec, upload_stream}; use anyhow::Context; +use camino::Utf8Path; use remote_storage::{ GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config, }; +use test_context::test_context; use test_context::AsyncTestContext; +use tokio_util::sync::CancellationToken; use tracing::info; mod common; @@ -23,6 +27,121 @@ const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_ const BASE_PREFIX: &str = "test"; +#[test_context(MaybeEnabledStorage)] +#[tokio::test] +async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> { + let ctx = match ctx { + MaybeEnabledStorage::Enabled(ctx) => ctx, + MaybeEnabledStorage::Disabled => return Ok(()), + }; + // Our test depends on discrepancies in the clock between S3 and the environment the tests + // run in. Therefore, wait a little bit before and after. The alternative would be + // to take the time from S3 response headers. + const WAIT_TIME: Duration = Duration::from_millis(3_000); + + async fn time_point() -> SystemTime { + tokio::time::sleep(WAIT_TIME).await; + let ret = SystemTime::now(); + tokio::time::sleep(WAIT_TIME).await; + ret + } + + async fn list_files(client: &Arc) -> anyhow::Result> { + Ok(client + .list_files(None) + .await + .context("list root files failure")? + .into_iter() + .collect::>()) + } + + let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str())) + .with_context(|| "RemotePath conversion")?; + + let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str())) + .with_context(|| "RemotePath conversion")?; + + let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str())) + .with_context(|| "RemotePath conversion")?; + + let (data, len) = upload_stream("remote blob data1".as_bytes().into()); + ctx.client.upload(data, len, &path1, None).await?; + + let t0_files = list_files(&ctx.client).await?; + let t0 = time_point().await; + println!("at t0: {t0_files:?}"); + + let old_data = "remote blob data2"; + let (data, len) = upload_stream(old_data.as_bytes().into()); + ctx.client.upload(data, len, &path2, None).await?; + + let t1_files = list_files(&ctx.client).await?; + let t1 = time_point().await; + println!("at t1: {t1_files:?}"); + + // A little check to ensure that our clock is not too far off from the S3 clock + { + let dl = ctx.client.download(&path2).await?; + let last_modified = dl.last_modified.unwrap(); + let half_wt = WAIT_TIME.mul_f32(0.5); + let t0_hwt = t0 + half_wt; + let t1_hwt = t1 - half_wt; + if !(t0_hwt..=t1_hwt).contains(&last_modified) { + panic!("last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. 
\ + This likely means a large lock discrepancy between S3 and the local clock."); + } + } + + let (data, len) = upload_stream("remote blob data3".as_bytes().into()); + ctx.client.upload(data, len, &path3, None).await?; + + let new_data = "new remote blob data2"; + let (data, len) = upload_stream(new_data.as_bytes().into()); + ctx.client.upload(data, len, &path2, None).await?; + + ctx.client.delete(&path1).await?; + + let t2_files = list_files(&ctx.client).await?; + let t2 = time_point().await; + println!("at t2: {t2_files:?}"); + + // No changes after recovery to t2 (no-op) + let t_final = time_point().await; + ctx.client + .time_travel_recover(None, t2, t_final, CancellationToken::new()) + .await?; + let t2_files_recovered = list_files(&ctx.client).await?; + println!("after recovery to t2: {t2_files_recovered:?}"); + assert_eq!(t2_files, t2_files_recovered); + let path2_recovered_t2 = download_to_vec(ctx.client.download(&path2).await?).await?; + assert_eq!(path2_recovered_t2, new_data.as_bytes()); + + // after recovery to t1: path1 is back, path2 has the old content + let t_final = time_point().await; + ctx.client + .time_travel_recover(None, t1, t_final, CancellationToken::new()) + .await?; + let t1_files_recovered = list_files(&ctx.client).await?; + println!("after recovery to t1: {t1_files_recovered:?}"); + assert_eq!(t1_files, t1_files_recovered); + let path2_recovered_t1 = download_to_vec(ctx.client.download(&path2).await?).await?; + assert_eq!(path2_recovered_t1, old_data.as_bytes()); + + // after recovery to t0: everything is gone except for path1 + let t_final = time_point().await; + ctx.client + .time_travel_recover(None, t0, t_final, CancellationToken::new()) + .await?; + let t0_files_recovered = list_files(&ctx.client).await?; + println!("after recovery to t0: {t0_files_recovered:?}"); + assert_eq!(t0_files, t0_files_recovered); + + // cleanup + ctx.client.delete_objects(&[path1, path2, path3]).await?; + + Ok(()) +} + struct EnabledS3 { client: Arc, base_prefix: &'static str, From fd4cce9417484a9cede46a43e14cffb2c32748be Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 25 Jan 2024 19:17:53 +0100 Subject: [PATCH 3/5] test_pageserver_max_throughput_getpage_at_latest_lsn: remove n_tenants=100 combination (#6477) Need to fix the neon_local timeouts first (https://github.com/neondatabase/neon/issues/6473) and also not run them on every merge, but only nightly: https://github.com/neondatabase/neon/issues/6476 --- .../test_pageserver_max_throughput_getpage_at_latest_lsn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py b/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py index f1f5511453..1ed7e577b9 100644 --- a/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py +++ b/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py @@ -29,7 +29,7 @@ from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking # 46G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-100-6 @pytest.mark.parametrize("duration", [30]) @pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(s) for s in [100, 200]]) -@pytest.mark.parametrize("n_tenants", [1, 10, 100]) +@pytest.mark.parametrize("n_tenants", [1, 10]) @pytest.mark.timeout( 10000 ) # TODO: this value is just "a really high number"; have 
this per instance type From 689ad72e92955c8e7f2c3e640b0ae6299fbb9276 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 25 Jan 2024 19:20:02 +0100 Subject: [PATCH 4/5] fix(neon_local): leaks child process if it fails to start & pass checks (#6474) refs https://github.com/neondatabase/neon/issues/6473 Before this PR, if process_started() didn't return Ok(true) until we ran out of retries, we'd return an error but leave the process running. Try it by adding a 20s sleep to the pageserver `main()`, e.g., right before we claim the pidfile. Without this PR, output looks like so: ``` (.venv) cs@devvm-mbp:[~/src/neon-work-2]: ./target/debug/neon_local start Starting neon broker at 127.0.0.1:50051. storage_broker started, pid: 2710939 . attachment_service started, pid: 2710949 Starting pageserver node 1 at '127.0.0.1:64000' in ".neon/pageserver_1"..... pageserver has not started yet, continuing to wait..... pageserver 1 start failed: pageserver did not start in 10 seconds No process is holding the pidfile. The process must have already exited. Leave in place to avoid race conditions: ".neon/pageserver_1/pageserver.pid" No process is holding the pidfile. The process must have already exited. Leave in place to avoid race conditions: ".neon/safekeepers/sk1/safekeeper.pid" Stopping storage_broker with pid 2710939 immediately....... storage_broker has not stopped yet, continuing to wait..... neon broker stop failed: storage_broker with pid 2710939 did not stop in 10 seconds Stopping attachment_service with pid 2710949 immediately....... attachment_service has not stopped yet, continuing to wait..... attachment service stop failed: attachment_service with pid 2710949 did not stop in 10 seconds ``` and we leak the pageserver process ``` (.venv) cs@devvm-mbp:[~/src/neon-work-2]: ps aux | grep pageserver cs 2710959 0.0 0.2 2377960 47616 pts/4 Sl 14:36 0:00 /home/cs/src/neon-work-2/target/debug/pageserver -D .neon/pageserver_1 -c id=1 -c pg_distrib_dir='/home/cs/src/neon-work-2/pg_install' -c http_auth_type='Trust' -c pg_auth_type='Trust' -c listen_http_addr='127.0.0.1:9898' -c listen_pg_addr='127.0.0.1:64000' -c broker_endpoint='http://127.0.0.1:50051/' -c control_plane_api='http://127.0.0.1:1234/' -c remote_storage={local_path='../local_fs_remote_storage/pageserver'} ``` After this PR, there is no leaked process. 
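For reference, a minimal sketch of the guard pattern used by the fix. It assumes the `anyhow` and `scopeguard` crates (both dependencies after this patch); the `sleep 30` command and the status check are hypothetical stand-ins, not the real neon_local code.

```rust
use std::process::Command;
use std::thread;
use std::time::Duration;

fn start_and_check(process_name: &str) -> anyhow::Result<()> {
    // Hypothetical stand-in for the real command; the guard pattern is what matters.
    let child = Command::new("sleep").arg("30").spawn()?;
    let pid = child.id();

    // From here on, any early return or panic drops the guard, which kills and
    // reaps the child, so nothing is leaked.
    let child = scopeguard::guard(child, |mut child| {
        let _ = child.kill();
        let _ = child.wait();
    });

    for _ in 0..10 {
        if status_check_passed(pid) {
            println!("{process_name} started and passed status check, pid: {pid}");
            // Success: defuse the guard. std::process::Child does not kill on drop,
            // so the process deliberately outlives this invocation.
            drop(scopeguard::ScopeGuard::into_inner(child));
            return Ok(());
        }
        thread::sleep(Duration::from_millis(100));
    }
    // Falling through drops the guard: the child is killed before we report the error.
    anyhow::bail!("{process_name} did not start+pass status checks in time")
}

fn status_check_passed(_pid: u32) -> bool {
    // Stand-in for the real pid-file / status-endpoint check.
    false
}

fn main() {
    if let Err(e) = start_and_check("example-daemon") {
        println!("start failed, child was killed by the guard: {e:#}");
    }
}
```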
--- Cargo.lock | 1 + control_plane/Cargo.toml | 1 + control_plane/src/attachment_service.rs | 5 ++-- control_plane/src/background_process.rs | 33 +++++++++++++++++-------- control_plane/src/pageserver.rs | 6 ++--- control_plane/src/safekeeper.rs | 3 +-- 6 files changed, 32 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f2f31192f0..f0e8b6a0ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1342,6 +1342,7 @@ dependencies = [ "regex", "reqwest", "safekeeper_api", + "scopeguard", "serde", "serde_json", "serde_with", diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index 898ad05add..75e5dcb7f8 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -19,6 +19,7 @@ hex.workspace = true hyper.workspace = true regex.workspace = true reqwest = { workspace = true, features = ["blocking", "json"] } +scopeguard.workspace = true serde.workspace = true serde_json.workspace = true serde_with.workspace = true diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs index 0a353d8b12..2d43c46270 100644 --- a/control_plane/src/attachment_service.rs +++ b/control_plane/src/attachment_service.rs @@ -9,7 +9,7 @@ use pageserver_client::mgmt_api::ResponseErrorMessageExt; use postgres_backend::AuthType; use postgres_connection::parse_host_port; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::{path::PathBuf, process::Child, str::FromStr}; +use std::{path::PathBuf, str::FromStr}; use tracing::instrument; use utils::{ auth::{Claims, Scope}, @@ -220,7 +220,7 @@ impl AttachmentService { .expect("non-Unicode path") } - pub async fn start(&self) -> anyhow::Result { + pub async fn start(&self) -> anyhow::Result<()> { let path_str = self.path.to_string_lossy(); let mut args = vec!["-l", &self.listen, "-p", &path_str] @@ -254,6 +254,7 @@ impl AttachmentService { ) .await; + // TODO: shouldn't we bail if we fail to spawn the process? 
for ps_conf in &self.env.pageservers { let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr).expect("Unable to parse listen_pg_addr"); diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs index 20fa3af9b8..3ffb8734d0 100644 --- a/control_plane/src/background_process.rs +++ b/control_plane/src/background_process.rs @@ -17,7 +17,7 @@ use std::io::Write; use std::os::unix::prelude::AsRawFd; use std::os::unix::process::CommandExt; use std::path::Path; -use std::process::{Child, Command}; +use std::process::Command; use std::time::Duration; use std::{fs, io, thread}; @@ -60,7 +60,7 @@ pub async fn start_process( envs: EI, initial_pid_file: InitialPidFile, process_status_check: F, -) -> anyhow::Result +) -> anyhow::Result<()> where F: Fn() -> Fut, Fut: std::future::Future>, @@ -98,7 +98,7 @@ where InitialPidFile::Expect(path) => path, }; - let mut spawned_process = filled_cmd.spawn().with_context(|| { + let spawned_process = filled_cmd.spawn().with_context(|| { format!("Could not spawn {process_name}, see console output and log files for details.") })?; let pid = spawned_process.id(); @@ -106,12 +106,26 @@ where i32::try_from(pid) .with_context(|| format!("Subprocess {process_name} has invalid pid {pid}"))?, ); + // set up a scopeguard to kill & wait for the child in case we panic or bail below + let spawned_process = scopeguard::guard(spawned_process, |mut spawned_process| { + println!("SIGKILL & wait the started process"); + (|| { + // TODO: use another signal that can be caught by the child so it can clean up any children it spawned (e..g, walredo). + spawned_process.kill().context("SIGKILL child")?; + spawned_process.wait().context("wait() for child process")?; + anyhow::Ok(()) + })() + .with_context(|| format!("scopeguard kill&wait child {process_name:?}")) + .unwrap(); + }); for retries in 0..RETRIES { match process_started(pid, pid_file_to_check, &process_status_check).await { Ok(true) => { - println!("\n{process_name} started, pid: {pid}"); - return Ok(spawned_process); + println!("\n{process_name} started and passed status check, pid: {pid}"); + // leak the child process, it'll outlive this neon_local invocation + drop(scopeguard::ScopeGuard::into_inner(spawned_process)); + return Ok(()); } Ok(false) => { if retries == NOTICE_AFTER_RETRIES { @@ -126,16 +140,15 @@ where thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS)); } Err(e) => { - println!("{process_name} failed to start: {e:#}"); - if let Err(e) = spawned_process.kill() { - println!("Could not stop {process_name} subprocess: {e:#}") - }; + println!("error starting process {process_name:?}: {e:#}"); return Err(e); } } } println!(); - anyhow::bail!("{process_name} did not start in {RETRY_UNTIL_SECS} seconds"); + anyhow::bail!( + "{process_name} did not start+pass status checks within {RETRY_UNTIL_SECS} seconds" + ); } /// Stops the process, using the pid file given. Returns Ok also if the process is already not running. 
diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 18ccf6bd98..1db21c9a37 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -11,7 +11,7 @@ use std::io; use std::io::Write; use std::num::NonZeroU64; use std::path::PathBuf; -use std::process::{Child, Command}; +use std::process::Command; use std::time::Duration; use anyhow::{bail, Context}; @@ -161,7 +161,7 @@ impl PageServerNode { .expect("non-Unicode path") } - pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result { + pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> { self.start_node(config_overrides, false).await } @@ -207,7 +207,7 @@ impl PageServerNode { &self, config_overrides: &[&str], update_config: bool, - ) -> anyhow::Result { + ) -> anyhow::Result<()> { // TODO: using a thread here because start_process() is not async but we need to call check_status() let datadir = self.repo_path(); print!( diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index 4026ef0eb9..6ac71dfe51 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -7,7 +7,6 @@ //! ``` use std::io::Write; use std::path::PathBuf; -use std::process::Child; use std::{io, result}; use anyhow::Context; @@ -104,7 +103,7 @@ impl SafekeeperNode { .expect("non-Unicode path") } - pub async fn start(&self, extra_opts: Vec) -> anyhow::Result { + pub async fn start(&self, extra_opts: Vec) -> anyhow::Result<()> { print!( "Starting safekeeper at '{}' in '{}'", self.pg_connection_config.raw_address(), From d36623ad74c1d4a974d95cff00a8f463ba615254 Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Thu, 25 Jan 2024 19:25:29 +0000 Subject: [PATCH 5/5] CI: cancel old e2e-tests on new commits (#6463) ## Problem Triggered `e2e-tests` job is not cancelled along with other jobs in a PR if the PR get new commits. We can improve the situation by setting `concurrency_group` for the remote workflow (https://github.com/neondatabase/cloud/pull/9622 adds `concurrency_group` group input to the remote workflow). 
Ref https://neondb.slack.com/archives/C059ZC138NR/p1706087124297569 Cloud's part added in https://github.com/neondatabase/cloud/pull/9622 ## Summary of changes - Set `concurrency_group` parameter when triggering `e2e-tests` - At the beginning of a CI pipeline, trigger Cloud's `cancel-previous-in-concurrency-group.yml` workflow which cancels previously triggered e2e-tests --- .github/workflows/build_and_test.yml | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index fb88d4da96..643d24696d 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -22,6 +22,8 @@ env: AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }} AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }} NEXTEST_RETRIES: 3 + # A concurrency group that we use for e2e-tests runs, matches `concurrency.group` above with `github.repository` as a prefix + E2E_CONCURRENCY_GROUP: ${{ github.repository }}-${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }} jobs: check-permissions: @@ -45,6 +47,20 @@ jobs: exit 1 + cancel-previous-e2e-tests: + needs: [ check-permissions ] + if: github.event_name == 'pull_request' + runs-on: ubuntu-latest + + steps: + - name: Cancel previous e2e-tests runs for this PR + env: + GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }} + run: | + gh workflow --repo neondatabase/cloud \ + run cancel-previous-in-concurrency-group.yml \ + --field concurrency_group="${{ env.E2E_CONCURRENCY_GROUP }}" + tag: needs: [ check-permissions ] runs-on: [ self-hosted, gen3, small ] @@ -696,7 +712,8 @@ jobs: \"commit_hash\": \"$COMMIT_SHA\", \"remote_repo\": \"${{ github.repository }}\", \"storage_image_tag\": \"${{ needs.tag.outputs.build-tag }}\", - \"compute_image_tag\": \"${{ needs.tag.outputs.build-tag }}\" + \"compute_image_tag\": \"${{ needs.tag.outputs.build-tag }}\", + \"concurrency_group\": \"${{ env.E2E_CONCURRENCY_GROUP }}\" } }"