From 777930898580110e8800c3e94b41aceea27e6063 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Fri, 19 Aug 2022 17:59:06 +0300 Subject: [PATCH] Ensure timeline logical size is initialized once --- pageserver/src/layered_repository.rs | 18 +++++++------- pageserver/src/layered_repository/timeline.rs | 12 ++++++++++ pageserver/src/tenant_mgr.rs | 24 +++++++++++++------ pageserver/src/walreceiver.rs | 4 ++-- 4 files changed, 41 insertions(+), 17 deletions(-) diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index d67b1b0130..dd173498b9 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -205,7 +205,7 @@ impl Repository { crate::tenant_mgr::try_send_timeline_update(LocalTimelineUpdate::Attach { id: ZTenantTimelineId::new(self.tenant_id(), timeline_id), - datadir: Arc::clone(&timeline), + timeline: Arc::clone(&timeline), }); Ok(timeline) @@ -572,8 +572,7 @@ impl Repository { } }; debug!( - "timeline {} found on a local disk, but not loaded into the memory, loading", - &timeline_id + "timeline {timeline_id} found on a local disk, but not loaded into the memory, loading" ); let timeline = self.load_local_timeline(timeline_id, timelines)?; let was_loaded = timelines.insert( @@ -585,10 +584,6 @@ impl Repository { || matches!(was_loaded, Some(LayeredTimelineEntry::Unloaded { .. 
})), "assertion failure, inserted wrong timeline in an incorrect state" ); - crate::tenant_mgr::try_send_timeline_update(LocalTimelineUpdate::Attach { - id: ZTenantTimelineId::new(self.tenant_id(), timeline_id), - datadir: Arc::clone(&timeline), - }); Ok(Some(timeline)) } @@ -627,7 +622,14 @@ impl Repository { .load_layer_map(disk_consistent_lsn) .context("failed to load layermap")?; - Ok(Arc::new(timeline)) + let timeline = Arc::new(timeline); + + crate::tenant_mgr::try_send_timeline_update(LocalTimelineUpdate::Attach { + id: ZTenantTimelineId::new(self.tenant_id(), timeline_id), + timeline: Arc::clone(&timeline), + }); + + Ok(timeline) } pub fn new( diff --git a/pageserver/src/layered_repository/timeline.rs b/pageserver/src/layered_repository/timeline.rs index 7bbde53dbd..fb5a4d0b83 100644 --- a/pageserver/src/layered_repository/timeline.rs +++ b/pageserver/src/layered_repository/timeline.rs @@ -412,6 +412,11 @@ pub struct Timeline { /// and `set_current_logical_size` functions to modify this, they will /// also keep the prometheus metric in sync. current_logical_size: AtomicI64, + // TODO we don't have a good API to ensure at the compilation level + // that the timeline passes all initialization. + // Hence we ensure that we init at least once for every timeline + // and keep this flag to avoid potentially long recomputes. + logical_size_initialized: AtomicBool, /// Information about the last processed message by the WAL receiver, /// or None if WAL receiver has not received anything for this timeline @@ -731,6 +736,7 @@ impl Timeline { initdb_lsn: metadata.initdb_lsn(), current_logical_size: AtomicI64::new(0), + logical_size_initialized: AtomicBool::new(false), partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))), repartition_threshold: 0, @@ -835,6 +841,10 @@ impl Timeline { /// /// This can be a slow operation. 
pub fn init_logical_size(&self) -> Result<()> { + if self.logical_size_initialized.load(AtomicOrdering::Acquire) { + return Ok(()); + } + // Try a fast-path first: // Copy logical size from ancestor timeline if there has been no changes on this // branch, and no changes on the ancestor branch since the branch point. @@ -907,6 +917,8 @@ impl Timeline { fn set_current_logical_size(&self, new_size: u64) { self.current_logical_size .store(new_size as i64, AtomicOrdering::SeqCst); + self.logical_size_initialized + .store(true, AtomicOrdering::SeqCst); // Also set the value in the prometheus gauge. Same race condition // here as in `update_current_logical_size`. diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index f5b4308067..921d973a41 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -172,15 +172,15 @@ pub enum LocalTimelineUpdate { }, Attach { id: ZTenantTimelineId, - datadir: Arc, + timeline: Arc, }, } impl std::fmt::Debug for LocalTimelineUpdate { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Self::Detach { id, .. } => f.debug_tuple("Remove").field(id).finish(), - Self::Attach { id, .. } => f.debug_tuple("Add").field(id).finish(), + Self::Detach { id, .. } => f.debug_tuple("Detach").field(id).finish(), + Self::Attach { id, .. 
} => f.debug_tuple("Attach").field(id).finish(), } } } @@ -376,7 +376,10 @@ pub fn get_local_timeline_with_load( ) -> anyhow::Result> { let repository = get_repository_for_tenant(tenant_id)?; match repository.get_timeline(timeline_id) { - Some(RepositoryTimeline::Loaded(loaded_timeline)) => Ok(loaded_timeline), + Some(RepositoryTimeline::Loaded(loaded_timeline)) => { + loaded_timeline.init_logical_size()?; + Ok(loaded_timeline) + } _ => load_local_timeline(&repository, timeline_id) .with_context(|| format!("Failed to load local timeline for tenant {tenant_id}")), } @@ -435,13 +438,17 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any // wait for wal receivers to stop without holding the lock, because walreceiver // will attempt to change tenant state which is protected by the same global tenants lock. + // TODO do we need a timeout here? how to handle it? // recv_timeout is broken: https://github.com/rust-lang/rust/issues/94518#issuecomment-1057440631 + // need to use crossbeam-channel for (timeline_id, join_handle) in walreceiver_join_handles { info!("waiting for wal receiver to shutdown timeline_id {timeline_id}"); join_handle.recv().context("failed to join walreceiver")?; info!("wal receiver shutdown confirmed timeline_id {timeline_id}"); } + tenants_state::write_tenants().remove(&tenant_id); + // If removal fails there will be no way to successfully retry detach, // because tenant no longer exists in in memory map. 
And it needs to be removed from it // before we remove files because it contains references to repository @@ -561,12 +568,15 @@ fn attach_downloaded_tenant( repo: &Repository, downloaded_timelines: HashSet, ) -> anyhow::Result<()> { - for timeline_id in downloaded_timelines { - // first, register timeline metadata + // first, register timeline metadata to ensure ancestors will be found later during layer load + for &timeline_id in &downloaded_timelines { repo.attach_timeline(timeline_id).with_context(|| { format!("Failed to load timeline {timeline_id} into in-memory repository") })?; - // and then load its layers in memory + } + + // and then load its layers in memory + for timeline_id in downloaded_timelines { let _ = load_local_timeline(repo, timeline_id).with_context(|| { format!( "Failed to register add local timeline for tenant {}", diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 8a466a8a67..d6420e1d18 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -269,7 +269,7 @@ async fn wal_receiver_main_thread_loop_step<'a>( } } // Timeline got attached, retrieve all necessary information to start its broker loop and maintain this loop endlessly. - LocalTimelineUpdate::Attach { id, datadir } => { + LocalTimelineUpdate::Attach { id, timeline } => { let timeline_connection_managers = local_timeline_wal_receivers .entry(id.tenant_id) .or_default(); @@ -305,7 +305,7 @@ async fn wal_receiver_main_thread_loop_step<'a>( id, broker_prefix.to_owned(), etcd_client.clone(), - datadir, + timeline, wal_connect_timeout, lagging_wal_timeout, max_lsn_wal_lag,