From 4c54e4b37d11c775609ca276e4e35bb4ca6be8a7 Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Wed, 15 Jun 2022 17:59:24 +0300 Subject: [PATCH] switch to per-tenant attach/detach download operations of all timelines for one tenant are now grouped together so when attach is invoked pageserver downloads all of them and registers them in a single apply_sync_status_update call so branches can be used safely with attach/detach --- libs/remote_storage/src/lib.rs | 14 +- libs/remote_storage/src/local_fs.rs | 29 ++- libs/remote_storage/src/s3_bucket.rs | 112 +++++++- pageserver/src/http/openapi_spec.yml | 47 +++- pageserver/src/http/routes.rs | 145 ++++++----- pageserver/src/layered_repository.rs | 54 ++-- pageserver/src/repository.rs | 5 +- pageserver/src/storage_sync.rs | 107 ++++---- pageserver/src/storage_sync/download.rs | 109 +++++++- pageserver/src/storage_sync/index.rs | 118 +++++++-- pageserver/src/tenant_mgr.rs | 87 ++++--- pageserver/src/timelines.rs | 2 +- pageserver/src/walreceiver.rs | 7 +- .../batch_others/test_ancestor_branch.py | 13 - test_runner/batch_others/test_detach.py | 49 ++++ test_runner/batch_others/test_normal_work.py | 2 +- .../batch_others/test_remote_storage.py | 10 +- .../batch_others/test_tenant_relocation.py | 246 +++++++++++++----- test_runner/fixtures/neon_fixtures.py | 12 +- 19 files changed, 835 insertions(+), 333 deletions(-) create mode 100644 test_runner/batch_others/test_detach.py diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 6d47d070c1..dec79e4580 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -42,13 +42,19 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10; /// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/ pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100; +pub trait RemoteObjectName { + // Needed to retrieve last component for RemoteObjectId. + // In other words a file name + fn object_name(&self) -> Option<&str>; +} + /// Storage (potentially remote) API to manage its state. /// This storage tries to be unaware of any layered repository context, /// providing basic CRUD operations for storage files. #[async_trait::async_trait] pub trait RemoteStorage: Send + Sync { /// A way to uniquely reference a file in the remote storage. - type RemoteObjectId; + type RemoteObjectId: RemoteObjectName; /// Attempts to derive the storage path out of the local path, if the latter is correct. fn remote_object_id(&self, local_path: &Path) -> anyhow::Result; @@ -59,6 +65,12 @@ pub trait RemoteStorage: Send + Sync { /// Lists all items the storage has right now. async fn list(&self) -> anyhow::Result>; + /// Lists all top level subdirectories for a given prefix + async fn list_prefixes( + &self, + prefix: Option, + ) -> anyhow::Result>; + /// Streams the local file contents into remote into the remote storage entry. async fn upload( &self, diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 25235200b2..df1581fb51 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -5,6 +5,7 @@ //! volume is mounted to the local FS. 
use std::{ + borrow::Cow, future::Future, path::{Path, PathBuf}, pin::Pin, @@ -17,10 +18,16 @@ use tokio::{ }; use tracing::*; -use crate::{path_with_suffix_extension, Download, DownloadError}; +use crate::{path_with_suffix_extension, Download, DownloadError, RemoteObjectName}; use super::{strip_path_prefix, RemoteStorage, StorageMetadata}; +impl RemoteObjectName for PathBuf { + fn object_name(&self) -> Option<&str> { + self.file_stem().and_then(|n| n.to_str()) + } +} + pub struct LocalFs { working_directory: PathBuf, storage_root: PathBuf, @@ -101,7 +108,18 @@ impl RemoteStorage for LocalFs { } async fn list(&self) -> anyhow::Result> { - get_all_files(&self.storage_root).await + get_all_files(&self.storage_root, true).await + } + + async fn list_prefixes( + &self, + prefix: Option, + ) -> anyhow::Result> { + let path = match prefix { + Some(prefix) => Cow::Owned(self.storage_root.join(prefix)), + None => Cow::Borrowed(&self.storage_root), + }; + get_all_files(path.as_ref(), false).await } async fn upload( @@ -299,6 +317,7 @@ fn storage_metadata_path(original_path: &Path) -> PathBuf { fn get_all_files<'a, P>( directory_path: P, + recursive: bool, ) -> Pin>> + Send + Sync + 'a>> where P: AsRef + Send + Sync + 'a, @@ -315,7 +334,11 @@ where if file_type.is_symlink() { debug!("{:?} us a symlink, skipping", entry_path) } else if file_type.is_dir() { - paths.extend(get_all_files(entry_path).await?.into_iter()) + if recursive { + paths.extend(get_all_files(entry_path, true).await?.into_iter()) + } else { + paths.push(dir_entry.path()) + } } else { paths.push(dir_entry.path()); } diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 5269d63d09..3b413e30ce 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -19,7 +19,9 @@ use tokio::{io, sync::Semaphore}; use tokio_util::io::ReaderStream; use tracing::debug; -use crate::{strip_path_prefix, Download, DownloadError, RemoteStorage, S3Config}; +use crate::{ + strip_path_prefix, Download, DownloadError, RemoteObjectName, RemoteStorage, S3Config, +}; use super::StorageMetadata; @@ -117,6 +119,24 @@ impl S3ObjectKey { } } +impl RemoteObjectName for S3ObjectKey { + /// Turn a/b/c or a/b/c/ into c + fn object_name(&self) -> Option<&str> { + // corner case + if &self.0 == "/" { + return None; + } + + if self.0.ends_with(S3_PREFIX_SEPARATOR) { + self.0.rsplit(S3_PREFIX_SEPARATOR).nth(1) + } else { + self.0 + .rsplit_once(S3_PREFIX_SEPARATOR) + .map(|(_, last)| last) + } + } +} + /// AWS S3 storage. 
pub struct S3Bucket { workdir: PathBuf, @@ -283,6 +303,77 @@ impl RemoteStorage for S3Bucket { Ok(document_keys) } + /// Note: it wont include empty "directories" + async fn list_prefixes( + &self, + prefix: Option, + ) -> anyhow::Result> { + let list_prefix = match prefix { + Some(prefix) => { + let mut prefix_in_bucket = self.prefix_in_bucket.clone().unwrap_or_default(); + // if there is no trailing / in default prefix and + // supplied prefix does not start with "/" insert it + if !(prefix_in_bucket.ends_with(S3_PREFIX_SEPARATOR) + || prefix.0.starts_with(S3_PREFIX_SEPARATOR)) + { + prefix_in_bucket.push(S3_PREFIX_SEPARATOR); + } + + prefix_in_bucket.push_str(&prefix.0); + // required to end with a separator + // otherwise request will return only the entry of a prefix + if !prefix_in_bucket.ends_with(S3_PREFIX_SEPARATOR) { + prefix_in_bucket.push(S3_PREFIX_SEPARATOR); + } + Some(prefix_in_bucket) + } + None => self.prefix_in_bucket.clone(), + }; + + let mut document_keys = Vec::new(); + + let mut continuation_token = None; + loop { + let _guard = self + .concurrency_limiter + .acquire() + .await + .context("Concurrency limiter semaphore got closed during S3 list")?; + + metrics::inc_list_objects(); + + let fetch_response = self + .client + .list_objects_v2(ListObjectsV2Request { + bucket: self.bucket_name.clone(), + prefix: list_prefix.clone(), + continuation_token, + delimiter: Some(S3_PREFIX_SEPARATOR.to_string()), + ..ListObjectsV2Request::default() + }) + .await + .map_err(|e| { + metrics::inc_list_objects_fail(); + e + })?; + + document_keys.extend( + fetch_response + .common_prefixes + .unwrap_or_default() + .into_iter() + .filter_map(|o| Some(S3ObjectKey(o.prefix?))), + ); + + match fetch_response.continuation_token { + Some(new_token) => continuation_token = Some(new_token), + None => break, + } + } + + Ok(document_keys) + } + async fn upload( &self, from: impl io::AsyncRead + Unpin + Send + Sync + 'static, @@ -378,6 +469,25 @@ mod tests { use super::*; + #[test] + fn object_name() { + let k = S3ObjectKey("a/b/c".to_owned()); + assert_eq!(k.object_name(), Some("c")); + + let k = S3ObjectKey("a/b/c/".to_owned()); + assert_eq!(k.object_name(), Some("c")); + + let k = S3ObjectKey("a/".to_owned()); + assert_eq!(k.object_name(), Some("a")); + + // XXX is it impossible to have an empty key? 
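+        // Keys with no final path component (an empty string or a bare separator) have no object name: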
+ let k = S3ObjectKey("".to_owned()); + assert_eq!(k.object_name(), None); + + let k = S3ObjectKey("/".to_owned()); + assert_eq!(k.object_name(), None); + } + #[test] fn download_destination() -> anyhow::Result<()> { let workdir = tempdir()?.path().to_owned(); diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 55f7b3c5a7..ebbb0d5ced 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -170,7 +170,6 @@ paths: application/json: schema: $ref: "#/components/schemas/Error" - /v1/tenant/{tenant_id}/timeline/{timeline_id}/attach: parameters: - name: tenant_id @@ -186,12 +185,27 @@ paths: type: string format: hex post: - description: Attach remote timeline + description: Deprecated responses: - "200": - description: Timeline attaching scheduled + "410": + description: GONE + + + /v1/tenant/{tenant_id}/attach: + parameters: + - name: tenant_id + in: path + required: true + schema: + type: string + format: hex + post: + description: Deprecated + responses: + "202": + description: Tenant attaching scheduled "400": - description: Error when no tenant id found in path or no timeline id + description: Error when no tenant id found in path parameters content: application/json: schema: @@ -215,7 +229,7 @@ paths: schema: $ref: "#/components/schemas/NotFoundError" "409": - description: Timeline download is already in progress + description: Tenant download is already in progress content: application/json: schema: @@ -227,7 +241,6 @@ paths: schema: $ref: "#/components/schemas/Error" - /v1/tenant/{tenant_id}/timeline/{timeline_id}/detach: parameters: - name: tenant_id @@ -243,12 +256,26 @@ paths: type: string format: hex post: - description: Detach local timeline + description: Deprecated + responses: + "410": + description: GONE + + /v1/tenant/{tenant_id}/detach: + parameters: + - name: tenant_id + in: path + required: true + schema: + type: string + format: hex + post: + description: Detach local tenant responses: "200": - description: Timeline detached + description: Tenant detached "400": - description: Error when no tenant id found in path or no timeline id + description: Error when no tenant id found in path parameters content: application/json: schema: diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index a1198051a8..41c78210f4 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -209,9 +209,9 @@ async fn timeline_detail_handler(request: Request) -> Result) -> Result) -> Result, ApiError> { +async fn timeline_attach_handler(_: Request) -> Result, ApiError> { + json_response(StatusCode::GONE, ()) +} + +// TODO makes sense to provide tenant config right away the same way as it handled in tenant_create +async fn tenant_attach_handler(request: Request) -> Result, ApiError> { let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?; check_permission(&request, Some(tenant_id))?; - let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?; - info!( - "Handling timeline {} attach for tenant: {}", - timeline_id, tenant_id, - ); + info!("Handling tenant attach {}", tenant_id,); tokio::task::spawn_blocking(move || { - if tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id).is_ok() { - // TODO: maybe answer with 309 Not Modified here? 
- anyhow::bail!("Timeline is already present locally") + if tenant_mgr::get_tenant_state(tenant_id).is_some() { + anyhow::bail!("Tenant is already present locally") }; Ok(()) }) .await .map_err(ApiError::from_err)??; - let sync_id = ZTenantTimelineId { - tenant_id, - timeline_id, - }; let state = get_state(&request); let remote_index = &state.remote_index; let mut index_accessor = remote_index.write().await; - if let Some(remote_timeline) = index_accessor.timeline_entry_mut(&sync_id) { - if remote_timeline.awaits_download { + if let Some(tenant_entry) = index_accessor.tenant_entry_mut(&tenant_id) { + if tenant_entry.has_in_progress_downloads() { return Err(ApiError::Conflict( - "Timeline download is already in progress".to_string(), + "Tenant download is already in progress".to_string(), )); } - remote_timeline.awaits_download = true; - storage_sync::schedule_layer_download(tenant_id, timeline_id); - return json_response(StatusCode::ACCEPTED, ()); - } else { - // no timeline in the index, release the lock to make the potentially lengthy download opetation - drop(index_accessor); - } - - let new_timeline = match try_download_index_part_data(state, sync_id).await { - Ok(Some(mut new_timeline)) => { - tokio::fs::create_dir_all(state.conf.timeline_path(&timeline_id, &tenant_id)) - .await - .context("Failed to create new timeline directory")?; - new_timeline.awaits_download = true; - new_timeline + for (timeline_id, remote_timeline) in tenant_entry.iter_mut() { + storage_sync::schedule_layer_download(tenant_id, *timeline_id); + remote_timeline.awaits_download = true; } - Ok(None) => return Err(ApiError::NotFound("Unknown remote timeline".to_string())), + return json_response(StatusCode::ACCEPTED, ()); + } + // no tenant in the index, release the lock to make the potentially lengthy download opetation + drop(index_accessor); + + // download index parts for every tenant timeline + let remote_timelines = match try_download_tenant_index(state, tenant_id).await { + Ok(Some(remote_timelines)) => remote_timelines, + Ok(None) => return Err(ApiError::NotFound("Unknown remote tenant".to_string())), Err(e) => { - error!("Failed to retrieve remote timeline data: {:?}", e); + error!("Failed to retrieve remote tenant data: {:?}", e); return Err(ApiError::NotFound( - "Failed to retrieve remote timeline".to_string(), + "Failed to retrieve remote tenant".to_string(), )); } }; + // recheck that download is not in progress because + // we've released the lock to avoid holding it during the download let mut index_accessor = remote_index.write().await; - match index_accessor.timeline_entry_mut(&sync_id) { - Some(remote_timeline) => { - if remote_timeline.awaits_download { + let tenant_entry = match index_accessor.tenant_entry_mut(&tenant_id) { + Some(tenant_entry) => { + if tenant_entry.has_in_progress_downloads() { return Err(ApiError::Conflict( - "Timeline download is already in progress".to_string(), + "Tenant download is already in progress".to_string(), )); } - remote_timeline.awaits_download = true; + tenant_entry } - None => index_accessor.add_timeline_entry(sync_id, new_timeline), + None => index_accessor.add_tenant_entry(tenant_id), + }; + + // populate remote index with the data from index part and create directories on the local filesystem + for (timeline_id, mut remote_timeline) in remote_timelines { + tokio::fs::create_dir_all(state.conf.timeline_path(&timeline_id, &tenant_id)) + .await + .context("Failed to create new timeline directory")?; + + remote_timeline.awaits_download = true; + 
tenant_entry.insert(timeline_id, remote_timeline); + // schedule actual download + storage_sync::schedule_layer_download(tenant_id, timeline_id); } - storage_sync::schedule_layer_download(tenant_id, timeline_id); + json_response(StatusCode::ACCEPTED, ()) } -async fn try_download_index_part_data( +async fn try_download_tenant_index( state: &State, - sync_id: ZTenantTimelineId, -) -> anyhow::Result> { - let index_part = match state.remote_storage.as_ref() { + tenant_id: ZTenantId, +) -> anyhow::Result>> { + let index_parts = match state.remote_storage.as_ref() { Some(GenericRemoteStorage::Local(local_storage)) => { - storage_sync::download_index_part(state.conf, local_storage, sync_id).await + storage_sync::download_tenant_index_parts(state.conf, local_storage, tenant_id).await } + // FIXME here s3 storage contains its own limits, that are separate from sync storage thread ones + // because it is a different instance. We can move this limit to some global static + // or use one instance everywhere. Some(GenericRemoteStorage::S3(s3_storage)) => { - storage_sync::download_index_part(state.conf, s3_storage, sync_id).await + storage_sync::download_tenant_index_parts(state.conf, s3_storage, tenant_id).await } None => return Ok(None), } - .with_context(|| format!("Failed to download index part for timeline {sync_id}"))?; + .with_context(|| format!("Failed to download index parts for tenant {tenant_id}"))?; - let timeline_path = state - .conf - .timeline_path(&sync_id.timeline_id, &sync_id.tenant_id); - RemoteTimeline::from_index_part(&timeline_path, index_part) - .map(Some) - .with_context(|| { - format!("Failed to convert index part into remote timeline for timeline {sync_id}") - }) + let mut remote_timelines = Vec::with_capacity(index_parts.len()); + for (timeline_id, index_part) in index_parts { + let timeline_path = state.conf.timeline_path(&timeline_id, &tenant_id); + let remote_timeline = RemoteTimeline::from_index_part(&timeline_path, index_part) + .with_context(|| { + format!("Failed to convert index part into remote timeline for timeline {tenant_id}/{timeline_id}") + })?; + remote_timelines.push((timeline_id, remote_timeline)); + } + Ok(Some(remote_timelines)) } -async fn timeline_detach_handler(request: Request) -> Result, ApiError> { +async fn timeline_detach_handler(_: Request) -> Result, ApiError> { + json_response(StatusCode::GONE, ()) +} + +async fn tenant_detach_handler(request: Request) -> Result, ApiError> { let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?; check_permission(&request, Some(tenant_id))?; - let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?; - tokio::task::spawn_blocking(move || { - let _enter = - info_span!("timeline_detach_handler", tenant = %tenant_id, timeline = %timeline_id) - .entered(); + let _enter = info_span!("tenant_detach_handler", tenant = %tenant_id).entered(); let state = get_state(&request); - tenant_mgr::detach_timeline(state.conf, tenant_id, timeline_id) + tenant_mgr::detach_tenant(state.conf, tenant_id) }) .await .map_err(ApiError::from_err)??; @@ -523,6 +534,8 @@ pub fn make_router( .put("/v1/tenant/config", tenant_config_handler) .get("/v1/tenant/:tenant_id/timeline", timeline_list_handler) .post("/v1/tenant/:tenant_id/timeline", timeline_create_handler) + .post("/v1/tenant/:tenant_id/attach", tenant_attach_handler) + .post("/v1/tenant/:tenant_id/detach", tenant_detach_handler) .get( "/v1/tenant/:tenant_id/timeline/:timeline_id", timeline_detail_handler, diff --git 
a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index db5b77a4d9..2369f46c4f 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -331,19 +331,19 @@ impl Repository for LayeredRepository { /// metrics collection. fn gc_iteration( &self, - target_timelineid: Option, + target_timeline_id: Option, horizon: u64, pitr: Duration, checkpoint_before_gc: bool, ) -> Result { - let timeline_str = target_timelineid + let timeline_str = target_timeline_id .map(|x| x.to_string()) .unwrap_or_else(|| "-".to_string()); STORAGE_TIME .with_label_values(&["gc", &self.tenant_id.to_string(), &timeline_str]) .observe_closure_duration(|| { - self.gc_iteration_internal(target_timelineid, horizon, pitr, checkpoint_before_gc) + self.gc_iteration_internal(target_timeline_id, horizon, pitr, checkpoint_before_gc) }) } @@ -410,28 +410,6 @@ impl Repository for LayeredRepository { Ok(()) } - fn detach_timeline(&self, timeline_id: ZTimelineId) -> anyhow::Result<()> { - let mut timelines = self.timelines.lock().unwrap(); - // check no child timelines, because detach will remove files, which will brake child branches - // FIXME this can still be violated because we do not guarantee - // that all ancestors are downloaded/attached to the same pageserver - let num_children = timelines - .iter() - .filter(|(_, entry)| entry.ancestor_timeline_id() == Some(timeline_id)) - .count(); - - ensure!( - num_children == 0, - "Cannot detach timeline which has child timelines" - ); - - ensure!( - timelines.remove(&timeline_id).is_some(), - "Cannot detach timeline {timeline_id} that is not available locally" - ); - Ok(()) - } - fn apply_timeline_remote_sync_status_update( &self, timeline_id: ZTimelineId, @@ -839,13 +817,13 @@ impl LayeredRepository { // we do. fn gc_iteration_internal( &self, - target_timelineid: Option, + target_timeline_id: Option, horizon: u64, pitr: Duration, checkpoint_before_gc: bool, ) -> Result { let _span_guard = - info_span!("gc iteration", tenant = %self.tenant_id, timeline = ?target_timelineid) + info_span!("gc iteration", tenant = %self.tenant_id, timeline = ?target_timeline_id) .entered(); let mut totals: GcResult = Default::default(); let now = Instant::now(); @@ -859,6 +837,12 @@ impl LayeredRepository { let mut timeline_ids = Vec::new(); let mut timelines = self.timelines.lock().unwrap(); + if let Some(target_timeline_id) = target_timeline_id.as_ref() { + if timelines.get(target_timeline_id).is_none() { + bail!("gc target timeline does not exist") + } + }; + for (timeline_id, timeline_entry) in timelines.iter() { timeline_ids.push(*timeline_id); @@ -867,7 +851,7 @@ impl LayeredRepository { // Somewhat related: https://github.com/zenithdb/zenith/issues/999 if let Some(ancestor_timeline_id) = &timeline_entry.ancestor_timeline_id() { // If target_timeline is specified, we only need to know branchpoints of its children - if let Some(timelineid) = target_timelineid { + if let Some(timelineid) = target_timeline_id { if ancestor_timeline_id == &timelineid { all_branchpoints .insert((*ancestor_timeline_id, timeline_entry.ancestor_lsn())); @@ -882,7 +866,7 @@ impl LayeredRepository { // Ok, we now know all the branch points. // Perform GC for each timeline. - for timelineid in timeline_ids.into_iter() { + for timeline_id in timeline_ids.into_iter() { if thread_mgr::is_shutdown_requested() { // We were requested to shut down. Stop and return with the progress we // made. 
@@ -891,12 +875,12 @@ impl LayeredRepository { // Timeline is known to be local and loaded. let timeline = self - .get_timeline_load_internal(timelineid, &mut *timelines)? + .get_timeline_load_internal(timeline_id, &mut *timelines)? .expect("checked above that timeline is local and loaded"); // If target_timeline is specified, only GC it - if let Some(target_timelineid) = target_timelineid { - if timelineid != target_timelineid { + if let Some(target_timelineid) = target_timeline_id { + if timeline_id != target_timelineid { continue; } } @@ -905,8 +889,8 @@ impl LayeredRepository { drop(timelines); let branchpoints: Vec = all_branchpoints .range(( - Included((timelineid, Lsn(0))), - Included((timelineid, Lsn(u64::MAX))), + Included((timeline_id, Lsn(0))), + Included((timeline_id, Lsn(u64::MAX))), )) .map(|&x| x.1) .collect(); @@ -916,7 +900,7 @@ impl LayeredRepository { // used in tests, so we want as deterministic results as possible. if checkpoint_before_gc { timeline.checkpoint(CheckpointConfig::Forced)?; - info!("timeline {} checkpoint_before_gc done", timelineid); + info!("timeline {} checkpoint_before_gc done", timeline_id); } timeline.update_gc_info(branchpoints, cutoff, pitr); let result = timeline.gc()?; diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 9501a416b4..f9ea4a6ff8 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -260,9 +260,6 @@ pub trait Repository: Send + Sync { /// api's 'compact' command. fn compaction_iteration(&self) -> Result<()>; - /// detaches timeline-related in-memory data. - fn detach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>; - // Allows to retrieve remote timeline index from the repo. Used in walreceiver to grab remote consistent lsn. fn get_remote_index(&self) -> &RemoteIndex; } @@ -537,7 +534,7 @@ pub mod repo_harness { TenantConfOpt::from(self.tenant_conf), walredo_mgr, self.tenant_id, - RemoteIndex::empty(), + RemoteIndex::default(), false, ); // populate repo with locally available timelines diff --git a/pageserver/src/storage_sync.rs b/pageserver/src/storage_sync.rs index 5fe2cde3b7..c52da95945 100644 --- a/pageserver/src/storage_sync.rs +++ b/pageserver/src/storage_sync.rs @@ -192,6 +192,8 @@ use metrics::{ use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; pub use self::download::download_index_part; +pub use self::download::download_tenant_index_parts; +pub use self::download::try_download_index_parts; pub use self::download::TEMP_DOWNLOAD_EXTENSION; lazy_static! 
{ @@ -301,7 +303,7 @@ pub fn start_local_timeline_sync( } Ok(SyncStartupData { local_timeline_init_statuses, - remote_index: RemoteIndex::empty(), + remote_index: RemoteIndex::default(), }) } } @@ -835,7 +837,7 @@ where .build() .context("Failed to create storage sync runtime")?; - let applicable_index_parts = runtime.block_on(try_fetch_index_parts( + let applicable_index_parts = runtime.block_on(try_download_index_parts( conf, &storage, local_timeline_files.keys().copied().collect(), @@ -918,16 +920,59 @@ fn storage_sync_loop( }); match loop_step { - ControlFlow::Continue(new_timeline_states) => { - if new_timeline_states.is_empty() { - debug!("Sync loop step completed, no new timeline states"); + ControlFlow::Continue(updated_tenants) => { + if updated_tenants.is_empty() { + debug!("Sync loop step completed, no new tenant states"); } else { info!( - "Sync loop step completed, {} new timeline state update(s)", - new_timeline_states.len() + "Sync loop step completed, {} new tenant state update(s)", + updated_tenants.len() ); - // Batch timeline download registration to ensure that the external registration code won't block any running tasks before. - apply_timeline_sync_status_updates(conf, &index, new_timeline_states); + let index_accessor = runtime.block_on(index.write()); + for tenant_id in updated_tenants { + let tenant_entry = match index_accessor.tenant_entry(&tenant_id) { + Some(tenant_entry) => tenant_entry, + None => { + error!( + "cannot find tenant in remote index for timeline sync update" + ); + continue; + } + }; + + if tenant_entry.has_in_progress_downloads() { + info!("Tenant {tenant_id} has pending timeline downloads, skipping repository registration"); + continue; + } else { + info!( + "Tenant {tenant_id} download completed. Registering in repository" + ); + // Here we assume that if tenant has no in-progress downloads that + // means that it is the last completed timeline download that triggered + // sync status update. So we look at the index for available timelines + // and register them all at once in a repository for download + // to be submitted in a single operation to repository + // so it can apply them at once to internal timeline map. + let sync_status_updates: HashMap< + ZTimelineId, + TimelineSyncStatusUpdate, + > = tenant_entry + .keys() + .copied() + .map(|timeline_id| { + (timeline_id, TimelineSyncStatusUpdate::Downloaded) + }) + .collect(); + + // Batch timeline download registration to ensure that the external registration code won't block any running tasks before. 
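+                            // (apply_timeline_sync_status_updates loads the tenant repository via
+                            // tenant_mgr and registers every downloaded timeline in a single call)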
+ apply_timeline_sync_status_updates( + conf, + &index, + tenant_id, + sync_status_updates, + ); + } + } } } ControlFlow::Break(()) => { @@ -945,7 +990,7 @@ async fn process_batches( index: &RemoteIndex, batched_tasks: HashMap, sync_queue: &SyncQueue, -) -> HashMap> +) -> HashSet where P: Debug + Send + Sync + 'static, S: RemoteStorage + Send + Sync + 'static, @@ -970,18 +1015,13 @@ where }) .collect::>(); - let mut new_timeline_states: HashMap< - ZTenantId, - HashMap, - > = HashMap::new(); + let mut new_timeline_states = HashSet::new(); + // we purposely ignore actual state update, because we're waiting for last timeline download to happen while let Some((sync_id, state_update)) = sync_results.next().await { debug!("Finished storage sync task for sync id {sync_id}"); - if let Some(state_update) = state_update { - new_timeline_states - .entry(sync_id.tenant_id) - .or_default() - .insert(sync_id.timeline_id, state_update); + if state_update.is_some() { + new_timeline_states.insert(sync_id.tenant_id); } } @@ -1458,35 +1498,6 @@ async fn validate_task_retries( ControlFlow::Continue(sync_data) } -async fn try_fetch_index_parts( - conf: &'static PageServerConf, - storage: &S, - keys: HashSet, -) -> HashMap -where - P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, -{ - let mut index_parts = HashMap::with_capacity(keys.len()); - - let mut part_downloads = keys - .into_iter() - .map(|id| async move { (id, download_index_part(conf, storage, id).await) }) - .collect::>(); - - while let Some((id, part_upload_result)) = part_downloads.next().await { - match part_upload_result { - Ok(index_part) => { - debug!("Successfully fetched index part for {id}"); - index_parts.insert(id, index_part); - } - Err(e) => warn!("Failed to fetch index part for {id}: {e}"), - } - } - - index_parts -} - fn schedule_first_sync_tasks( index: &mut RemoteTimelineIndex, sync_queue: &SyncQueue, diff --git a/pageserver/src/storage_sync/download.rs b/pageserver/src/storage_sync/download.rs index b51826fa1e..8cb9906e33 100644 --- a/pageserver/src/storage_sync/download.rs +++ b/pageserver/src/storage_sync/download.rs @@ -1,10 +1,14 @@ //! Timeline synchronization logic to fetch the layer files from remote storage into pageserver's local directory. -use std::{collections::HashSet, fmt::Debug, path::Path}; +use std::{ + collections::{HashMap, HashSet}, + fmt::Debug, + path::Path, +}; use anyhow::Context; use futures::stream::{FuturesUnordered, StreamExt}; -use remote_storage::{path_with_suffix_extension, RemoteStorage}; +use remote_storage::{path_with_suffix_extension, RemoteObjectName, RemoteStorage}; use tokio::{ fs, io::{self, AsyncWriteExt}, @@ -14,7 +18,7 @@ use tracing::{debug, error, info, warn}; use crate::{ config::PageServerConf, layered_repository::metadata::metadata_path, storage_sync::SyncTask, }; -use utils::zid::ZTenantTimelineId; +use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; use super::{ index::{IndexPart, RemoteTimeline}, @@ -23,6 +27,105 @@ use super::{ pub const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download"; +/// FIXME: Needs cleanup. Currently it swallows errors. Here we need to ensure that +/// we successfully downloaded all metadata parts for one tenant. +/// And successful includes absence of index_part in the remote. Because it is valid situation +/// when timeline was just created and pageserver restarted before upload of index part was completed. 
+/// But currently RemoteStorage interface does not provide this knowledge because it uses +/// anyhow::Error as an error type. So this needs a refactoring. +/// +/// In other words we need to yield only complete sets of tenant timelines. +/// Failure for one timeline of a tenant should exclude whole tenant from returned hashmap. +/// So there are two requirements: keep everything in one futures unordered +/// to allow higher concurrency. Mark tenants as failed independently. +/// That requires some bookeeping. +pub async fn try_download_index_parts( + conf: &'static PageServerConf, + storage: &S, + keys: HashSet, +) -> HashMap> +where + P: Debug + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, +{ + let mut index_parts: HashMap> = HashMap::new(); + + let mut part_downloads = keys + .into_iter() + .map(|id| async move { (id, download_index_part(conf, storage, id).await) }) + .collect::>(); + + while let Some((id, part_upload_result)) = part_downloads.next().await { + match part_upload_result { + Ok(index_part) => { + debug!("Successfully fetched index part for {id}"); + index_parts + .entry(id.tenant_id) + .or_default() + .insert(id.timeline_id, index_part); + } + Err(e) => error!("Failed to fetch index part for {id}: {e}"), + } + } + + index_parts +} + +pub async fn download_tenant_index_parts( + conf: &'static PageServerConf, + storage: &S, + tenant_id: ZTenantId, +) -> anyhow::Result> +where + P: RemoteObjectName + Debug + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, +{ + let tenant_path = conf.timelines_path(&tenant_id); + let tenant_storage_path = storage.remote_object_id(&tenant_path).with_context(|| { + format!( + "Failed to get tenant storage path for local path '{}'", + tenant_path.display() + ) + })?; + let timelines = storage + .list_prefixes(Some(tenant_storage_path)) + .await + .with_context(|| { + format!( + "Failed to list tenant storage path to get remote timelines to download: {}", + tenant_id + ) + })?; + + let mut sync_ids = HashSet::new(); + + for timeline_remote_storage_key in timelines { + let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| { + anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_id}") + })?; + + let timeline_id: ZTimelineId = object_name + .parse() + .with_context(|| { + format!("failed to parse object name into timeline id for tenant {tenant_id} '{object_name}'") + })?; + + sync_ids.insert(ZTenantTimelineId { + tenant_id, + timeline_id, + }); + } + + let index_parts = try_download_index_parts(conf, storage, sync_ids) + .await + .remove(&tenant_id) + .ok_or(anyhow::anyhow!( + "Missing tenant index parts. This is a bug." + ))?; + + Ok(index_parts) +} + /// Retrieves index data from the remote storage for a given timeline. pub async fn download_index_part( conf: &'static PageServerConf, diff --git a/pageserver/src/storage_sync/index.rs b/pageserver/src/storage_sync/index.rs index 2ba48ddf53..8bc9f6f189 100644 --- a/pageserver/src/storage_sync/index.rs +++ b/pageserver/src/storage_sync/index.rs @@ -2,6 +2,7 @@ //! Able to restore itself from the storage index parts, that are located in every timeline's remote directory and contain all data about //! remote timeline layers and its metadata. 
+use std::ops::{Deref, DerefMut}; use std::{ collections::{HashMap, HashSet}, path::{Path, PathBuf}, @@ -14,7 +15,10 @@ use serde_with::{serde_as, DisplayFromStr}; use tokio::sync::RwLock; use crate::{config::PageServerConf, layered_repository::metadata::TimelineMetadata}; -use utils::{lsn::Lsn, zid::ZTenantTimelineId}; +use utils::{ + lsn::Lsn, + zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}, +}; /// A part of the filesystem path, that needs a root to become a path again. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] @@ -41,38 +45,68 @@ impl RelativePath { } } +#[derive(Debug, Clone, Default)] +pub struct TenantEntry(HashMap); + +impl TenantEntry { + pub fn has_in_progress_downloads(&self) -> bool { + self.values() + .any(|remote_timeline| remote_timeline.awaits_download) + } +} + +impl Deref for TenantEntry { + type Target = HashMap; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for TenantEntry { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl From> for TenantEntry { + fn from(inner: HashMap) -> Self { + Self(inner) + } +} + /// An index to track tenant files that exist on the remote storage. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct RemoteTimelineIndex { - timeline_entries: HashMap, + entries: HashMap, } /// A wrapper to synchronize the access to the index, should be created and used before dealing with any [`RemoteTimelineIndex`]. +#[derive(Default)] pub struct RemoteIndex(Arc>); impl RemoteIndex { - pub fn empty() -> Self { - Self(Arc::new(RwLock::new(RemoteTimelineIndex { - timeline_entries: HashMap::new(), - }))) - } - pub fn from_parts( conf: &'static PageServerConf, - index_parts: HashMap, + index_parts: HashMap>, ) -> anyhow::Result { - let mut timeline_entries = HashMap::new(); + let mut entries: HashMap = HashMap::new(); - for (sync_id, index_part) in index_parts { - let timeline_path = conf.timeline_path(&sync_id.timeline_id, &sync_id.tenant_id); - let remote_timeline = RemoteTimeline::from_index_part(&timeline_path, index_part) - .context("Failed to restore remote timeline data from index part")?; - timeline_entries.insert(sync_id, remote_timeline); + for (tenant_id, timelines) in index_parts { + for (timeline_id, index_part) in timelines { + let timeline_path = conf.timeline_path(&timeline_id, &tenant_id); + let remote_timeline = + RemoteTimeline::from_index_part(&timeline_path, index_part) + .context("Failed to restore remote timeline data from index part")?; + + entries + .entry(tenant_id) + .or_default() + .insert(timeline_id, remote_timeline); + } } - Ok(Self(Arc::new(RwLock::new(RemoteTimelineIndex { - timeline_entries, - })))) + Ok(Self(Arc::new(RwLock::new(RemoteTimelineIndex { entries })))) } pub async fn read(&self) -> tokio::sync::RwLockReadGuard<'_, RemoteTimelineIndex> { @@ -91,20 +125,50 @@ impl Clone for RemoteIndex { } impl RemoteTimelineIndex { - pub fn timeline_entry(&self, id: &ZTenantTimelineId) -> Option<&RemoteTimeline> { - self.timeline_entries.get(id) + pub fn timeline_entry( + &self, + ZTenantTimelineId { + tenant_id, + timeline_id, + }: &ZTenantTimelineId, + ) -> Option<&RemoteTimeline> { + self.entries.get(tenant_id)?.get(timeline_id) } - pub fn timeline_entry_mut(&mut self, id: &ZTenantTimelineId) -> Option<&mut RemoteTimeline> { - self.timeline_entries.get_mut(id) + pub fn timeline_entry_mut( + &mut self, + ZTenantTimelineId { + tenant_id, + timeline_id, + }: &ZTenantTimelineId, + ) -> Option<&mut RemoteTimeline> { + 
self.entries.get_mut(tenant_id)?.get_mut(timeline_id) } - pub fn add_timeline_entry(&mut self, id: ZTenantTimelineId, entry: RemoteTimeline) { - self.timeline_entries.insert(id, entry); + pub fn add_timeline_entry( + &mut self, + ZTenantTimelineId { + tenant_id, + timeline_id, + }: ZTenantTimelineId, + entry: RemoteTimeline, + ) { + self.entries + .entry(tenant_id) + .or_default() + .insert(timeline_id, entry); } - pub fn all_sync_ids(&self) -> impl Iterator + '_ { - self.timeline_entries.keys().copied() + pub fn tenant_entry(&self, tenant_id: &ZTenantId) -> Option<&TenantEntry> { + self.entries.get(tenant_id) + } + + pub fn tenant_entry_mut(&mut self, tenant_id: &ZTenantId) -> Option<&mut TenantEntry> { + self.entries.get_mut(tenant_id) + } + + pub fn add_tenant_entry(&mut self, tenant_id: ZTenantId) -> &mut TenantEntry { + self.entries.entry(tenant_id).or_default() } pub fn set_awaits_download( diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index c73fed140a..c96dc6973b 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -165,14 +165,14 @@ pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result), Attach(ZTenantTimelineId, Arc), } impl std::fmt::Debug for LocalTimelineUpdate { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Self::Detach(ttid) => f.debug_tuple("Remove").field(ttid).finish(), + Self::Detach(ttid, _) => f.debug_tuple("Remove").field(ttid).finish(), Self::Attach(ttid, _) => f.debug_tuple("Add").field(ttid).finish(), } } @@ -182,32 +182,31 @@ impl std::fmt::Debug for LocalTimelineUpdate { pub fn apply_timeline_sync_status_updates( conf: &'static PageServerConf, remote_index: &RemoteIndex, - sync_status_updates: HashMap>, + tenant_id: ZTenantId, + sync_status_updates: HashMap, ) { if sync_status_updates.is_empty() { debug!("no sync status updates to apply"); return; } info!( - "Applying sync status updates for {} timelines", + "Applying sync status updates for tenant {tenant_id} {} timelines", sync_status_updates.len() ); debug!("Sync status updates: {sync_status_updates:?}"); - for (tenant_id, status_updates) in sync_status_updates { - let repo = match load_local_repo(conf, tenant_id, remote_index) { - Ok(repo) => repo, - Err(e) => { - error!("Failed to load repo for tenant {tenant_id} Error: {e:?}",); - continue; - } - }; - match apply_timeline_remote_sync_status_updates(&repo, status_updates) { - Ok(()) => info!("successfully applied sync status updates for tenant {tenant_id}"), - Err(e) => error!( - "Failed to apply timeline sync timeline status updates for tenant {tenant_id}: {e:?}" - ), + let repo = match load_local_repo(conf, tenant_id, remote_index) { + Ok(repo) => repo, + Err(e) => { + error!("Failed to load repo for tenant {tenant_id} Error: {e:?}"); + return; } + }; + match apply_timeline_remote_sync_status_updates(&repo, sync_status_updates) { + Ok(()) => info!("successfully applied sync status updates for tenant {tenant_id}"), + Err(e) => error!( + "Failed to apply timeline sync timeline status updates for tenant {tenant_id}: {e:?}" + ), } } @@ -387,29 +386,49 @@ pub fn get_local_timeline_with_load( } } -pub fn detach_timeline( - conf: &'static PageServerConf, - tenant_id: ZTenantId, - timeline_id: ZTimelineId, -) -> anyhow::Result<()> { - // shutdown the timeline threads (this shuts down the walreceiver) - thread_mgr::shutdown_threads(None, Some(tenant_id), Some(timeline_id)); +pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> 
anyhow::Result<()> { + set_tenant_state(tenant_id, TenantState::Stopping)?; + // shutdown the tenant and timeline threads: gc, compaction, page service threads) + thread_mgr::shutdown_threads(None, Some(tenant_id), None); - match tenants_state::write_tenants().get_mut(&tenant_id) { - Some(tenant) => { - tenant - .repo - .detach_timeline(timeline_id) - .context("Failed to detach inmem tenant timeline")?; - tenant.local_timelines.remove(&timeline_id); + // FIXME should we protect somehow from starting new threads/walreceivers when tenant is in stopping state? + // send stop signal to wal receiver and collect join handles while holding the lock + let walreceiver_join_handles = { + let tenants = tenants_state::write_tenants(); + let tenant = tenants.get(&tenant_id).context("tenant not found")?; + let mut walreceiver_join_handles = Vec::with_capacity(tenant.local_timelines.len()); + for timeline_id in tenant.local_timelines.keys() { + let (sender, receiver) = std::sync::mpsc::channel::<()>(); tenants_state::try_send_timeline_update(LocalTimelineUpdate::Detach( - ZTenantTimelineId::new(tenant_id, timeline_id), + ZTenantTimelineId::new(tenant_id, *timeline_id), + sender, )); + walreceiver_join_handles.push((*timeline_id, receiver)); } - None => bail!("Tenant {tenant_id} not found in local tenant state"), + // drop the tenants lock + walreceiver_join_handles + }; + + // wait for wal receivers to stop without holding the lock, because walreceiver + // will attempt to change tenant state which is protected by the same global tenants lock. + // TODO do we need a timeout here? how to handle it? + // recv_timeout is broken: https://github.com/rust-lang/rust/issues/94518#issuecomment-1057440631 + // need to use crossbeam-channel + for (timeline_id, join_handle) in walreceiver_join_handles { + info!("waiting for wal receiver to shutdown timeline_id {timeline_id}"); + join_handle.recv().context("failed to join walreceiver")?; + info!("wal receiver shutdown confirmed timeline_id {timeline_id}"); } - let local_timeline_directory = conf.timeline_path(&timeline_id, &tenant_id); + tenants_state::write_tenants().remove(&tenant_id); + + // If removal fails there will be no way to successfully retry detach, + // because tenant no longer exists in in memory map. And it needs to be removed from it + // before we remove files because it contains references to repository + // which references ephemeral files which are deleted on drop. So if we keep these references + // code will attempt to remove files which no longer exist. This can be fixed by having shutdown + // mechanism for repository that will clean temporary data to avoid any references to ephemeral files + let local_timeline_directory = conf.tenant_path(&tenant_id); std::fs::remove_dir_all(&local_timeline_directory).with_context(|| { format!( "Failed to remove local timeline directory '{}'", diff --git a/pageserver/src/timelines.rs b/pageserver/src/timelines.rs index a3939661c1..e0e79e4166 100644 --- a/pageserver/src/timelines.rs +++ b/pageserver/src/timelines.rs @@ -202,7 +202,7 @@ pub fn create_repo( // anymore, but I think that could still happen. 
let wal_redo_manager = Arc::new(crate::walredo::DummyRedoManager {}); - (wal_redo_manager as _, RemoteIndex::empty()) + (wal_redo_manager as _, RemoteIndex::default()) } }; diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 9f0f911e0c..b70350e0da 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -264,7 +264,7 @@ async fn wal_receiver_main_thread_loop_step<'a>( info!("Processing timeline update: {update:?}"); match update { // Timeline got detached, stop all related tasks and remove public timeline data. - LocalTimelineUpdate::Detach(id) => { + LocalTimelineUpdate::Detach(id, join_sender) => { match local_timeline_wal_receivers.get_mut(&id.tenant_id) { Some(wal_receivers) => { if let hash_map::Entry::Occupied(o) = wal_receivers.entry(id.timeline_id) { @@ -280,6 +280,11 @@ async fn wal_receiver_main_thread_loop_step<'a>( }; { WAL_RECEIVER_ENTRIES.write().await.remove(&id); + if let Err(e) = join_sender.send(()) { + warn!("cannot send wal_receiver shutdown confirmation {e}") + } else { + info!("confirm walreceiver shutdown for {id}"); + } } } // Timeline got attached, retrieve all necessary information to start its broker loop and maintain this loop endlessly. diff --git a/test_runner/batch_others/test_ancestor_branch.py b/test_runner/batch_others/test_ancestor_branch.py index 20e63b4e5c..3e7ba22184 100644 --- a/test_runner/batch_others/test_ancestor_branch.py +++ b/test_runner/batch_others/test_ancestor_branch.py @@ -105,16 +105,3 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder): branch2_cur.execute('SELECT count(*) FROM foo') assert branch2_cur.fetchone() == (300000, ) - - -def test_ancestor_branch_detach(neon_simple_env: NeonEnv): - env = neon_simple_env - - parent_timeline_id = env.neon_cli.create_branch("test_ancestor_branch_detach_parent", "empty") - - env.neon_cli.create_branch("test_ancestor_branch_detach_branch1", - "test_ancestor_branch_detach_parent") - - ps_http = env.pageserver.http_client() - with pytest.raises(NeonPageserverApiException, match="Failed to detach inmem tenant timeline"): - ps_http.timeline_detach(env.initial_tenant, parent_timeline_id) diff --git a/test_runner/batch_others/test_detach.py b/test_runner/batch_others/test_detach.py new file mode 100644 index 0000000000..105facb656 --- /dev/null +++ b/test_runner/batch_others/test_detach.py @@ -0,0 +1,49 @@ +from threading import Thread +from uuid import uuid4 +import psycopg2 +import pytest + +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnvBuilder + + +def test_detach_smoke(neon_env_builder: NeonEnvBuilder): + env = neon_env_builder.init_start() + pageserver_http = env.pageserver.http_client() + + tenant_id, timeline_id = env.neon_cli.create_tenant() + pg = env.postgres.create_start('main', tenant_id=tenant_id) + # we rely upon autocommit after each statement + pg.safe_psql_many(queries=[ + 'CREATE TABLE t(key int primary key, value text)', + 'INSERT INTO t SELECT generate_series(1,100000), \'payload\'', + ]) + + # gc should try to even start + with pytest.raises(expected_exception=psycopg2.DatabaseError, + match='gc target timeline does not exist'): + env.pageserver.safe_psql(f'do_gc {tenant_id.hex} {uuid4().hex} 0') + + gc_thread = Thread( + target=lambda: env.pageserver.safe_psql(f'do_gc {tenant_id.hex} {timeline_id.hex} 0'), ) + gc_thread.start() + + last_error = None + for i in range(3): + try: + pageserver_http.tenant_detach(tenant_id) + except Exception as e: + last_error = e + log.error(f"try {i} error 
detaching tenant: {e}") + continue + else: + break + # else is called if the loop finished without reaching "break" + else: + pytest.fail(f"could not detach timeline: {last_error}") + + gc_thread.join(timeout=10) + + with pytest.raises(expected_exception=psycopg2.DatabaseError, + match=f'Tenant {tenant_id.hex} not found'): + env.pageserver.safe_psql(f'do_gc {tenant_id.hex} {timeline_id.hex} 0') diff --git a/test_runner/batch_others/test_normal_work.py b/test_runner/batch_others/test_normal_work.py index 4635a70de6..5b25691517 100644 --- a/test_runner/batch_others/test_normal_work.py +++ b/test_runner/batch_others/test_normal_work.py @@ -24,7 +24,7 @@ def check_tenant(env: NeonEnv, pageserver_http: NeonPageserverHttpClient): assert res_2[0] == (5000050000, ) pg.stop() - pageserver_http.timeline_detach(tenant_id, timeline_id) + pageserver_http.tenant_detach(tenant_id) @pytest.mark.parametrize('num_timelines,num_safekeepers', [(3, 1)]) diff --git a/test_runner/batch_others/test_remote_storage.py b/test_runner/batch_others/test_remote_storage.py index b0ba8758cc..ac39c6290b 100644 --- a/test_runner/batch_others/test_remote_storage.py +++ b/test_runner/batch_others/test_remote_storage.py @@ -91,14 +91,14 @@ def test_remote_storage_backup_and_restore(neon_env_builder: NeonEnvBuilder, sto # Introduce failpoint in download env.pageserver.safe_psql(f"failpoints remote-storage-download-pre-rename=return") - client.timeline_attach(UUID(tenant_id), UUID(timeline_id)) + client.tenant_attach(UUID(tenant_id)) - # is there a better way to assert that fafilpoint triggered? + # is there a better way to assert that failpoint triggered? time.sleep(10) # assert cannot attach timeline that is scheduled for download - with pytest.raises(Exception, match="Timeline download is already in progress"): - client.timeline_attach(UUID(tenant_id), UUID(timeline_id)) + with pytest.raises(Exception, match="Conflict: Tenant download is already in progress"): + client.tenant_attach(UUID(tenant_id)) detail = client.timeline_detail(UUID(tenant_id), UUID(timeline_id)) log.info("Timeline detail with active failpoint: %s", detail) @@ -109,7 +109,7 @@ def test_remote_storage_backup_and_restore(neon_env_builder: NeonEnvBuilder, sto env.pageserver.stop() env.pageserver.start() - client.timeline_attach(UUID(tenant_id), UUID(timeline_id)) + client.tenant_attach(UUID(tenant_id)) log.info("waiting for timeline redownload") wait_until(number_of_iterations=10, diff --git a/test_runner/batch_others/test_tenant_relocation.py b/test_runner/batch_others/test_tenant_relocation.py index e9c493cad6..0560469ca1 100644 --- a/test_runner/batch_others/test_tenant_relocation.py +++ b/test_runner/batch_others/test_tenant_relocation.py @@ -3,14 +3,13 @@ import os import pathlib import subprocess import threading -import typing from uuid import UUID from fixtures.log_helper import log -from typing import Optional +from typing import Any, Dict, Optional, Tuple import signal import pytest -from fixtures.neon_fixtures import PgProtocol, PortDistributor, Postgres, NeonEnvBuilder, Etcd, NeonPageserverHttpClient, assert_local, wait_until, wait_for_last_record_lsn, wait_for_upload, neon_binpath, pg_distrib_dir, base_dir +from fixtures.neon_fixtures import NeonEnv, PortDistributor, Postgres, NeonEnvBuilder, Etcd, NeonPageserverHttpClient, assert_local, wait_until, wait_for_last_record_lsn, wait_for_upload, neon_binpath, pg_distrib_dir, base_dir from fixtures.utils import lsn_from_hex, subprocess_capture @@ -101,6 +100,102 @@ def load(pg: Postgres, stop_event: 
threading.Event, load_ok_event: threading.Eve log.info('load thread stopped') +def populate_branch(pg: Postgres, create_table: bool, + expected_sum: Optional[int]) -> Tuple[UUID, int]: + # insert some data + with pg_cur(pg) as cur: + cur.execute("SHOW neon.timeline_id") + timeline_id = UUID(cur.fetchone()[0]) + log.info("timeline to relocate %s", timeline_id.hex) + + # we rely upon autocommit after each statement + # as waiting for acceptors happens there + if create_table: + cur.execute("CREATE TABLE t(key int, value text)") + cur.execute("INSERT INTO t SELECT generate_series(1,1000), 'some payload'") + if expected_sum is not None: + cur.execute("SELECT sum(key) FROM t") + assert cur.fetchone() == (expected_sum, ) + cur.execute("SELECT pg_current_wal_flush_lsn()") + + current_lsn = lsn_from_hex(cur.fetchone()[0]) + return timeline_id, current_lsn + + +def ensure_checkpoint( + pageserver_cur, + pageserver_http: NeonPageserverHttpClient, + tenant_id: UUID, + timeline_id: UUID, + current_lsn: int, +): + # run checkpoint manually to be sure that data landed in remote storage + pageserver_cur.execute(f"checkpoint {tenant_id.hex} {timeline_id.hex}") + + # wait until pageserver successfully uploaded a checkpoint to remote storage + wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn) + + +def check_timeline_attached( + new_pageserver_http_client: NeonPageserverHttpClient, + tenant_id: UUID, + timeline_id: UUID, + old_timeline_detail: Dict[str, Any], + old_current_lsn: int, +): + # new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint + new_timeline_detail = wait_until( + number_of_iterations=5, + interval=1, + func=lambda: assert_local(new_pageserver_http_client, tenant_id, timeline_id)) + + # when load is active these checks can break because lsns are not static + # so lets check with some margin + assert_abs_margin_ratio(lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']), + lsn_from_hex(old_timeline_detail['local']['disk_consistent_lsn']), + 0.03) + + assert_abs_margin_ratio(lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']), + old_current_lsn, + 0.03) + + +def switch_pg_to_new_pageserver(env: NeonEnv, + pg: Postgres, + new_pageserver_port: int, + tenant_id: UUID, + timeline_id: UUID) -> pathlib.Path: + pg.stop() + + pg_config_file_path = pathlib.Path(pg.config_file_path()) + pg_config_file_path.open('a').write( + f"\nneon.pageserver_connstring = 'postgresql://no_user:@localhost:{new_pageserver_port}'") + + pg.start() + + timeline_to_detach_local_path = env.repo_dir / 'tenants' / tenant_id.hex / 'timelines' / timeline_id.hex + files_before_detach = os.listdir(timeline_to_detach_local_path) + assert 'metadata' in files_before_detach, f'Regular timeline {timeline_to_detach_local_path} should have the metadata file,\ + but got: {files_before_detach}' + assert len(files_before_detach) >= 2, f'Regular timeline {timeline_to_detach_local_path} should have at least one layer file,\ + but got {files_before_detach}' + + return timeline_to_detach_local_path + + +def post_migration_check(pg: Postgres, sum_before_migration: int, old_local_path: pathlib.Path): + with pg_cur(pg) as cur: + # check that data is still there + cur.execute("SELECT sum(key) FROM t") + assert cur.fetchone() == (sum_before_migration, ) + # check that we can write new data + cur.execute("INSERT INTO t SELECT generate_series(1001,2000), 'some payload'") + cur.execute("SELECT sum(key) FROM t") + assert 
cur.fetchone() == (sum_before_migration + 1500500, ) + + assert not os.path.exists(old_local_path), f'After detach, local timeline dir {old_local_path} should be removed' + + @pytest.mark.parametrize( 'method', [ @@ -126,61 +221,73 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder, # create folder for remote storage mock remote_storage_mock_path = env.repo_dir / 'local_fs_remote_storage' - tenant, _ = env.neon_cli.create_tenant(UUID("74ee8b079a0e437eb0afea7d26a07209")) - log.info("tenant to relocate %s", tenant) + # we use two branches to check that they are both relocated + # first branch is used for load, compute for second one is used to + # check that data is not lost - # attach does not download ancestor branches (should it?), just use root branch for now - env.neon_cli.create_root_branch('test_tenant_relocation', tenant_id=tenant) + tenant_id, initial_timeline_id = env.neon_cli.create_tenant(UUID("74ee8b079a0e437eb0afea7d26a07209")) + log.info("tenant to relocate %s initial_timeline_id %s", tenant_id, initial_timeline_id) - tenant_pg = env.postgres.create_start(branch_name='test_tenant_relocation', - node_name='test_tenant_relocation', - tenant_id=tenant) + env.neon_cli.create_branch("test_tenant_relocation_main", tenant_id=tenant_id) + pg_main = env.postgres.create_start(branch_name='test_tenant_relocation_main', + tenant_id=tenant_id) - # insert some data - with closing(tenant_pg.connect()) as conn: - with conn.cursor() as cur: - # save timeline for later gc call - cur.execute("SHOW neon.timeline_id") - timeline = UUID(cur.fetchone()[0]) - log.info("timeline to relocate %s", timeline.hex) + timeline_id_main, current_lsn_main = populate_branch(pg_main, create_table=True, expected_sum=500500) - # we rely upon autocommit after each statement - # as waiting for acceptors happens there - cur.execute("CREATE TABLE t(key int primary key, value text)") - cur.execute("INSERT INTO t SELECT generate_series(1,1000), 'some payload'") - cur.execute("SELECT sum(key) FROM t") - assert cur.fetchone() == (500500, ) - cur.execute("SELECT pg_current_wal_flush_lsn()") + env.neon_cli.create_branch( + new_branch_name="test_tenant_relocation_second", + ancestor_branch_name="test_tenant_relocation_main", + tenant_id=tenant_id, + ) + pg_second = env.postgres.create_start(branch_name='test_tenant_relocation_second', + tenant_id=tenant_id) - current_lsn = lsn_from_hex(cur.fetchone()[0]) + # do not select sum for second branch, this select will wait until wal reaches pageserver + # try to check another case when pageserver didnt receive that wal and needs to get it from safekeeper + timeline_id_second, current_lsn_second = populate_branch(pg_second, create_table=False, expected_sum=1001000) pageserver_http = env.pageserver.http_client() # wait until pageserver receives that data - wait_for_last_record_lsn(pageserver_http, tenant, timeline, current_lsn) - timeline_detail = assert_local(pageserver_http, tenant, timeline) + wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id_main, current_lsn_main) + timeline_detail_main = assert_local(pageserver_http, tenant_id, timeline_id_main) + + wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id_second, current_lsn_second) + timeline_detail_second = assert_local(pageserver_http, tenant_id, timeline_id_second) if with_load == 'with_load': # create load table - with pg_cur(tenant_pg) as cur: + with pg_cur(pg_main) as cur: cur.execute("CREATE TABLE load(value text)") load_stop_event = threading.Event() load_ok_event = threading.Event() 
        load_thread = threading.Thread(
             target=load,
-            args=(tenant_pg, load_stop_event, load_ok_event),
+            args=(pg_main, load_stop_event, load_ok_event),
             daemon=True,  # To make sure the child dies when the parent errors
         )
         load_thread.start()
 
-    # run checkpoint manually to be sure that data landed in remote storage
-    with closing(env.pageserver.connect()) as psconn:
-        with psconn.cursor() as pscur:
-            pscur.execute(f"checkpoint {tenant.hex} {timeline.hex}")
+    # this requirement introduces a problem:
+    # if a user creates a branch during the migration,
+    # it won't appear on the new pageserver
+    with pg_cur(env.pageserver) as cur:
+        ensure_checkpoint(
+            cur,
+            pageserver_http=pageserver_http,
+            tenant_id=tenant_id,
+            timeline_id=timeline_id_main,
+            current_lsn=current_lsn_main,
+        )
 
-    # wait until pageserver successfully uploaded a checkpoint to remote storage
-    wait_for_upload(pageserver_http, tenant, timeline, current_lsn)
+        ensure_checkpoint(
+            cur,
+            pageserver_http=pageserver_http,
+            tenant_id=tenant_id,
+            timeline_id=timeline_id_second,
+            current_lsn=current_lsn_second,
+        )
 
     log.info("inititalizing new pageserver")
 
     # bootstrap second pageserver
@@ -207,7 +314,7 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
                 "python",
                 os.path.join(base_dir, "scripts/export_import_between_pageservers.py"),
                 "--tenant-id",
-                tenant.hex,
+                tenant_id.hex,
                 "--from-host",
                 "localhost",
                 "--from-http-port",
@@ -228,22 +335,23 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
         subprocess_capture(str(env.repo_dir), cmd, check=True)
     elif method == "minor":
         # call to attach timeline to new pageserver
-        new_pageserver_http.timeline_attach(tenant, timeline)
+        new_pageserver_http.tenant_attach(tenant_id)
 
-    # new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
-    new_timeline_detail = wait_until(
-        number_of_iterations=5,
-        interval=1,
-        func=lambda: assert_local(new_pageserver_http, tenant, timeline))
+        check_timeline_attached(
+            new_pageserver_http,
+            tenant_id,
+            timeline_id_main,
+            timeline_detail_main,
+            current_lsn_main,
+        )
 
-    # when load is active these checks can break because lsns are not static
-    # so lets check with some margin
-    assert_abs_margin_ratio(
-        lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
-        lsn_from_hex(timeline_detail['local']['disk_consistent_lsn']),
-        0.03)
-
-    tenant_pg.stop()
+        check_timeline_attached(
+            new_pageserver_http,
+            tenant_id,
+            timeline_id_second,
+            timeline_detail_second,
+            current_lsn_second,
+        )
 
     # rewrite neon cli config to use new pageserver for basebackup to start new compute
     cli_config_lines = (env.repo_dir / 'config').read_text().splitlines()
     cli_config_lines[-1] = f"listen_pg_addr = 'localhost:{new_pageserver_pg_port}'"
     (env.repo_dir / 'config').write_text('\n'.join(cli_config_lines))
 
-    tenant_pg_config_file_path = pathlib.Path(tenant_pg.config_file_path())
-    tenant_pg_config_file_path.open('a').write(
-        f"\nneon.pageserver_connstring = 'postgresql://no_user:@localhost:{new_pageserver_pg_port}'"
+    old_local_path_main = switch_pg_to_new_pageserver(
+        env,
+        pg_main,
+        new_pageserver_pg_port,
+        tenant_id,
+        timeline_id_main,
     )
-    tenant_pg.start()
-
-    timeline_to_detach_local_path = env.repo_dir / 'tenants' / tenant.hex / 'timelines' / timeline.hex
-    files_before_detach = os.listdir(timeline_to_detach_local_path)
-    assert 'metadata' in files_before_detach, f'Regular timeline 
{timeline_to_detach_local_path} should have the metadata file,\ - but got: {files_before_detach}' - assert len(files_before_detach) > 2, f'Regular timeline {timeline_to_detach_local_path} should have at least one layer file,\ - but got {files_before_detach}' + old_local_path_second = switch_pg_to_new_pageserver( + env, + pg_second, + new_pageserver_pg_port, + tenant_id, + timeline_id_second, + ) # detach tenant from old pageserver before we check # that all the data is there to be sure that old pageserver # is no longer involved, and if it is, we will see the errors - pageserver_http.timeline_detach(tenant, timeline) + pageserver_http.tenant_detach(tenant_id) - with pg_cur(tenant_pg) as cur: - # check that data is still there - cur.execute("SELECT sum(key) FROM t") - assert cur.fetchone() == (500500, ) - # check that we can write new data - cur.execute("INSERT INTO t SELECT generate_series(1001,2000), 'some payload'") - cur.execute("SELECT sum(key) FROM t") - assert cur.fetchone() == (2001000, ) + post_migration_check(pg_main, 500500, old_local_path_main) + post_migration_check(pg_second, 1001000, old_local_path_second) if with_load == 'with_load': assert load_ok_event.wait(3) @@ -286,8 +390,6 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder, load_thread.join(timeout=10) log.info('load thread stopped') - assert not os.path.exists(timeline_to_detach_local_path), f'After detach, local timeline dir {timeline_to_detach_local_path} should be removed' - # bring old pageserver back for clean shutdown via neon cli # new pageserver will be shut down by the context manager cli_config_lines = (env.repo_dir / 'config').read_text().splitlines() diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 3d4daf5f29..e6967e3682 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -795,16 +795,12 @@ class NeonPageserverHttpClient(requests.Session): def check_status(self): self.get(f"http://localhost:{self.port}/v1/status").raise_for_status() - def timeline_attach(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID): - res = self.post( - f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}/attach", - ) + def tenant_attach(self, tenant_id: uuid.UUID): + res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/attach") self.verbose_error(res) - def timeline_detach(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID): - res = self.post( - f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}/detach", - ) + def tenant_detach(self, tenant_id: uuid.UUID): + res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/detach") self.verbose_error(res) def timeline_create(