diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 5a84763697..13e684da24 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -383,6 +383,10 @@ impl PageServerNode { .map(|x| x.parse::()) .transpose() .context("Failed to parse 'switch_aux_file_policy'")?, + lsn_lease_length: settings.remove("lsn_lease_length").map(|x| x.to_string()), + lsn_lease_length_for_ts: settings + .remove("lsn_lease_length_for_ts") + .map(|x| x.to_string()), }; if !settings.is_empty() { bail!("Unrecognized tenant settings: {settings:?}") @@ -506,6 +510,10 @@ impl PageServerNode { .map(|x| x.parse::()) .transpose() .context("Failed to parse 'switch_aux_file_policy'")?, + lsn_lease_length: settings.remove("lsn_lease_length").map(|x| x.to_string()), + lsn_lease_length_for_ts: settings + .remove("lsn_lease_length_for_ts") + .map(|x| x.to_string()), } }; diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 9311dab33c..70db0b7344 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -177,6 +177,20 @@ serde_with::serde_conv!( |value: String| -> Result<_, humantime::TimestampError> { humantime::parse_rfc3339(&value) } ); +impl LsnLease { + /// The default length for an explicit LSN lease request (10 minutes). + pub const DEFAULT_LENGTH: Duration = Duration::from_secs(10 * 60); + + /// The default length for an implicit LSN lease granted during a + /// `get_lsn_by_timestamp` request (1 minute). + pub const DEFAULT_LENGTH_FOR_TS: Duration = Duration::from_secs(60); + + /// Checks whether the lease is expired, i.e. `now` is strictly later than `valid_until`. + pub fn is_expired(&self, now: &SystemTime) -> bool { + now > &self.valid_until + } +} + /// The only [`TenantState`] variants we could be `TenantState::Activating` from.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub enum ActivatingFrom { @@ -322,6 +336,8 @@ pub struct TenantConfig { pub timeline_get_throttle: Option, pub image_layer_creation_check_threshold: Option, pub switch_aux_file_policy: Option, + pub lsn_lease_length: Option, + pub lsn_lease_length_for_ts: Option, } /// The policy for the aux file storage. It can be switched through `switch_aux_file_policy` diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 657708c0d6..482879630a 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1730,7 +1730,7 @@ async fn lsn_lease_handler( active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) .await?; let result = timeline - .make_lsn_lease(lsn, &ctx) + .make_lsn_lease(lsn, timeline.get_lsn_lease_length(), &ctx) .map_err(|e| ApiError::InternalServerError(e.context("lsn lease http handler")))?; json_response(StatusCode::OK, result) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index ae389826d5..ebc23e8945 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -935,7 +935,7 @@ impl PageServerHandler { let timeline = self .get_active_tenant_timeline(tenant_shard_id.tenant_id, timeline_id, shard_selector) .await?; - let lease = timeline.make_lsn_lease(lsn, ctx)?; + let lease = timeline.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)?; let valid_until = lease .valid_until .duration_since(SystemTime::UNIX_EPOCH) diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 7b30c3ecf7..5a334d0290 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -240,6 +240,7 @@ pub struct GcResult { pub layers_needed_by_cutoff: u64, pub layers_needed_by_pitr: u64, pub layers_needed_by_branches: u64, + pub layers_needed_by_leases: u64, pub layers_not_updated: u64, pub layers_removed: u64, // # of layer files removed 
because they have been made obsolete by newer ondisk files. @@ -269,6 +270,7 @@ impl AddAssign for GcResult { self.layers_needed_by_pitr += other.layers_needed_by_pitr; self.layers_needed_by_cutoff += other.layers_needed_by_cutoff; self.layers_needed_by_branches += other.layers_needed_by_branches; + self.layers_needed_by_leases += other.layers_needed_by_leases; self.layers_not_updated += other.layers_not_updated; self.layers_removed += other.layers_removed; diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 801321e36d..ca5765c99b 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -31,6 +31,7 @@ use remote_storage::DownloadError; use remote_storage::GenericRemoteStorage; use remote_storage::TimeoutOrCancel; use std::fmt; +use std::time::SystemTime; use storage_broker::BrokerClientChannel; use tokio::io::BufReader; use tokio::sync::watch; @@ -65,9 +66,9 @@ use self::timeline::uninit::TimelineCreateGuard; use self::timeline::uninit::TimelineExclusionError; use self::timeline::uninit::UninitializedTimeline; use self::timeline::EvictionTaskTenantState; +use self::timeline::GcCutoffs; use self::timeline::TimelineResources; use self::timeline::WaitLsnError; -use self::timeline::{GcCutoffs, GcInfo}; use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; use crate::deletion_queue::DeletionQueueClient; @@ -2428,6 +2429,13 @@ impl Tenant { } } + pub fn get_lsn_lease_length(&self) -> Duration { + let tenant_conf = self.tenant_conf.load().tenant_conf.clone(); + tenant_conf + .lsn_lease_length + .unwrap_or(self.conf.default_tenant_conf.lsn_lease_length) + } + pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) { // Use read-copy-update in order to avoid overwriting the location config // state if this races with [`Tenant::set_new_location_config`]. 
Note that @@ -3010,12 +3018,13 @@ impl Tenant { { let mut target = timeline.gc_info.write().unwrap(); + let now = SystemTime::now(); + target.leases.retain(|_, lease| !lease.is_expired(&now)); + match gc_cutoffs.remove(&timeline.timeline_id) { Some(cutoffs) => { - *target = GcInfo { - retain_lsns: branchpoints, - cutoffs, - }; + target.retain_lsns = branchpoints; + target.cutoffs = cutoffs; } None => { // reasons for this being unavailable: @@ -3833,6 +3842,8 @@ pub(crate) mod harness { tenant_conf.image_layer_creation_check_threshold, ), switch_aux_file_policy: Some(tenant_conf.switch_aux_file_policy), + lsn_lease_length: Some(tenant_conf.lsn_lease_length), + lsn_lease_length_for_ts: Some(tenant_conf.lsn_lease_length_for_ts), } } } @@ -6939,4 +6950,93 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_lsn_lease() -> anyhow::Result<()> { + let (tenant, ctx) = TenantHarness::create("test_lsn_lease")?.load().await; + let key = Key::from_hex("010000000033333333444444445500000000").unwrap(); + + let end_lsn = Lsn(0x100); + let image_layers = (0x20..=0x90) + .step_by(0x10) + .map(|n| { + ( + Lsn(n), + vec![(key, test_img(&format!("data key at {:x}", n)))], + ) + }) + .collect(); + + let timeline = tenant + .create_test_timeline_with_layers( + TIMELINE_ID, + Lsn(0x10), + DEFAULT_PG_VERSION, + &ctx, + Vec::new(), + image_layers, + end_lsn, + ) + .await?; + + let leased_lsns = [0x30, 0x50, 0x70]; + let mut leases = Vec::new(); + let _: anyhow::Result<_> = leased_lsns.iter().try_for_each(|n| { + leases.push(timeline.make_lsn_lease(Lsn(*n), timeline.get_lsn_lease_length(), &ctx)?); + Ok(()) + }); + + // Renewing with shorter lease should not change the lease. + let updated_lease_0 = + timeline.make_lsn_lease(Lsn(leased_lsns[0]), Duration::from_secs(0), &ctx)?; + assert_eq!(updated_lease_0.valid_until, leases[0].valid_until); + + // Renewing with a long lease should renew lease with later expiration time. 
+ let updated_lease_1 = timeline.make_lsn_lease( + Lsn(leased_lsns[1]), + timeline.get_lsn_lease_length() * 2, + &ctx, + )?; + + assert!(updated_lease_1.valid_until > leases[1].valid_until); + + // Force set disk consistent lsn so we can get the cutoff at `end_lsn`. + info!( + "latest_gc_cutoff_lsn: {}", + *timeline.get_latest_gc_cutoff_lsn() + ); + timeline.force_set_disk_consistent_lsn(end_lsn); + + let res = tenant + .gc_iteration( + Some(TIMELINE_ID), + 0, + Duration::ZERO, + &CancellationToken::new(), + &ctx, + ) + .await?; + + // Keeping everything <= Lsn(0x70) (the highest leased LSN) b/c leases: + // 0/10: initdb layer + // (0/20..=0/70).step_by(0x10): image layers added when creating the timeline. + assert_eq!(res.layers_needed_by_leases, 7); + // Keeping 0/90 b/c it is the latest layer. + assert_eq!(res.layers_not_updated, 1); + // Removed 0/80. + assert_eq!(res.layers_removed, 1); + + // Make lease on an already GC-ed LSN. + // 0/80 does not have a valid lease + is below latest_gc_cutoff + assert!(Lsn(0x80) < *timeline.get_latest_gc_cutoff_lsn()); + let res = timeline.make_lsn_lease(Lsn(0x80), timeline.get_lsn_lease_length(), &ctx); + assert!(res.is_err()); + + // Should still be able to renew a currently valid lease + // Assumption: the original lease is still valid for 0/50.
+ let _ = + timeline.make_lsn_lease(Lsn(leased_lsns[1]), timeline.get_lsn_lease_length(), &ctx)?; + + Ok(()) + } } diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index 342d705954..1b9be12642 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -13,6 +13,7 @@ use pageserver_api::models::AuxFilePolicy; use pageserver_api::models::CompactionAlgorithm; use pageserver_api::models::CompactionAlgorithmSettings; use pageserver_api::models::EvictionPolicy; +use pageserver_api::models::LsnLease; use pageserver_api::models::{self, ThrottleConfig}; use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize}; use serde::de::IntoDeserializer; @@ -377,6 +378,16 @@ pub struct TenantConf { /// There is a `last_aux_file_policy` flag which gets persisted in `index_part.json` once the first aux /// file is written. pub switch_aux_file_policy: AuxFilePolicy, + + /// The length for an explicit LSN lease request. + /// Layers needed to reconstruct pages at LSN will not be GC-ed during this interval. + #[serde(with = "humantime_serde")] + pub lsn_lease_length: Duration, + + /// The length for an implicit LSN lease granted as part of `get_lsn_by_timestamp` request. + /// Layers needed to reconstruct pages at LSN will not be GC-ed during this interval. 
+ #[serde(with = "humantime_serde")] + pub lsn_lease_length_for_ts: Duration, } /// Same as TenantConf, but this struct preserves the information about @@ -476,6 +487,16 @@ pub struct TenantConfOpt { #[serde(skip_serializing_if = "Option::is_none")] #[serde(default)] pub switch_aux_file_policy: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "humantime_serde")] + #[serde(default)] + pub lsn_lease_length: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "humantime_serde")] + #[serde(default)] + pub lsn_lease_length_for_ts: Option, } impl TenantConfOpt { @@ -538,6 +559,12 @@ impl TenantConfOpt { switch_aux_file_policy: self .switch_aux_file_policy .unwrap_or(global_conf.switch_aux_file_policy), + lsn_lease_length: self + .lsn_lease_length + .unwrap_or(global_conf.lsn_lease_length), + lsn_lease_length_for_ts: self + .lsn_lease_length_for_ts + .unwrap_or(global_conf.lsn_lease_length_for_ts), } } } @@ -582,6 +609,8 @@ impl Default for TenantConf { timeline_get_throttle: crate::tenant::throttle::Config::disabled(), image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD, switch_aux_file_policy: AuxFilePolicy::default_tenant_config(), + lsn_lease_length: LsnLease::DEFAULT_LENGTH, + lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS, } } } @@ -657,6 +686,8 @@ impl From for models::TenantConfig { timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from), image_layer_creation_check_threshold: value.image_layer_creation_check_threshold, switch_aux_file_policy: value.switch_aux_file_policy, + lsn_lease_length: value.lsn_lease_length.map(humantime), + lsn_lease_length_for_ts: value.lsn_lease_length_for_ts.map(humantime), } } } diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index a6dfa84f35..d679b78f32 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -346,6 +346,7 @@ async fn gc_loop(tenant: Arc, cancel: 
CancellationToken) { // cutoff specified as time. let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download); + let mut first = true; loop { tokio::select! { @@ -362,6 +363,14 @@ async fn gc_loop(tenant: Arc, cancel: CancellationToken) { if first { first = false; + + if delay_by_lease_length(tenant.get_lsn_lease_length(), &cancel) + .await + .is_err() + { + break; + } + if random_init_delay(period, &cancel).await.is_err() { break; } @@ -531,6 +540,21 @@ pub(crate) async fn random_init_delay( } } +/// Delays GC by the default lease length at restart. +/// +/// We do this because the lease mapping is not persisted to disk. By delaying GC by the default +/// length, we guarantee that all the leases we granted before the restart will have expired +/// when we run GC for the first time after the restart. +pub(crate) async fn delay_by_lease_length( + length: Duration, + cancel: &CancellationToken, +) -> Result<(), Cancelled> { + match tokio::time::timeout(length, cancel.cancelled()).await { + Ok(_) => Err(Cancelled), + Err(_) => Ok(()), + } +} + /// Attention: the `task` and `period` beocme labels of a pageserver-wide prometheus metric. pub(crate) fn warn_when_period_overrun( elapsed: Duration, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 08bec329e1..a4f1108635 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -47,7 +47,6 @@ use utils::{ vec_map::VecMap, }; -use std::ops::{Deref, Range}; use std::pin::pin; use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::{Arc, Mutex, RwLock, Weak}; @@ -61,6 +60,10 @@ use std::{ cmp::{max, min, Ordering}, ops::ControlFlow, }; +use std::{ + collections::btree_map::Entry, + ops::{Deref, Range}, +}; use crate::metrics::GetKind; use crate::pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS; @@ -454,6 +457,9 @@ pub(crate) struct GcInfo { /// The cutoff coordinates, which are combined by selecting the minimum.
pub(crate) cutoffs: GcCutoffs, + + /// Leases granted to particular LSNs. + pub(crate) leases: BTreeMap, } impl GcInfo { @@ -1555,17 +1561,46 @@ impl Timeline { Ok(()) } - /// Obtains a temporary lease blocking garbage collection for the given LSN + /// Obtains a temporary lease blocking garbage collection for the given LSN. + /// + /// This function will error if the requested LSN is less than the `latest_gc_cutoff_lsn` and there is also + /// no existing lease to renew. If there is an existing lease in the map, the lease will be renewed only if + /// the request extends the lease. The returned lease is therefore the maximum of the existing lease and + /// the requested lease. pub(crate) fn make_lsn_lease( &self, - _lsn: Lsn, + lsn: Lsn, + length: Duration, _ctx: &RequestContext, ) -> anyhow::Result { - const LEASE_LENGTH: Duration = Duration::from_secs(5 * 60); - let lease = LsnLease { - valid_until: SystemTime::now() + LEASE_LENGTH, + let lease = { + let mut gc_info = self.gc_info.write().unwrap(); + + let valid_until = SystemTime::now() + length; + + let entry = gc_info.leases.entry(lsn); + + let lease = { + if let Entry::Occupied(mut occupied) = entry { + let existing_lease = occupied.get_mut(); + if valid_until > existing_lease.valid_until { + existing_lease.valid_until = valid_until; + } + existing_lease.clone() + } else { + // Reject already GC-ed LSN (lsn < latest_gc_cutoff) + let latest_gc_cutoff_lsn = self.get_latest_gc_cutoff_lsn(); + if lsn < *latest_gc_cutoff_lsn { + bail!("tried to request a page version that was garbage collected.
requested at {} gc cutoff {}", lsn, *latest_gc_cutoff_lsn); + } + + entry.or_insert(LsnLease { valid_until }).clone() + } + }; + + lease }; - // TODO: dummy implementation + Ok(lease) } @@ -2082,6 +2117,24 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10; // Private functions impl Timeline { + pub(crate) fn get_lsn_lease_length(&self) -> Duration { + let tenant_conf = self.tenant_conf.load(); + tenant_conf + .tenant_conf + .lsn_lease_length + .unwrap_or(self.conf.default_tenant_conf.lsn_lease_length) + } + + // TODO(yuchen): remove unused flag after implementing https://github.com/neondatabase/neon/issues/8072 + #[allow(unused)] + pub(crate) fn get_lsn_lease_length_for_ts(&self) -> Duration { + let tenant_conf = self.tenant_conf.load(); + tenant_conf + .tenant_conf + .lsn_lease_length_for_ts + .unwrap_or(self.conf.default_tenant_conf.lsn_lease_length_for_ts) + } + pub(crate) fn get_switch_aux_file_policy(&self) -> AuxFilePolicy { let tenant_conf = self.tenant_conf.load(); tenant_conf @@ -4907,13 +4960,25 @@ impl Timeline { return Err(GcError::TimelineCancelled); } - let (horizon_cutoff, pitr_cutoff, retain_lsns) = { + let (horizon_cutoff, pitr_cutoff, retain_lsns, max_lsn_with_valid_lease) = { let gc_info = self.gc_info.read().unwrap(); let horizon_cutoff = min(gc_info.cutoffs.horizon, self.get_disk_consistent_lsn()); let pitr_cutoff = gc_info.cutoffs.pitr; let retain_lsns = gc_info.retain_lsns.clone(); - (horizon_cutoff, pitr_cutoff, retain_lsns) + + // Gets the maximum LSN that holds a valid lease. + // + // Caveat: `refresh_gc_info` is in charge of updating the lease map. + // Here, we do not check for stale leases again.
+ let max_lsn_with_valid_lease = gc_info.leases.last_key_value().map(|(lsn, _)| *lsn); + + ( + horizon_cutoff, + pitr_cutoff, + retain_lsns, + max_lsn_with_valid_lease, + ) }; let mut new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff); @@ -4944,7 +5009,13 @@ impl Timeline { .set(Lsn::INVALID.0 as i64); let res = self - .gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff) + .gc_timeline( + horizon_cutoff, + pitr_cutoff, + retain_lsns, + max_lsn_with_valid_lease, + new_gc_cutoff, + ) .instrument( info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff), ) @@ -4961,6 +5032,7 @@ impl Timeline { horizon_cutoff: Lsn, pitr_cutoff: Lsn, retain_lsns: Vec, + max_lsn_with_valid_lease: Option, new_gc_cutoff: Lsn, ) -> Result { // FIXME: if there is an ongoing detach_from_ancestor, we should just skip gc @@ -5009,7 +5081,8 @@ impl Timeline { // 1. it is older than cutoff LSN; // 2. it is older than PITR interval; // 3. it doesn't need to be retained for 'retain_lsns'; - // 4. newer on-disk image layers cover the layer's whole key range + // 4. it does not need to be kept for LSNs holding valid leases; + // 5. newer on-disk image layers cover the layer's whole key range // // TODO holding a write lock is too agressive and avoidable let mut guard = self.layers.write().await; @@ -5060,7 +5133,21 @@ impl Timeline { } } - // 4. Is there a later on-disk layer for this relation? + // 4. Is there a valid lease that requires us to keep this layer? + if let Some(lsn) = &max_lsn_with_valid_lease { + // keep if layer start <= the highest LSN holding a valid lease + if &l.get_lsn_range().start <= lsn { + debug!( + "keeping {} because there is a valid lease preventing GC at {}", + l.layer_name(), + lsn, + ); + result.layers_needed_by_leases += 1; + continue 'outer; + } + } + + // 5. Is there a later on-disk layer for this relation? // // The end-LSN is exclusive, while disk_consistent_lsn is // inclusive.
For example, if disk_consistent_lsn is 100, it is @@ -5438,6 +5525,11 @@ impl Timeline { self.last_record_lsn.advance(new_lsn); } + #[cfg(test)] + pub(super) fn force_set_disk_consistent_lsn(&self, new_value: Lsn) { + self.disk_consistent_lsn.store(new_value); + } + /// Force create an image layer and place it into the layer map. /// /// DO NOT use this function directly. Use [`Tenant::branch_timeline_test_with_layers`] diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py index 1d193b8999..f4667a82dc 100644 --- a/test_runner/regress/test_attach_tenant_config.py +++ b/test_runner/regress/test_attach_tenant_config.py @@ -195,6 +195,8 @@ def test_fully_custom_config(positive_env: NeonEnv): "walreceiver_connect_timeout": "13m", "image_layer_creation_check_threshold": 1, "switch_aux_file_policy": "cross-validation", + "lsn_lease_length": "1m", + "lsn_lease_length_for_ts": "5s", } ps_http = env.pageserver.http_client()