diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 3b413e30ce..ff52f033d1 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -122,8 +122,9 @@ impl S3ObjectKey { impl RemoteObjectName for S3ObjectKey { /// Turn a/b/c or a/b/c/ into c fn object_name(&self) -> Option<&str> { - // corner case - if &self.0 == "/" { + // corner case, char::to_string is not const, that's why this is more verbose than it needs to be + // see https://github.com/rust-lang/rust/issues/88674 + if self.0.len() == 1 && self.0.chars().next().unwrap() == S3_PREFIX_SEPARATOR { return None; } diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index ebbb0d5ced..6cfedc0931 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -122,6 +122,35 @@ paths: application/json: schema: $ref: "#/components/schemas/Error" + delete: + description: "Attempts to delete the specified timeline. Should be retried on 500 errors" + responses: + "200": + description: Ok + "400": + description: Error when no tenant id or timeline id is found in the path + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + "401": + description: Unauthorized Error + content: + application/json: + schema: + $ref: "#/components/schemas/UnauthorizedError" + "403": + description: Forbidden Error + content: + application/json: + schema: + $ref: "#/components/schemas/ForbiddenError" + "500": + description: Generic operation error + content: + application/json: + schema: + $ref: "#/components/schemas/Error" /v1/tenant/{tenant_id}/timeline/{timeline_id}/wal_receiver: parameters: @@ -190,7 +219,6 @@ paths: "410": description: GONE - /v1/tenant/{tenant_id}/attach: parameters: - name: tenant_id @@ -200,7 +228,7 @@ paths: type: string format: hex post: - description: Deprecated + description: Schedules an attach operation to happen in the background for the given tenant responses: "202": description: Tenant attaching scheduled @@ -299,7 +327,6 @@ paths: schema: $ref: "#/components/schemas/Error" - /v1/tenant/{tenant_id}/timeline/: parameters: - name: tenant_id diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 41c78210f4..997e3c9a1f 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -353,22 +353,45 @@ async fn try_download_tenant_index( Ok(Some(remote_timelines)) } -async fn timeline_detach_handler(_: Request) -> Result, ApiError> { - json_response(StatusCode::GONE, ()) +async fn timeline_delete_handler(request: Request) -> Result, ApiError> { + let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?; + check_permission(&request, Some(tenant_id))?; + + let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?; + + let state = get_state(&request); + tokio::task::spawn_blocking(move || { + let _enter = info_span!("timeline_delete_handler", tenant = %tenant_id).entered(); + tenant_mgr::delete_timeline(tenant_id, timeline_id) + }) + .await + .map_err(ApiError::from_err)??; + + let mut remote_index = state.remote_index.write().await; + remote_index.remove_timeline_entry(ZTenantTimelineId { + tenant_id, + timeline_id, + }); + + json_response(StatusCode::OK, ()) } async fn tenant_detach_handler(request: Request) -> Result, ApiError> { let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?; check_permission(&request, Some(tenant_id))?; + let state = get_state(&request); + let conf = state.conf;
tokio::task::spawn_blocking(move || { let _enter = info_span!("tenant_detach_handler", tenant = %tenant_id).entered(); - let state = get_state(&request); - tenant_mgr::detach_tenant(state.conf, tenant_id) + tenant_mgr::detach_tenant(conf, tenant_id) }) .await .map_err(ApiError::from_err)??; + let mut remote_index = state.remote_index.write().await; + remote_index.remove_tenant_entry(&tenant_id); + json_response(StatusCode::OK, ()) } @@ -540,6 +563,10 @@ pub fn make_router( "/v1/tenant/:tenant_id/timeline/:timeline_id", timeline_detail_handler, ) + .delete( + "/v1/tenant/:tenant_id/timeline/:timeline_id", + timeline_delete_handler, + ) .get( "/v1/tenant/:tenant_id/timeline/:timeline_id/wal_receiver", wal_receiver_get_handler, ) .post( "/v1/tenant/:tenant_id/timeline/:timeline_id/detach", - timeline_detach_handler, + timeline_delete_handler, ) .any(handler_404)) } diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 2369f46c4f..a1870703f4 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -38,9 +38,7 @@ use crate::keyspace::KeySpace; use crate::storage_sync::index::RemoteIndex; use crate::tenant_config::{TenantConf, TenantConfOpt}; -use crate::repository::{ - GcResult, Repository, RepositoryTimeline, Timeline, TimelineSyncStatusUpdate, TimelineWriter, -}; +use crate::repository::{GcResult, Repository, RepositoryTimeline, Timeline, TimelineWriter}; use crate::repository::{Key, Value}; use crate::tenant_mgr; use crate::thread_mgr; @@ -410,28 +408,61 @@ impl Repository for LayeredRepository { Ok(()) } - fn apply_timeline_remote_sync_status_update( - &self, - timeline_id: ZTimelineId, - timeline_sync_status_update: TimelineSyncStatusUpdate, - ) -> Result<()> { - debug!( - "apply_timeline_remote_sync_status_update timeline_id: {} update: {:?}", - timeline_id, timeline_sync_status_update + // in order to be retriable, detach needs to be idempotent + fn delete_timeline(&self, timeline_id: ZTimelineId) -> anyhow::Result<()> { + // in order to be retriable, detach needs to be idempotent + let mut timelines = self.timelines.lock().unwrap(); + + // Ensure that there are no child timelines **attached to that pageserver**, + // because detach removes files, which will break child branches + let num_children = timelines + .iter() + .filter(|(_, entry)| entry.ancestor_timeline_id() == Some(timeline_id)) + .count(); + + ensure!( + num_children == 0, + "Cannot detach timeline which has child timelines" ); - match timeline_sync_status_update { - TimelineSyncStatusUpdate::Downloaded => { - match self.timelines.lock().unwrap().entry(timeline_id) { - Entry::Occupied(_) => bail!("We completed a download for a timeline that already exists in repository.
This is a bug."), - Entry::Vacant(entry) => { - // we need to get metadata of a timeline, another option is to pass it along with Downloaded status - let metadata = load_metadata(self.conf, timeline_id, self.tenant_id).context("failed to load local metadata")?; - // finally we make newly downloaded timeline visible to repository - entry.insert(LayeredTimelineEntry::Unloaded { id: timeline_id, metadata, }) - }, - }; - } - } + let timeline_entry = match timelines.entry(timeline_id) { + Entry::Occupied(e) => e, + Entry::Vacant(_) => bail!("timeline not found"), + }; + + // try to acquire gc and compaction locks to prevent errors from missing files + let _gc_guard = self + .gc_cs + .try_lock() + .map_err(|e| anyhow::anyhow!("cannot acquire gc lock {e}"))?; + + let compaction_guard = timeline_entry.get().compaction_guard()?; + + let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id); + std::fs::remove_dir_all(&local_timeline_directory).with_context(|| { + format!( + "Failed to remove local timeline directory '{}'", + local_timeline_directory.display() + ) + })?; + info!("detach removed files"); + + drop(compaction_guard); + timeline_entry.remove(); + + Ok(()) + } + + fn attach_timeline(&self, timeline_id: ZTimelineId) -> Result<()> { + debug!("attach timeline_id: {}", timeline_id,); + match self.timelines.lock().unwrap().entry(timeline_id) { + Entry::Occupied(_) => bail!("We completed a download for a timeline that already exists in repository. This is a bug."), + Entry::Vacant(entry) => { + // we need to get metadata of a timeline, another option is to pass it along with Downloaded status + let metadata = load_metadata(self.conf, timeline_id, self.tenant_id).context("failed to load local metadata")?; + // finally we make newly downloaded timeline visible to repository + entry.insert(LayeredTimelineEntry::Unloaded { id: timeline_id, metadata, }) + }, + }; Ok(()) } @@ -481,6 +512,18 @@ impl LayeredTimelineEntry { } } } + + fn compaction_guard(&self) -> Result>, anyhow::Error> { + match self { + LayeredTimelineEntry::Loaded(timeline) => timeline + .compaction_cs + .try_lock() + .map_err(|e| anyhow::anyhow!("cannot lock compaction critical section {e}")) + .map(Some), + + LayeredTimelineEntry::Unloaded { .. } => Ok(None), + } + } } impl From for RepositoryTimeline { diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index f9ea4a6ff8..5b28681b16 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -7,7 +7,6 @@ use byteorder::{ByteOrder, BE}; use bytes::Bytes; use serde::{Deserialize, Serialize}; use std::fmt; -use std::fmt::Display; use std::ops::{AddAssign, Range}; use std::sync::{Arc, RwLockReadGuard}; use std::time::Duration; @@ -182,20 +181,6 @@ impl Value { } } -#[derive(Clone, Copy, Debug)] -pub enum TimelineSyncStatusUpdate { - Downloaded, -} - -impl Display for TimelineSyncStatusUpdate { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let s = match self { - TimelineSyncStatusUpdate::Downloaded => "Downloaded", - }; - f.write_str(s) - } -} - /// /// A repository corresponds to one .neon directory. One repository holds multiple /// timelines, forked off from the same initial call to 'initdb'. @@ -204,11 +189,7 @@ pub trait Repository: Send + Sync { /// Updates timeline based on the `TimelineSyncStatusUpdate`, received from the remote storage synchronization. /// See [`crate::remote_storage`] for more details about the synchronization. 
- fn apply_timeline_remote_sync_status_update( - &self, - timeline_id: ZTimelineId, - timeline_sync_status_update: TimelineSyncStatusUpdate, - ) -> Result<()>; + fn attach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>; /// Get Timeline handle for given zenith timeline ID. /// This function is idempotent. It doesn't change internal state in any way. @@ -260,7 +241,10 @@ pub trait Repository: Send + Sync { /// api's 'compact' command. fn compaction_iteration(&self) -> Result<()>; - // Allows to retrieve remote timeline index from the repo. Used in walreceiver to grab remote consistent lsn. + /// removes timeline-related in-memory data + fn delete_timeline(&self, timeline_id: ZTimelineId) -> anyhow::Result<()>; + + /// Allows to retrieve remote timeline index from the repo. Used in walreceiver to grab remote consistent lsn. fn get_remote_index(&self) -> &RemoteIndex; } @@ -550,10 +534,7 @@ pub mod repo_harness { .parse() .unwrap(); - repo.apply_timeline_remote_sync_status_update( - timeline_id, - TimelineSyncStatusUpdate::Downloaded, - )?; + repo.attach_timeline(timeline_id)?; } Ok(repo) diff --git a/pageserver/src/storage_sync.rs b/pageserver/src/storage_sync.rs index c52da95945..6df41d854c 100644 --- a/pageserver/src/storage_sync.rs +++ b/pageserver/src/storage_sync.rs @@ -178,9 +178,8 @@ use crate::{ metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME}, LayeredRepository, }, - repository::TimelineSyncStatusUpdate, storage_sync::{self, index::RemoteIndex}, - tenant_mgr::apply_timeline_sync_status_updates, + tenant_mgr::attach_downloaded_tenants, thread_mgr, thread_mgr::ThreadKind, }; @@ -191,9 +190,8 @@ use metrics::{ }; use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; -pub use self::download::download_index_part; +use self::download::download_index_parts; pub use self::download::download_tenant_index_parts; -pub use self::download::try_download_index_parts; pub use self::download::TEMP_DOWNLOAD_EXTENSION; lazy_static! { @@ -837,7 +835,7 @@ where .build() .context("Failed to create storage sync runtime")?; - let applicable_index_parts = runtime.block_on(try_download_index_parts( + let applicable_index_parts = runtime.block_on(download_index_parts( conf, &storage, local_timeline_files.keys().copied().collect(), @@ -928,6 +926,8 @@ fn storage_sync_loop( "Sync loop step completed, {} new tenant state update(s)", updated_tenants.len() ); + let mut sync_status_updates: HashMap> = + HashMap::new(); let index_accessor = runtime.block_on(index.write()); for tenant_id in updated_tenants { let tenant_entry = match index_accessor.tenant_entry(&tenant_id) { @@ -945,7 +945,7 @@ fn storage_sync_loop( continue; } else { info!( - "Tenant {tenant_id} download completed. Registering in repository" + "Tenant {tenant_id} download completed. Picking to register in repository" ); // Here we assume that if tenant has no in-progress downloads that // means that it is the last completed timeline download that triggered @@ -953,26 +953,13 @@ fn storage_sync_loop( // and register them all at once in a repository for download // to be submitted in a single operation to repository // so it can apply them at once to internal timeline map. - let sync_status_updates: HashMap< - ZTimelineId, - TimelineSyncStatusUpdate, - > = tenant_entry - .keys() - .copied() - .map(|timeline_id| { - (timeline_id, TimelineSyncStatusUpdate::Downloaded) - }) - .collect(); - - // Batch timeline download registration to ensure that the external registration code won't block any running tasks before. 
- apply_timeline_sync_status_updates( - conf, - &index, - tenant_id, - sync_status_updates, - ); + sync_status_updates + .insert(tenant_id, tenant_entry.keys().copied().collect()); } } + drop(index_accessor); + // Batch timeline download registration to ensure that the external registration code won't block any running tasks before. + attach_downloaded_tenants(conf, &index, sync_status_updates); } } ControlFlow::Break(()) => { @@ -983,6 +970,14 @@ fn storage_sync_loop( } } +// needed to check whether the download happened +// more informative than just a bool +#[derive(Debug)] +enum DownloadMarker { + Downloaded, + Nothing, +} + async fn process_batches( conf: &'static PageServerConf, max_sync_errors: NonZeroU32, @@ -1015,17 +1010,19 @@ where }) .collect::>(); - let mut new_timeline_states = HashSet::new(); + let mut downloaded_timelines = HashSet::new(); - // we purposely ignore actual state update, because we're waiting for last timeline download to happen - while let Some((sync_id, state_update)) = sync_results.next().await { - debug!("Finished storage sync task for sync id {sync_id}"); - if state_update.is_some() { - new_timeline_states.insert(sync_id.tenant_id); + while let Some((sync_id, download_marker)) = sync_results.next().await { + debug!( + "Finished storage sync task for sync id {sync_id} download marker {:?}", + download_marker + ); + if matches!(download_marker, DownloadMarker::Downloaded) { + downloaded_timelines.insert(sync_id.tenant_id); } } - new_timeline_states + downloaded_timelines } async fn process_sync_task_batch( @@ -1034,7 +1031,7 @@ async fn process_sync_task_batch( max_sync_errors: NonZeroU32, sync_id: ZTenantTimelineId, batch: SyncTaskBatch, -) -> Option +) -> DownloadMarker where P: Debug + Send + Sync + 'static, S: RemoteStorage + Send + Sync + 'static, @@ -1119,7 +1116,7 @@ where } } } - None + DownloadMarker::Nothing } .instrument(info_span!("download_timeline_data")), ); @@ -1173,7 +1170,7 @@ async fn download_timeline_data( new_download_data: SyncData, sync_start: Instant, task_name: &str, -) -> Option +) -> DownloadMarker where P: Debug + Send + Sync + 'static, S: RemoteStorage + Send + Sync + 'static, @@ -1202,7 +1199,7 @@ where Ok(()) => match index.write().await.set_awaits_download(&sync_id, false) { Ok(()) => { register_sync_status(sync_id, sync_start, task_name, Some(true)); - return Some(TimelineSyncStatusUpdate::Downloaded); + return DownloadMarker::Downloaded; } Err(e) => { error!("Timeline {sync_id} was expected to be in the remote index after a successful download, but it's absent: {e:?}"); @@ -1218,7 +1215,7 @@ where } } - None + DownloadMarker::Nothing } async fn update_local_metadata( diff --git a/pageserver/src/storage_sync/download.rs b/pageserver/src/storage_sync/download.rs index 8cb9906e33..05a3df166a 100644 --- a/pageserver/src/storage_sync/download.rs +++ b/pageserver/src/storage_sync/download.rs @@ -39,7 +39,7 @@ pub const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download"; /// So there are two requirements: keep everything in one futures unordered /// to allow higher concurrency. Mark tenants as failed independently. /// That requires some bookeeping. 
-pub async fn try_download_index_parts( +pub async fn download_index_parts( conf: &'static PageServerConf, storage: &S, keys: HashSet, @@ -116,7 +116,7 @@ where }); } - let index_parts = try_download_index_parts(conf, storage, sync_ids) + let index_parts = download_index_parts(conf, storage, sync_ids) .await .remove(&tenant_id) .ok_or(anyhow::anyhow!( @@ -127,7 +127,7 @@ where } /// Retrieves index data from the remote storage for a given timeline. -pub async fn download_index_part( +async fn download_index_part( conf: &'static PageServerConf, storage: &S, sync_id: ZTenantTimelineId, diff --git a/pageserver/src/storage_sync/index.rs b/pageserver/src/storage_sync/index.rs index 8bc9f6f189..54be3d0f8c 100644 --- a/pageserver/src/storage_sync/index.rs +++ b/pageserver/src/storage_sync/index.rs @@ -159,6 +159,19 @@ impl RemoteTimelineIndex { .insert(timeline_id, entry); } + pub fn remove_timeline_entry( + &mut self, + ZTenantTimelineId { + tenant_id, + timeline_id, + }: ZTenantTimelineId, + ) -> Option { + self.entries + .entry(tenant_id) + .or_default() + .remove(&timeline_id) + } + pub fn tenant_entry(&self, tenant_id: &ZTenantId) -> Option<&TenantEntry> { self.entries.get(tenant_id) } @@ -171,6 +184,10 @@ impl RemoteTimelineIndex { self.entries.entry(tenant_id).or_default() } + pub fn remove_tenant_entry(&mut self, tenant_id: &ZTenantId) -> Option { + self.entries.remove(tenant_id) + } + pub fn set_awaits_download( &mut self, id: &ZTenantTimelineId, diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index c96dc6973b..84282be63f 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -4,7 +4,7 @@ use crate::config::PageServerConf; use crate::layered_repository::{load_metadata, LayeredRepository}; use crate::pgdatadir_mapping::DatadirTimeline; -use crate::repository::{Repository, TimelineSyncStatusUpdate}; +use crate::repository::Repository; use crate::storage_sync::index::RemoteIndex; use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData}; use crate::tenant_config::TenantConfOpt; @@ -17,7 +17,7 @@ use anyhow::{bail, Context}; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; use std::collections::hash_map::Entry; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt; use std::sync::Arc; use tokio::sync::mpsc; @@ -157,7 +157,13 @@ pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result anyhow::Result), - Attach(ZTenantTimelineId, Arc), + Detach { + id: ZTenantTimelineId, + // used to signal to the detach caller that walreceiver successfully terminated for specified id + join_confirmation_sender: std::sync::mpsc::Sender<()>, + }, + Attach { + id: ZTenantTimelineId, + datadir: Arc, + }, } impl std::fmt::Debug for LocalTimelineUpdate { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Self::Detach(ttid, _) => f.debug_tuple("Remove").field(ttid).finish(), - Self::Attach(ttid, _) => f.debug_tuple("Add").field(ttid).finish(), + Self::Detach { id, .. } => f.debug_tuple("Remove").field(id).finish(), + Self::Attach { id, .. } => f.debug_tuple("Add").field(id).finish(), } } } /// Updates tenants' repositories, changing their timelines state in memory. 
-pub fn apply_timeline_sync_status_updates( +pub fn attach_downloaded_tenants( conf: &'static PageServerConf, remote_index: &RemoteIndex, - tenant_id: ZTenantId, - sync_status_updates: HashMap, + sync_status_updates: HashMap>, ) { if sync_status_updates.is_empty() { - debug!("no sync status updates to apply"); + debug!("No sync status updates to apply"); return; } - info!( - "Applying sync status updates for tenant {tenant_id} {} timelines", - sync_status_updates.len() - ); - debug!("Sync status updates: {sync_status_updates:?}"); + for (tenant_id, downloaded_timelines) in sync_status_updates { + info!( + "Registering {} downloaded timelines for tenant {tenant_id}", + downloaded_timelines.len() + ); + debug!("Downloaded timelines: {downloaded_timelines:?}"); - let repo = match load_local_repo(conf, tenant_id, remote_index) { - Ok(repo) => repo, - Err(e) => { - error!("Failed to load repo for tenant {tenant_id} Error: {e:?}"); - return; + let repo = match load_local_repo(conf, tenant_id, remote_index) { + Ok(repo) => repo, + Err(e) => { + error!("Failed to load repo for tenant {tenant_id} Error: {e:?}"); + continue; + } + }; + match attach_downloaded_tenant(&repo, downloaded_timelines) { + Ok(()) => info!("successfully applied sync status updates for tenant {tenant_id}"), + Err(e) => error!( + "Failed to apply timeline sync status updates for tenant {tenant_id}: {e:?}" + ), } - }; - match apply_timeline_remote_sync_status_updates(&repo, sync_status_updates) { - Ok(()) => info!("successfully applied sync status updates for tenant {tenant_id}"), - Err(e) => error!( - "Failed to apply timeline sync timeline status updates for tenant {tenant_id}: {e:?}" - ), } } @@ -386,6 +400,59 @@ pub fn get_local_timeline_with_load( } } +pub fn delete_timeline(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> anyhow::Result<()> { + // shutdown the timeline threads (this shuts down the walreceiver) + // FIXME it does not shut down wal receiver + + // Things that need to be done + // *. check no ancestors + // *. remove from repo map + // *. remove from global tenant timelines map + // -- no new connections can see the timeline + // *. shutdown threads + // *. join walreceiver (any flushing thread?) + // *. delete files while ensuring that no gc or compaction is in progress + // 7. should we checkpoint before detach? That can be harmful during relocation, + // because it will upload to s3 something that the other pageserver didn't see + // TODO put failpoints at every step. Iterate over failpoints + // in detach test and check that timeline is either attached or detached + // verify by trying to start a compute + // TODO adjust remote_index + // what is harder: writing whole-tenant detach correctly, or fixing the timeline-based one? + + // TODO bail on active page_service threads? + // TODO what about in-progress downloads or uploads? + // can it be idempotent?
+ // FAILPOINTS: broken repo.detach_timeline + // broken wal_receiver + // broken rmdir + + let (sender, receiver) = std::sync::mpsc::channel::<()>(); + tenants_state::try_send_timeline_update(LocalTimelineUpdate::Detach { + id: ZTenantTimelineId::new(tenant_id, timeline_id), + join_confirmation_sender: sender, + }); + + info!("waiting for wal receiver to shutdown"); + let _ = receiver.recv(); + info!("wal receiver shutdown confirmed"); + info!("waiting for threads to shutdown"); + thread_mgr::shutdown_threads(None, None, Some(timeline_id)); + info!("thread shutdown completed"); + match tenants_state::write_tenants().get_mut(&tenant_id) { + Some(tenant) => { + tenant + .repo + .delete_timeline(timeline_id) + .context("Failed to delete tenant timeline from repo")?; + tenant.local_timelines.remove(&timeline_id); + } + None => warn!("Tenant {tenant_id} not found in local tenant state"), + } + + Ok(()) +} + pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> anyhow::Result<()> { set_tenant_state(tenant_id, TenantState::Stopping)?; // shutdown the tenant and timeline threads: gc, compaction, page service threads) @@ -399,10 +466,10 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any let mut walreceiver_join_handles = Vec::with_capacity(tenant.local_timelines.len()); for timeline_id in tenant.local_timelines.keys() { let (sender, receiver) = std::sync::mpsc::channel::<()>(); - tenants_state::try_send_timeline_update(LocalTimelineUpdate::Detach( - ZTenantTimelineId::new(tenant_id, *timeline_id), - sender, - )); + tenants_state::try_send_timeline_update(LocalTimelineUpdate::Detach { + id: ZTenantTimelineId::new(tenant_id, *timeline_id), + join_confirmation_sender: sender, + }); walreceiver_join_handles.push((*timeline_id, receiver)); } // drop the tenants lock @@ -428,11 +495,11 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any // which references ephemeral files which are deleted on drop. So if we keep these references // code will attempt to remove files which no longer exist. 
This can be fixed by having shutdown // mechanism for repository that will clean temporary data to avoid any references to ephemeral files - let local_timeline_directory = conf.tenant_path(&tenant_id); - std::fs::remove_dir_all(&local_timeline_directory).with_context(|| { + let local_tenant_directory = conf.tenant_path(&tenant_id); + std::fs::remove_dir_all(&local_tenant_directory).with_context(|| { format!( "Failed to remove local timeline directory '{}'", - local_timeline_directory.display() + local_tenant_directory.display() ) })?; @@ -453,10 +520,10 @@ fn load_local_timeline( )); page_tline.init_logical_size()?; - tenants_state::try_send_timeline_update(LocalTimelineUpdate::Attach( - ZTenantTimelineId::new(repo.tenant_id(), timeline_id), - Arc::clone(&page_tline), - )); + tenants_state::try_send_timeline_update(LocalTimelineUpdate::Attach { + id: ZTenantTimelineId::new(repo.tenant_id(), timeline_id), + datadir: Arc::clone(&page_tline), + }); Ok(page_tline) } @@ -486,9 +553,13 @@ pub fn list_tenants() -> Vec { /// A timeline is categorized as broken when any of following conditions is true: /// - failed to load the timeline's metadata /// - the timeline's disk consistent LSN is zero -fn check_broken_timeline(repo: &LayeredRepository, timeline_id: ZTimelineId) -> anyhow::Result<()> { - let metadata = load_metadata(repo.conf, timeline_id, repo.tenant_id()) - .context("failed to load metadata")?; +fn check_broken_timeline( + conf: &'static PageServerConf, + tenant_id: ZTenantId, + timeline_id: ZTimelineId, +) -> anyhow::Result<()> { + let metadata = + load_metadata(conf, timeline_id, tenant_id).context("failed to load metadata")?; // A timeline with zero disk consistent LSN can happen when the page server // failed to checkpoint the timeline import data when creating that timeline. @@ -499,61 +570,56 @@ fn check_broken_timeline(repo: &LayeredRepository, timeline_id: ZTimelineId) -> Ok(()) } +/// Note: all timelines are attached at once if and only if all of them are locally complete fn init_local_repository( conf: &'static PageServerConf, tenant_id: ZTenantId, local_timeline_init_statuses: HashMap, remote_index: &RemoteIndex, ) -> anyhow::Result<(), anyhow::Error> { - // initialize local tenant - let repo = load_local_repo(conf, tenant_id, remote_index) - .with_context(|| format!("Failed to load repo for tenant {tenant_id}"))?; - - let mut status_updates = HashMap::with_capacity(local_timeline_init_statuses.len()); + let mut timelines_to_attach = HashSet::new(); for (timeline_id, init_status) in local_timeline_init_statuses { match init_status { LocalTimelineInitStatus::LocallyComplete => { debug!("timeline {timeline_id} for tenant {tenant_id} is locally complete, registering it in repository"); - if let Err(err) = check_broken_timeline(&repo, timeline_id) { - info!( - "Found a broken timeline {timeline_id} (err={err:?}), skip registering it in repository" - ); - } else { - status_updates.insert(timeline_id, TimelineSyncStatusUpdate::Downloaded); - } + check_broken_timeline(conf, tenant_id, timeline_id) + .context("found broken timeline")?; + timelines_to_attach.insert(timeline_id); } LocalTimelineInitStatus::NeedsSync => { debug!( "timeline {tenant_id} for tenant {timeline_id} needs sync, \ so skipped for adding into repository until sync is finished" ); + return Ok(()); } } } + // initialize local tenant + let repo = load_local_repo(conf, tenant_id, remote_index) + .with_context(|| format!("Failed to load repo for tenant {tenant_id}"))?; + // Lets fail here loudly to be on the safe side. 
// XXX: It may be a better api to actually distinguish between repository startup // and processing of newly downloaded timelines. - apply_timeline_remote_sync_status_updates(&repo, status_updates) + attach_downloaded_tenant(&repo, timelines_to_attach) .with_context(|| format!("Failed to bootstrap timelines for tenant {tenant_id}"))?; Ok(()) } -fn apply_timeline_remote_sync_status_updates( +fn attach_downloaded_tenant( repo: &LayeredRepository, - status_updates: HashMap, + downloaded_timelines: HashSet, ) -> anyhow::Result<()> { - let mut registration_queue = Vec::with_capacity(status_updates.len()); + let mut registration_queue = Vec::with_capacity(downloaded_timelines.len()); // first need to register the in-mem representations, to avoid missing ancestors during the local disk data registration - for (timeline_id, status_update) in status_updates { - repo.apply_timeline_remote_sync_status_update(timeline_id, status_update) - .with_context(|| { - format!("Failed to load timeline {timeline_id} into in-memory repository") - })?; - match status_update { - TimelineSyncStatusUpdate::Downloaded => registration_queue.push(timeline_id), - } + for timeline_id in downloaded_timelines { + repo.attach_timeline(timeline_id).with_context(|| { + format!("Failed to load timeline {timeline_id} into in-memory repository") + })?; + registration_queue.push(timeline_id); } for timeline_id in registration_queue { diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index b70350e0da..c36343db17 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -264,7 +264,10 @@ async fn wal_receiver_main_thread_loop_step<'a>( info!("Processing timeline update: {update:?}"); match update { // Timeline got detached, stop all related tasks and remove public timeline data. - LocalTimelineUpdate::Detach(id, join_sender) => { + LocalTimelineUpdate::Detach { + id, + join_confirmation_sender, + } => { match local_timeline_wal_receivers.get_mut(&id.tenant_id) { Some(wal_receivers) => { if let hash_map::Entry::Occupied(o) = wal_receivers.entry(id.timeline_id) { @@ -280,7 +283,7 @@ async fn wal_receiver_main_thread_loop_step<'a>( }; { WAL_RECEIVER_ENTRIES.write().await.remove(&id); - if let Err(e) = join_sender.send(()) { + if let Err(e) = join_confirmation_sender.send(()) { warn!("cannot send wal_receiver shutdown confirmation {e}") } else { info!("confirm walreceiver shutdown for {id}"); @@ -288,41 +291,40 @@ async fn wal_receiver_main_thread_loop_step<'a>( } } // Timeline got attached, retrieve all necessary information to start its broker loop and maintain this loop endlessly. 
- LocalTimelineUpdate::Attach(new_id, new_timeline) => { + LocalTimelineUpdate::Attach { id, datadir } => { let timeline_connection_managers = local_timeline_wal_receivers - .entry(new_id.tenant_id) + .entry(id.tenant_id) .or_default(); if timeline_connection_managers.is_empty() { - if let Err(e) = - change_tenant_state(new_id.tenant_id, TenantState::Active).await + if let Err(e) = change_tenant_state(id.tenant_id, TenantState::Active).await { - error!("Failed to make tenant active for id {new_id}: {e:#}"); + error!("Failed to make tenant active for id {id}: {e:#}"); return; } } let vacant_connection_manager_entry = - match timeline_connection_managers.entry(new_id.timeline_id) { + match timeline_connection_managers.entry(id.timeline_id) { hash_map::Entry::Occupied(_) => { - debug!("Attepted to readd an existing timeline {new_id}, ignoring"); + debug!("Attepted to readd an existing timeline {id}, ignoring"); return; } hash_map::Entry::Vacant(v) => v, }; let (wal_connect_timeout, lagging_wal_timeout, max_lsn_wal_lag) = - match fetch_tenant_settings(new_id.tenant_id).await { + match fetch_tenant_settings(id.tenant_id).await { Ok(settings) => settings, Err(e) => { - error!("Failed to fetch tenant settings for id {new_id}: {e:#}"); + error!("Failed to fetch tenant settings for id {id}: {e:#}"); return; } }; { WAL_RECEIVER_ENTRIES.write().await.insert( - new_id, + id, WalReceiverEntry { wal_producer_connstr: None, last_received_msg_lsn: None, @@ -333,10 +335,10 @@ async fn wal_receiver_main_thread_loop_step<'a>( vacant_connection_manager_entry.insert( connection_manager::spawn_connection_manager_task( - new_id, + id, broker_prefix.to_owned(), etcd_client.clone(), - new_timeline, + datadir, wal_connect_timeout, lagging_wal_timeout, max_lsn_wal_lag, diff --git a/test_runner/batch_others/test_ancestor_branch.py b/test_runner/batch_others/test_ancestor_branch.py index 3e7ba22184..96132c14f9 100644 --- a/test_runner/batch_others/test_ancestor_branch.py +++ b/test_runner/batch_others/test_ancestor_branch.py @@ -105,3 +105,26 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder): branch2_cur.execute('SELECT count(*) FROM foo') assert branch2_cur.fetchone() == (300000, ) + + +def test_ancestor_branch_delete(neon_simple_env: NeonEnv): + env = neon_simple_env + + parent_timeline_id = env.neon_cli.create_branch("test_ancestor_branch_delete_parent", "empty") + + leaf_timeline_id = env.neon_cli.create_branch("test_ancestor_branch_delete_branch1", + "test_ancestor_branch_delete_parent") + + ps_http = env.pageserver.http_client() + with pytest.raises(NeonPageserverApiException, + match="Failed to delete tenant timeline from repo"): + ps_http.timeline_delete(env.initial_tenant, parent_timeline_id) + + ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id) + # check 404 + with pytest.raises(NeonPageserverApiException, + match="is not found neither locally nor remotely"): + ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id) + + # FIXME leaves tenant without timelines, should we prevent deletion of root timeline? 
+ ps_http.timeline_delete(env.initial_tenant, parent_timeline_id) diff --git a/test_runner/batch_others/test_broken_timeline.py b/test_runner/batch_others/test_broken_timeline.py index b72f337e06..675236fbd7 100644 --- a/test_runner/batch_others/test_broken_timeline.py +++ b/test_runner/batch_others/test_broken_timeline.py @@ -110,6 +110,6 @@ def test_fix_broken_timelines_on_startup(neon_simple_env: NeonEnv): env.neon_cli.pageserver_stop(immediate=True) env.neon_cli.pageserver_start() - # Check that the "broken" timeline is not loaded - timelines = env.neon_cli.list_timelines(tenant_id) - assert len(timelines) == 1 + # Check that tenant with "broken" timeline is not loaded. + with pytest.raises(Exception, match=f"Failed to get repo for tenant {tenant_id.hex}"): + env.neon_cli.list_timelines(tenant_id) diff --git a/test_runner/batch_others/test_import.py b/test_runner/batch_others/test_import.py index 63dc42ee3e..617d4808cc 100644 --- a/test_runner/batch_others/test_import.py +++ b/test_runner/batch_others/test_import.py @@ -90,7 +90,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build # Clean up # TODO it should clean itself client = env.pageserver.http_client() - client.timeline_detach(tenant, timeline) + client.timeline_delete(tenant, timeline) # Importing correct backup works import_tar(base_tar, wal_tar) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index e6967e3682..aaccb00399 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -795,6 +795,27 @@ class NeonPageserverHttpClient(requests.Session): def check_status(self): self.get(f"http://localhost:{self.port}/v1/status").raise_for_status() + def tenant_list(self) -> List[Dict[Any, Any]]: + res = self.get(f"http://localhost:{self.port}/v1/tenant") + self.verbose_error(res) + res_json = res.json() + assert isinstance(res_json, list) + return res_json + + def tenant_create(self, new_tenant_id: Optional[uuid.UUID] = None) -> uuid.UUID: + res = self.post( + f"http://localhost:{self.port}/v1/tenant", + json={ + 'new_tenant_id': new_tenant_id.hex if new_tenant_id else None, + }, + ) + self.verbose_error(res) + if res.status_code == 409: + raise Exception(f'could not create tenant: already exists for id {new_tenant_id}') + new_tenant_id = res.json() + assert isinstance(new_tenant_id, str) + return uuid.UUID(new_tenant_id) + def tenant_attach(self, tenant_id: uuid.UUID): res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/attach") self.verbose_error(res) @@ -803,6 +824,13 @@ class NeonPageserverHttpClient(requests.Session): res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/detach") self.verbose_error(res) + def timeline_list(self, tenant_id: uuid.UUID) -> List[Dict[Any, Any]]: + res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline") + self.verbose_error(res) + res_json = res.json() + assert isinstance(res_json, list) + return res_json + def timeline_create( self, tenant_id: uuid.UUID, @@ -827,34 +855,6 @@ class NeonPageserverHttpClient(requests.Session): assert isinstance(res_json, dict) return res_json - def tenant_list(self) -> List[Dict[Any, Any]]: - res = self.get(f"http://localhost:{self.port}/v1/tenant") - self.verbose_error(res) - res_json = res.json() - assert isinstance(res_json, list) - return res_json - - def tenant_create(self, new_tenant_id: Optional[uuid.UUID] = None) -> uuid.UUID: - res = self.post( - 
f"http://localhost:{self.port}/v1/tenant", - json={ - 'new_tenant_id': new_tenant_id.hex if new_tenant_id else None, - }, - ) - self.verbose_error(res) - if res.status_code == 409: - raise Exception(f'could not create tenant: already exists for id {new_tenant_id}') - new_tenant_id = res.json() - assert isinstance(new_tenant_id, str) - return uuid.UUID(new_tenant_id) - - def timeline_list(self, tenant_id: uuid.UUID) -> List[Dict[Any, Any]]: - res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline") - self.verbose_error(res) - res_json = res.json() - assert isinstance(res_json, list) - return res_json - def timeline_detail(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]: res = self.get( f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}?include-non-incremental-logical-size=1" @@ -864,6 +864,14 @@ class NeonPageserverHttpClient(requests.Session): assert isinstance(res_json, dict) return res_json + def timeline_delete(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID): + res = self.delete( + f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}") + self.verbose_error(res) + res_json = res.json() + assert res_json is None + return res_json + def wal_receiver_get(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]: res = self.get( f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}/wal_receiver"