diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 257544a43b..34adbfd926 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -39,7 +39,8 @@ use crate::storage_sync::index::RemoteIndex; use crate::tenant_config::{TenantConf, TenantConfOpt}; use crate::repository::{ - GcResult, Repository, RepositoryTimeline, Timeline, TimelineSyncStatusUpdate, TimelineWriter, + GcResult, RepoIoError, Repository, RepositoryTimeline, Timeline, TimelineSyncStatusUpdate, + TimelineWriter, }; use crate::repository::{Key, Value}; use crate::tenant_mgr; @@ -158,12 +159,10 @@ pub struct LayeredRepository { // Global pageserver config parameters pub conf: &'static PageServerConf, - // Allows us to gracefully cancel operations that edit the directory - // that backs this layered repository. Usage: + // Freezing the repo disallows any writes to its directory. // - // Use `let _guard = file_lock.try_read()` while writing any files. - // Use `let _guard = file_lock.write().unwrap()` to wait for all writes to finish. - pub file_lock: RwLock<()>, + // Any writers must hold frozen.try_read() while writing. + pub frozen: RwLock<bool>, // Overridden tenant-specific config parameters.
// We keep TenantConfOpt sturct here to preserve the information @@ -326,19 +325,38 @@ impl Repository for LayeredRepository { horizon: u64, pitr: Duration, checkpoint_before_gc: bool, - ) -> Result<GcResult> { + ) -> Result<GcResult, RepoIoError> { let timeline_str = target_timelineid .map(|x| x.to_string()) .unwrap_or_else(|| "-".to_string()); + // Make sure repo is not frozen + let guard = match self.frozen.try_read() { + Ok(g) => g, + Err(_) => return Err(RepoIoError::RepoFreezingError), + }; + if *guard { + return Err(RepoIoError::RepoFrozenError); + } + STORAGE_TIME .with_label_values(&["gc", &self.tenant_id.to_string(), &timeline_str]) .observe_closure_duration(|| { self.gc_iteration_internal(target_timelineid, horizon, pitr, checkpoint_before_gc) }) + .map_err(|err| err.into()) } - fn compaction_iteration(&self) -> Result<()> { + fn compaction_iteration(&self) -> Result<(), RepoIoError> { + // Make sure repo is not frozen + let guard = match self.frozen.try_read() { + Ok(g) => g, + Err(_) => return Err(RepoIoError::RepoFreezingError), + }; + if *guard { + return Err(RepoIoError::RepoFrozenError); + } + // Scan through the hashmap and collect a list of all the timelines, // while holding the lock. Then drop the lock and actually perform the // compactions.
We don't want to block everything else while the @@ -692,7 +710,7 @@ impl LayeredRepository { ) -> LayeredRepository { LayeredRepository { tenant_id, - file_lock: RwLock::new(()), + frozen: RwLock::new(false), conf, tenant_conf: Arc::new(RwLock::new(tenant_conf)), timelines: Mutex::new(HashMap::new()), diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 756c3b8191..1ac3531376 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -196,6 +196,19 @@ impl Display for TimelineSyncStatusUpdate { } } +#[derive(Debug, thiserror::Error)] +pub enum RepoIoError { + #[error("Cannot write to repo path while repo is frozen")] + RepoFrozenError, + + #[error("Cannot write to repo path while repo is being frozen")] + RepoFreezingError, + + /// Unstructured anyhow error + #[error(transparent)] + Other(#[from] anyhow::Error), +} + /// /// A repository corresponds to one .neon directory. One repository holds multiple /// timelines, forked off from the same initial call to 'initdb'. @@ -252,13 +265,13 @@ pub trait Repository: Send + Sync { horizon: u64, pitr: Duration, checkpoint_before_gc: bool, - ) -> Result<GcResult>; + ) -> Result<GcResult, RepoIoError>; /// Perform one compaction iteration. /// This function is periodically called by compactor thread. /// Also it can be explicitly requested per timeline through page server /// api's 'compact' command. - fn compaction_iteration(&self) -> Result<()>; + fn compaction_iteration(&self) -> Result<(), RepoIoError>; /// detaches timeline-related in-memory data.
fn detach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>; diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index c73fed140a..bfe56348e2 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -329,9 +329,12 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow: (TenantState::Idle, TenantState::Active) => { info!("activating tenant {tenant_id}"); + // Unfreeze the repo, allowing gc/compaction jobs to run + let repo = get_repository_for_tenant(tenant_id)?; + *repo.frozen.write().unwrap() = false; + // Spawn gc and compaction loops. The loops will shut themselves - // down when they notice that the tenant is inactive. - // TODO maybe use tokio::sync::watch instead? + // down when tenant becomes idle and repo becomes frozen. crate::tenant_tasks::start_compaction_loop(tenant_id)?; crate::tenant_tasks::start_gc_loop(tenant_id)?; } @@ -346,9 +349,9 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow: None, ); - // Wait until all gc/compaction tasks finish + // Freeze the repo, waiting for existing gc/compaction to finish let repo = get_repository_for_tenant(tenant_id)?; - let _guard = repo.file_lock.write().unwrap(); + *repo.frozen.write().unwrap() = true; } } diff --git a/pageserver/src/tenant_tasks.rs b/pageserver/src/tenant_tasks.rs index 68c2bba9f4..cab295d33b 100644 --- a/pageserver/src/tenant_tasks.rs +++ b/pageserver/src/tenant_tasks.rs @@ -5,8 +5,7 @@ use std::collections::HashMap; use std::ops::ControlFlow; use std::time::Duration; -use crate::repository::Repository; -use crate::tenant_mgr::TenantState; +use crate::repository::{RepoIoError, Repository}; use crate::thread_mgr::ThreadKind; use crate::{tenant_mgr, thread_mgr}; use anyhow::{self, Context}; @@ -37,23 +36,15 @@ async fn compaction_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) { // Run blocking part of the task let period: Result<ControlFlow<Duration>, _> = tokio::task::spawn_blocking(move || { -
// Break if tenant is not active - if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) { - return Ok(ControlFlow::Break(())); - } - - // Break if we're not allowed to write to disk let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - // TODO do this inside repo.compaction_iteration instead. - let _guard = match repo.file_lock.try_read() { - Ok(g) => g, - Err(_) => return Ok(ControlFlow::Break(())), - }; - - // Run compaction let compaction_period = repo.get_compaction_period(); - repo.compaction_iteration()?; - Ok(ControlFlow::Continue(compaction_period)) + match repo.compaction_iteration() { + Ok(_) => Ok(ControlFlow::Continue(compaction_period)), + Err(RepoIoError::RepoFreezingError | RepoIoError::RepoFrozenError) => { + Ok(ControlFlow::Break(())) + } + Err(RepoIoError::Other(e)) => Err(e), + } }) .await; @@ -234,26 +225,18 @@ async fn gc_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) { // Run blocking part of the task let period: Result<ControlFlow<Duration>, _> = tokio::task::spawn_blocking(move || { - // Break if tenant is not active - if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) { - return Ok(ControlFlow::Break(())); - } - - // Break if we're not allowed to write to disk let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - // TODO do this inside repo.gc_iteration instead.
- let _guard = match repo.file_lock.try_read() { - Ok(g) => g, - Err(_) => return Ok(ControlFlow::Break(())), - }; - - // Run gc let gc_period = repo.get_gc_period(); let gc_horizon = repo.get_gc_horizon(); if gc_horizon > 0 { - repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?; + match repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false) { + Ok(_) => return Ok(ControlFlow::Continue(gc_period)), + Err(RepoIoError::RepoFreezingError | RepoIoError::RepoFrozenError) => { + return Ok(ControlFlow::Break(())) + } + Err(RepoIoError::Other(e)) => return Err(e), + } } - Ok(ControlFlow::Continue(gc_period)) }) .await;