diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 5afa38c926..4025d6706e 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -4,6 +4,7 @@ use crate::config::PageServerConf; use crate::http::models::TenantInfo; use crate::layered_repository::{load_metadata, Repository, Timeline}; +use crate::repository::RepositoryTimeline; use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex}; use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData}; use crate::tenant_config::TenantConfOpt; @@ -94,12 +95,6 @@ struct Tenant { state: TenantState, /// Contains in-memory state, including the timeline that might not yet flushed on disk or loaded form disk. repo: Arc, - /// Timelines, located locally in the pageserver's datadir. - /// Timelines can entirely be removed entirely by the `detach` operation only. - /// - /// Local timelines have more metadata that's loaded into memory, - /// that is located in the `repo.timelines` field, [`crate::layered_repository::LayeredTimelineEntry`]. - local_timelines: HashMap>, } #[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] @@ -288,7 +283,6 @@ pub fn create_tenant_repository( v.insert(Tenant { state: TenantState::Idle, repo, - local_timelines: HashMap::new(), }); Ok(Some(tenant_id)) } @@ -379,20 +373,11 @@ pub fn get_local_timeline_with_load( tenant_id: ZTenantId, timeline_id: ZTimelineId, ) -> anyhow::Result> { - let mut m = tenants_state::write_tenants(); - let tenant = m - .get_mut(&tenant_id) - .with_context(|| format!("Tenant {tenant_id} not found"))?; - - if let Some(page_tline) = tenant.local_timelines.get(&timeline_id) { - Ok(Arc::clone(page_tline)) - } else { - let page_tline = load_local_timeline(&tenant.repo, timeline_id) - .with_context(|| format!("Failed to load local timeline for tenant {tenant_id}"))?; - tenant - .local_timelines - .insert(timeline_id, Arc::clone(&page_tline)); - Ok(page_tline) + let repository = get_repository_for_tenant(tenant_id)?; + match repository.get_timeline(timeline_id) { + Some(RepositoryTimeline::Loaded(loaded_timeline)) => Ok(loaded_timeline), + _ => load_local_timeline(&repository, timeline_id) + .with_context(|| format!("Failed to load local timeline for tenant {tenant_id}")), } } @@ -419,10 +404,7 @@ pub fn delete_timeline(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> anyhow thread_mgr::shutdown_threads(None, None, Some(timeline_id)); debug!("thread shutdown completed"); match tenants_state::write_tenants().get_mut(&tenant_id) { - Some(tenant) => { - tenant.repo.delete_timeline(timeline_id)?; - tenant.local_timelines.remove(&timeline_id); - } + Some(tenant) => tenant.repo.delete_timeline(timeline_id)?, None => anyhow::bail!("Tenant {tenant_id} not found in local tenant state"), } @@ -434,37 +416,31 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any // shutdown the tenant and timeline threads: gc, compaction, page service threads) thread_mgr::shutdown_threads(None, Some(tenant_id), None); - // FIXME should we protect somehow from starting new threads/walreceivers when tenant is in stopping state? - // send stop signal to wal receiver and collect join handles while holding the lock - let walreceiver_join_handles = { - let tenants = tenants_state::write_tenants(); - let tenant = tenants.get(&tenant_id).context("tenant not found")?; - let mut walreceiver_join_handles = Vec::with_capacity(tenant.local_timelines.len()); - for timeline_id in tenant.local_timelines.keys() { + let mut walreceiver_join_handles = Vec::new(); + let removed_tenant = { + let mut tenants_accessor = tenants_state::write_tenants(); + tenants_accessor.remove(&tenant_id) + }; + if let Some(tenant) = removed_tenant { + for (timeline_id, _) in tenant.repo.list_timelines() { let (sender, receiver) = std::sync::mpsc::channel::<()>(); tenants_state::try_send_timeline_update(LocalTimelineUpdate::Detach { - id: ZTenantTimelineId::new(tenant_id, *timeline_id), + id: ZTenantTimelineId::new(tenant_id, timeline_id), join_confirmation_sender: sender, }); - walreceiver_join_handles.push((*timeline_id, receiver)); + walreceiver_join_handles.push((timeline_id, receiver)); } - // drop the tenants lock - walreceiver_join_handles - }; + } // wait for wal receivers to stop without holding the lock, because walreceiver // will attempt to change tenant state which is protected by the same global tenants lock. - // TODO do we need a timeout here? how to handle it? // recv_timeout is broken: https://github.com/rust-lang/rust/issues/94518#issuecomment-1057440631 - // need to use crossbeam-channel for (timeline_id, join_handle) in walreceiver_join_handles { info!("waiting for wal receiver to shutdown timeline_id {timeline_id}"); join_handle.recv().context("failed to join walreceiver")?; info!("wal receiver shutdown confirmed timeline_id {timeline_id}"); } - tenants_state::write_tenants().remove(&tenant_id); - // If removal fails there will be no way to successfully retry detach, // because tenant no longer exists in in memory map. And it needs to be removed from it // before we remove files because it contains references to repository @@ -590,34 +566,18 @@ fn attach_downloaded_tenant( repo: &Repository, downloaded_timelines: HashSet, ) -> anyhow::Result<()> { - let mut registration_queue = Vec::with_capacity(downloaded_timelines.len()); - - // first need to register the in-mem representations, to avoid missing ancestors during the local disk data registration for timeline_id in downloaded_timelines { + // first, register timeline metadata repo.attach_timeline(timeline_id).with_context(|| { format!("Failed to load timeline {timeline_id} into in-memory repository") })?; - registration_queue.push(timeline_id); - } - - for timeline_id in registration_queue { - let tenant_id = repo.tenant_id(); - match tenants_state::write_tenants().get_mut(&tenant_id) { - Some(tenant) => match tenant.local_timelines.entry(timeline_id) { - Entry::Occupied(_) => { - anyhow::bail!("Local timeline {timeline_id} already registered") - } - Entry::Vacant(v) => { - v.insert(load_local_timeline(repo, timeline_id).with_context(|| { - format!("Failed to register add local timeline for tenant {tenant_id}") - })?); - } - }, - None => anyhow::bail!( - "Tenant {} not found in local tenant state", - repo.tenant_id() - ), - } + // and then load its layers in memory + let _ = load_local_timeline(repo, timeline_id).with_context(|| { + format!( + "Failed to register add local timeline for tenant {}", + repo.tenant_id(), + ) + })?; } Ok(()) @@ -647,7 +607,6 @@ fn load_local_repo( Tenant { state: TenantState::Idle, repo, - local_timelines: HashMap::new(), } });