From 138f008bab8260cec05d1b3353c1f4ecba1ebb0c Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Tue, 6 Aug 2024 12:09:56 +0300 Subject: [PATCH] feat: persistent gc blocking (#8600) Currently, we do not have facilities to persistently block GC on a tenant for whatever reason. We could do a tenant configuration update, but that is risky for generation numbers and would also be transient. Introduce a `gc_block` facility in the tenant, which manages per timeline blocking reasons. Additionally, add HTTP endpoints for enabling/disabling manual gc blocking for a specific timeline. For debugging, individual tenant status now includes a similar string representation logged when GC is skipped. Cc: #6994 --- libs/pageserver_api/src/models.rs | 9 + pageserver/src/http/openapi_spec.yml | 39 ++++ pageserver/src/http/routes.rs | 76 +++++++ pageserver/src/tenant.rs | 30 +++ pageserver/src/tenant/gc_block.rs | 213 ++++++++++++++++++ .../src/tenant/remote_timeline_client.rs | 117 ++++++++++ .../tenant/remote_timeline_client/index.rs | 133 +++++++++++ pageserver/src/tenant/timeline.rs | 16 ++ pageserver/src/tenant/timeline/delete.rs | 2 + test_runner/fixtures/pageserver/http.py | 16 ++ .../regress/test_timeline_gc_blocking.py | 67 ++++++ 11 files changed, 718 insertions(+) create mode 100644 pageserver/src/tenant/gc_block.rs create mode 100644 test_runner/regress/test_timeline_gc_blocking.py diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 591c45d908..b541bba6a1 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -637,6 +637,13 @@ pub struct TenantInfo { pub current_physical_size: Option, // physical size is only included in `tenant_status` endpoint pub attachment_status: TenantAttachmentStatus, pub generation: u32, + + /// Opaque explanation if gc is being blocked. + /// + /// Only looked up for the individual tenant detail, not the listing. This is purely for + /// debugging, not included in openapi. 
+ #[serde(skip_serializing_if = "Option::is_none")] + pub gc_blocking: Option, } #[derive(Serialize, Deserialize, Clone)] @@ -1427,6 +1434,7 @@ mod tests { current_physical_size: Some(42), attachment_status: TenantAttachmentStatus::Attached, generation: 1, + gc_blocking: None, }; let expected_active = json!({ "id": original_active.id.to_string(), @@ -1449,6 +1457,7 @@ mod tests { current_physical_size: Some(42), attachment_status: TenantAttachmentStatus::Attached, generation: 1, + gc_blocking: None, }; let expected_broken = json!({ "id": original_broken.id.to_string(), diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 59e646d0ca..4656f2c93a 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -308,6 +308,45 @@ paths: application/json: schema: type: string + + /v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/block_gc: + parameters: + - name: tenant_shard_id + in: path + required: true + schema: + type: string + - name: timeline_id + in: path + required: true + schema: + type: string + format: hex + post: + description: Persistently add a gc blocking at the tenant level because of this timeline + responses: + "200": + description: OK + + /v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/unblock_gc: + parameters: + - name: tenant_shard_id + in: path + required: true + schema: + type: string + - name: timeline_id + in: path + required: true + schema: + type: string + format: hex + post: + description: Persistently remove a tenant level gc blocking for this timeline + responses: + "200": + description: OK + /v1/tenant/{tenant_shard_id}/location_config: parameters: - name: tenant_shard_id diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 117f2c5869..fdab780bfb 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -935,6 +935,7 @@ async fn tenant_list_handler( generation: (*gen) .into() .expect("Tenants are always attached with a generation"), + gc_blocking: None, }) .collect::>(); @@ -986,6 +987,7 @@ async fn tenant_status( .generation() .into() .expect("Tenants are always attached with a generation"), + gc_blocking: tenant.gc_block.summary().map(|x| format!("{x:?}")), }, walredo: tenant.wal_redo_manager_status(), timelines: tenant.list_timeline_ids(), @@ -1226,6 +1228,72 @@ async fn evict_timeline_layer_handler( } } +async fn timeline_gc_blocking_handler( + request: Request, + _cancel: CancellationToken, +) -> Result, ApiError> { + block_or_unblock_gc(request, true).await +} + +async fn timeline_gc_unblocking_handler( + request: Request, + _cancel: CancellationToken, +) -> Result, ApiError> { + block_or_unblock_gc(request, false).await +} + +/// Adding a block is `POST ../block_gc`, removing a block is `POST ../unblock_gc`. +/// +/// Both are technically unsafe because they might fire off index uploads, thus they are POST. 
+async fn block_or_unblock_gc(
+    request: Request<Body>,
+    block: bool,
+) -> Result<Response<Body>, ApiError> {
+    use crate::tenant::{
+        remote_timeline_client::WaitCompletionError, upload_queue::NotInitialized,
+    };
+    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
+    check_permission(&request, Some(tenant_shard_id.tenant_id))?;
+    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
+    let state = get_state(&request);
+
+    let tenant = state
+        .tenant_manager
+        .get_attached_tenant_shard(tenant_shard_id)?;
+
+    tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
+
+    let timeline = tenant.get_timeline(timeline_id, true)?;
+
+    let fut = async {
+        if block {
+            timeline.block_gc(&tenant).await.map(|_| ())
+        } else {
+            timeline.unblock_gc(&tenant).await
+        }
+    };
+
+    let span = tracing::info_span!(
+        "block_or_unblock_gc",
+        tenant_id = %tenant_shard_id.tenant_id,
+        shard_id = %tenant_shard_id.shard_slug(),
+        timeline_id = %timeline_id,
+        block = block,
+    );
+
+    let res = fut.instrument(span).await;
+
+    res.map_err(|e| {
+        if e.is::<NotInitialized>() || e.is::<WaitCompletionError>() {
+            ApiError::ShuttingDown
+        } else {
+            ApiError::InternalServerError(e)
+        }
+    })?;
+
+    json_response(StatusCode::OK, ())
+}
+
 /// Get tenant_size SVG graph along with the JSON data.
 fn synthetic_size_html_response(
     inputs: ModelInputs,
@@ -2904,6 +2972,14 @@ pub fn make_router(
             "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name",
             |r| api_handler(r, evict_timeline_layer_handler),
         )
+        .post(
+            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/block_gc",
+            |r| api_handler(r, timeline_gc_blocking_handler),
+        )
+        .post(
+            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/unblock_gc",
+            |r| api_handler(r, timeline_gc_unblocking_handler),
+        )
         .post("/v1/tenant/:tenant_shard_id/heatmap_upload", |r| {
             api_handler(r, secondary_upload_handler)
         })
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 72d3aedd05..de9b55d847 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -148,6 +148,7 @@ pub(crate) mod timeline;
 pub mod size;
+mod gc_block;
 pub(crate) mod throttle;

 pub(crate) use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
@@ -303,6 +304,12 @@ pub struct Tenant {
     /// An ongoing timeline detach must be checked during attempts to GC or compact a timeline.
     ongoing_timeline_detach: std::sync::Mutex<Option<(TimelineId, utils::completion::Barrier)>>,

+    /// `index_part.json` based gc blocking reason tracking.
+    ///
+    /// New gc iterations must first acquire `GcBlock::start` before proceeding.
+    pub(crate) gc_block: gc_block::GcBlock,
+
     l0_flush_global_state: L0FlushGlobalState,
 }
@@ -1036,6 +1043,8 @@ impl Tenant {
             }
         }

+        let mut gc_blocks = HashMap::new();
+
         // For every timeline, download the metadata file, scan the local directory,
         // and build a layer map that contains an entry for each remote and local
         // layer file.
@@ -1045,6 +1054,16 @@ impl Tenant {
             let (index_part, remote_metadata) = remote_index_and_client
                 .remove(&timeline_id)
                 .expect("just put it in above");

+            if let Some(blocking) = index_part.gc_blocking.as_ref() {
+                // could just filter these away, but it helps while testing
+                anyhow::ensure!(
+                    !blocking.reasons.is_empty(),
+                    "index_part for {timeline_id} is malformed: it should not have gc blocking with zero reasons"
+                );
+                let prev = gc_blocks.insert(timeline_id, blocking.reasons);
+                assert!(prev.is_none());
+            }
+
             // TODO again handle early failure
             self.load_remote_timeline(
                 timeline_id,
@@ -1089,6 +1108,8 @@ impl Tenant {
         // IndexPart is the source of truth.
         self.clean_up_timelines(&existent_timelines)?;

+        self.gc_block.set_scanned(gc_blocks);
+
         fail::fail_point!("attach-before-activate", |_| {
             anyhow::bail!("attach-before-activate");
         });
@@ -1679,6 +1700,14 @@ impl Tenant {
             }
         }

+        let _guard = match self.gc_block.start().await {
+            Ok(guard) => guard,
+            Err(reasons) => {
+                info!("Skipping GC: {reasons}");
+                return Ok(GcResult::default());
+            }
+        };
+
         self.gc_iteration_internal(target_timeline_id, horizon, pitr, cancel, ctx)
             .await
     }
@@ -2691,6 +2720,7 @@ impl Tenant {
             )),
             tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)),
             ongoing_timeline_detach: std::sync::Mutex::default(),
+            gc_block: Default::default(),
             l0_flush_global_state,
         }
     }
diff --git a/pageserver/src/tenant/gc_block.rs b/pageserver/src/tenant/gc_block.rs
new file mode 100644
index 0000000000..8b41ba1746
--- /dev/null
+++ b/pageserver/src/tenant/gc_block.rs
@@ -0,0 +1,213 @@
+use std::collections::HashMap;
+
+use utils::id::TimelineId;
+
+use super::remote_timeline_client::index::GcBlockingReason;
+
+type Storage = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
+
+#[derive(Default)]
+pub(crate) struct GcBlock {
+    /// The timelines which have current reasons to block gc.
+    ///
+    /// LOCK ORDER: this is held locked while scheduling the next index_part update. This is done
+    /// to keep this field up to date with RemoteTimelineClient `upload_queue.dirty`.
+    reasons: std::sync::Mutex<Storage>,
+    blocking: tokio::sync::Mutex<()>,
+}
+
+impl GcBlock {
+    /// Start another gc iteration.
+    ///
+    /// Returns a guard to be held for the duration of the gc iteration to allow synchronizing with
+    /// its ending, or, if that is not currently possible, a value describing the reasons why not.
+    ///
+    /// Cancellation safe.
+    pub(super) async fn start(&self) -> Result<Guard<'_>, BlockingReasons> {
+        let reasons = {
+            let g = self.reasons.lock().unwrap();
+
+            // TODO: the assumption is that this method gets called periodically. in prod, we use 1h, in
+            // tests, we use everything. we should warn if the gc has been consecutively blocked
+            // for more than 1h (within single tenant session?).
+            BlockingReasons::clean_and_summarize(g)
+        };
+
+        if let Some(reasons) = reasons {
+            Err(reasons)
+        } else {
+            Ok(Guard {
+                _inner: self.blocking.lock().await,
+            })
+        }
+    }
+
+    pub(crate) fn summary(&self) -> Option<BlockingReasons> {
+        let g = self.reasons.lock().unwrap();
+
+        BlockingReasons::summarize(&g)
+    }
+
+    /// Start blocking gc for this one timeline for the given reason.
+    ///
+    /// This is not a guard based API but instead it mimics a set API. The returned future will not
+    /// resolve until an existing gc round has completed.
+    ///
+    /// Returns true if this block was new, false if gc was already blocked for this reason.
+    ///
+    /// Cancellation safe: cancelling after the first poll will still leave the gc blocking reason
+    /// in place.
+    pub(crate) async fn insert(
+        &self,
+        timeline: &super::Timeline,
+        reason: GcBlockingReason,
+    ) -> anyhow::Result<bool> {
+        let (added, uploaded) = {
+            let mut g = self.reasons.lock().unwrap();
+            let set = g.entry(timeline.timeline_id).or_default();
+            let added = set.insert(reason);
+
+            // LOCK ORDER: intentionally hold the lock, see self.reasons.
+            let uploaded = timeline
+                .remote_client
+                .schedule_insert_gc_block_reason(reason)?;
+
+            (added, uploaded)
+        };
+
+        uploaded.await?;
+
+        // ensure that any ongoing gc iteration has completed
+        drop(self.blocking.lock().await);
+
+        Ok(added)
+    }
+
+    /// Remove blocking gc for this one timeline and the given reason.
+ pub(crate) async fn remove( + &self, + timeline: &super::Timeline, + reason: GcBlockingReason, + ) -> anyhow::Result<()> { + use std::collections::hash_map::Entry; + + super::span::debug_assert_current_span_has_tenant_and_timeline_id(); + + let (remaining_blocks, uploaded) = { + let mut g = self.reasons.lock().unwrap(); + match g.entry(timeline.timeline_id) { + Entry::Occupied(mut oe) => { + let set = oe.get_mut(); + set.remove(reason); + if set.is_empty() { + oe.remove(); + } + } + Entry::Vacant(_) => { + // we must still do the index_part.json update regardless, in case we had earlier + // been cancelled + } + } + + let remaining_blocks = g.len(); + + // LOCK ORDER: intentionally hold the lock while scheduling; see self.reasons + let uploaded = timeline + .remote_client + .schedule_remove_gc_block_reason(reason)?; + + (remaining_blocks, uploaded) + }; + uploaded.await?; + + // no need to synchronize with gc iteration again + + if remaining_blocks > 0 { + tracing::info!(remaining_blocks, removed=?reason, "gc blocking removed, but gc remains blocked"); + } else { + tracing::info!("gc is now unblocked for the tenant"); + } + + Ok(()) + } + + pub(crate) fn before_delete(&self, timeline: &super::Timeline) { + let unblocked = { + let mut g = self.reasons.lock().unwrap(); + if g.is_empty() { + return; + } + + g.remove(&timeline.timeline_id); + + BlockingReasons::clean_and_summarize(g).is_none() + }; + + if unblocked { + tracing::info!("gc is now unblocked following deletion"); + } + } + + /// Initialize with the non-deleted timelines of this tenant. + pub(crate) fn set_scanned(&self, scanned: Storage) { + let mut g = self.reasons.lock().unwrap(); + assert!(g.is_empty()); + g.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty())); + + if let Some(reasons) = BlockingReasons::clean_and_summarize(g) { + tracing::info!(summary=?reasons, "initialized with gc blocked"); + } + } +} + +pub(super) struct Guard<'a> { + _inner: tokio::sync::MutexGuard<'a, ()>, +} + +#[derive(Debug)] +pub(crate) struct BlockingReasons { + timelines: usize, + reasons: enumset::EnumSet, +} + +impl std::fmt::Display for BlockingReasons { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{} timelines block for {:?}", + self.timelines, self.reasons + ) + } +} + +impl BlockingReasons { + fn clean_and_summarize(mut g: std::sync::MutexGuard<'_, Storage>) -> Option { + let mut reasons = enumset::EnumSet::empty(); + g.retain(|_key, value| { + reasons = reasons.union(*value); + !value.is_empty() + }); + if !g.is_empty() { + Some(BlockingReasons { + timelines: g.len(), + reasons, + }) + } else { + None + } + } + + fn summarize(g: &std::sync::MutexGuard<'_, Storage>) -> Option { + if g.is_empty() { + None + } else { + let reasons = g + .values() + .fold(enumset::EnumSet::empty(), |acc, next| acc.union(*next)); + Some(BlockingReasons { + timelines: g.len(), + reasons, + }) + } + } +} diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 9e021c7e35..1344fe4192 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -800,6 +800,123 @@ impl RemoteTimelineClient { .context("wait completion") } + /// Adds a gc blocking reason for this timeline if one does not exist already. + /// + /// A retryable step of timeline detach ancestor. + /// + /// Returns a future which waits until the completion of the upload. 
+ pub(crate) fn schedule_insert_gc_block_reason( + self: &Arc, + reason: index::GcBlockingReason, + ) -> Result>, NotInitialized> + { + let maybe_barrier = { + let mut guard = self.upload_queue.lock().unwrap(); + let upload_queue = guard.initialized_mut()?; + + if let index::GcBlockingReason::DetachAncestor = reason { + if upload_queue.dirty.metadata.ancestor_timeline().is_none() { + drop(guard); + panic!("cannot start detach ancestor if there is nothing to detach from"); + } + } + + let wanted = |x: Option<&index::GcBlocking>| x.is_some_and(|x| x.blocked_by(reason)); + + let current = upload_queue.dirty.gc_blocking.as_ref(); + let uploaded = upload_queue.clean.0.gc_blocking.as_ref(); + + match (current, uploaded) { + (x, y) if wanted(x) && wanted(y) => None, + (x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)), + // Usual case: !wanted(x) && !wanted(y) + // + // Unusual: !wanted(x) && wanted(y) which means we have two processes waiting to + // turn on and off some reason. + (x, y) => { + if !wanted(x) && wanted(y) { + // this could be avoided by having external in-memory synchronization, like + // timeline detach ancestor + warn!(?reason, op="insert", "unexpected: two racing processes to enable and disable a gc blocking reason"); + } + + // at this point, the metadata must always show that there is a parent + upload_queue.dirty.gc_blocking = current + .map(|x| x.with_reason(reason)) + .or_else(|| Some(index::GcBlocking::started_now_for(reason))); + self.schedule_index_upload(upload_queue)?; + Some(self.schedule_barrier0(upload_queue)) + } + } + }; + + Ok(async move { + if let Some(barrier) = maybe_barrier { + Self::wait_completion0(barrier).await?; + } + Ok(()) + }) + } + + /// Removes a gc blocking reason for this timeline if one exists. + /// + /// A retryable step of timeline detach ancestor. + /// + /// Returns a future which waits until the completion of the upload. + pub(crate) fn schedule_remove_gc_block_reason( + self: &Arc, + reason: index::GcBlockingReason, + ) -> Result>, NotInitialized> + { + let maybe_barrier = { + let mut guard = self.upload_queue.lock().unwrap(); + let upload_queue = guard.initialized_mut()?; + + if let index::GcBlockingReason::DetachAncestor = reason { + if !upload_queue + .clean + .0 + .lineage + .is_detached_from_original_ancestor() + { + drop(guard); + panic!("cannot complete timeline_ancestor_detach while not detached"); + } + } + + let wanted = |x: Option<&index::GcBlocking>| { + x.is_none() || x.is_some_and(|b| !b.blocked_by(reason)) + }; + + let current = upload_queue.dirty.gc_blocking.as_ref(); + let uploaded = upload_queue.clean.0.gc_blocking.as_ref(); + + match (current, uploaded) { + (x, y) if wanted(x) && wanted(y) => None, + (x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)), + (x, y) => { + if !wanted(x) && wanted(y) { + warn!(?reason, op="remove", "unexpected: two racing processes to enable and disable a gc blocking reason (remove)"); + } + + upload_queue.dirty.gc_blocking = + current.as_ref().and_then(|x| x.without_reason(reason)); + assert!(wanted(upload_queue.dirty.gc_blocking.as_ref())); + // FIXME: bogus ? + self.schedule_index_upload(upload_queue)?; + Some(self.schedule_barrier0(upload_queue)) + } + } + }; + + Ok(async move { + if let Some(barrier) = maybe_barrier { + Self::wait_completion0(barrier).await?; + } + Ok(()) + }) + } + /// Launch an upload operation in the background; the file is added to be included in next /// `index_part.json` upload. 
pub(crate) fn schedule_layer_file_upload( diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs index 3075df022e..8e6290030d 100644 --- a/pageserver/src/tenant/remote_timeline_client/index.rs +++ b/pageserver/src/tenant/remote_timeline_client/index.rs @@ -60,6 +60,9 @@ pub struct IndexPart { #[serde(default)] pub(crate) lineage: Lineage, + #[serde(skip_serializing_if = "Option::is_none", default)] + pub(crate) gc_blocking: Option, + /// Describes the kind of aux files stored in the timeline. /// /// The value is modified during file ingestion when the latest wanted value communicated via tenant config is applied if it is acceptable. @@ -101,6 +104,7 @@ impl IndexPart { deleted_at: None, archived_at: None, lineage: Default::default(), + gc_blocking: None, last_aux_file_policy: None, } } @@ -251,6 +255,64 @@ impl Lineage { } } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub(crate) struct GcBlocking { + pub(crate) started_at: NaiveDateTime, + pub(crate) reasons: enumset::EnumSet, +} + +#[derive(Debug, enumset::EnumSetType, serde::Serialize, serde::Deserialize)] +#[enumset(serialize_repr = "list")] +pub(crate) enum GcBlockingReason { + Manual, + DetachAncestor, +} + +impl GcBlocking { + pub(super) fn started_now_for(reason: GcBlockingReason) -> Self { + GcBlocking { + started_at: chrono::Utc::now().naive_utc(), + reasons: enumset::EnumSet::only(reason), + } + } + + /// Returns true if the given reason is one of the reasons why the gc is blocked. + pub(crate) fn blocked_by(&self, reason: GcBlockingReason) -> bool { + self.reasons.contains(reason) + } + + /// Returns a version of self with the given reason. + pub(super) fn with_reason(&self, reason: GcBlockingReason) -> Self { + assert!(!self.blocked_by(reason)); + let mut reasons = self.reasons; + reasons.insert(reason); + + Self { + started_at: self.started_at, + reasons, + } + } + + /// Returns a version of self without the given reason. Assumption is that if + /// there are no more reasons, we can unblock the gc by returning `None`. 
+ pub(super) fn without_reason(&self, reason: GcBlockingReason) -> Option { + assert!(self.blocked_by(reason)); + + if self.reasons.len() == 1 { + None + } else { + let mut reasons = self.reasons; + assert!(reasons.remove(reason)); + assert!(!reasons.is_empty()); + + Some(Self { + started_at: self.started_at, + reasons, + }) + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -292,6 +354,7 @@ mod tests { deleted_at: None, archived_at: None, lineage: Lineage::default(), + gc_blocking: None, last_aux_file_policy: None, }; @@ -335,6 +398,7 @@ mod tests { deleted_at: None, archived_at: None, lineage: Lineage::default(), + gc_blocking: None, last_aux_file_policy: None, }; @@ -379,6 +443,7 @@ mod tests { deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")), archived_at: None, lineage: Lineage::default(), + gc_blocking: None, last_aux_file_policy: None, }; @@ -426,6 +491,7 @@ mod tests { deleted_at: None, archived_at: None, lineage: Lineage::default(), + gc_blocking: None, last_aux_file_policy: None, }; @@ -468,6 +534,7 @@ mod tests { deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")), archived_at: None, lineage: Lineage::default(), + gc_blocking: None, last_aux_file_policy: None, }; @@ -513,6 +580,7 @@ mod tests { reparenting_history: vec![TimelineId::from_str("e1bfd8c633d713d279e6fcd2bcc15b6d").unwrap()], original_ancestor: Some((TimelineId::from_str("e2bfd8c633d713d279e6fcd2bcc15b6d").unwrap(), Lsn::from_str("0/15A7618").unwrap(), parse_naive_datetime("2024-05-07T18:52:36.322426563"))), }, + gc_blocking: None, last_aux_file_policy: None, }; @@ -563,6 +631,7 @@ mod tests { reparenting_history: vec![TimelineId::from_str("e1bfd8c633d713d279e6fcd2bcc15b6d").unwrap()], original_ancestor: Some((TimelineId::from_str("e2bfd8c633d713d279e6fcd2bcc15b6d").unwrap(), Lsn::from_str("0/15A7618").unwrap(), parse_naive_datetime("2024-05-07T18:52:36.322426563"))), }, + gc_blocking: None, last_aux_file_policy: Some(AuxFilePolicy::V2), }; @@ -618,6 +687,7 @@ mod tests { deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")), archived_at: None, lineage: Default::default(), + gc_blocking: None, last_aux_file_policy: Default::default(), }; @@ -674,6 +744,7 @@ mod tests { deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")), archived_at: Some(parse_naive_datetime("2023-04-29T09:00:00.123000000")), lineage: Default::default(), + gc_blocking: None, last_aux_file_policy: Default::default(), }; @@ -681,6 +752,68 @@ mod tests { assert_eq!(part, expected); } + #[test] + fn v9_indexpart_is_parsed() { + let example = r#"{ + "version": 9, + "layer_metadata":{ + "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 }, + "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 } + }, + "disk_consistent_lsn":"0/16960E8", + "metadata": { + "disk_consistent_lsn": "0/16960E8", + "prev_record_lsn": "0/1696070", + "ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e", + "ancestor_lsn": "0/0", + "latest_gc_cutoff_lsn": "0/1696070", + "initdb_lsn": "0/1696070", + "pg_version": 14 + }, + "gc_blocking": { + "started_at": "2024-07-19T09:00:00.123", + "reasons": ["DetachAncestor"] + } + }"#; + + let expected = IndexPart { + version: 9, + layer_metadata: HashMap::from([ + 
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata { + file_size: 25600000, + generation: Generation::none(), + shard: ShardIndex::unsharded() + }), + ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata { + file_size: 9007199254741001, + generation: Generation::none(), + shard: ShardIndex::unsharded() + }) + ]), + disk_consistent_lsn: "0/16960E8".parse::().unwrap(), + metadata: TimelineMetadata::new( + Lsn::from_str("0/16960E8").unwrap(), + Some(Lsn::from_str("0/1696070").unwrap()), + Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()), + Lsn::INVALID, + Lsn::from_str("0/1696070").unwrap(), + Lsn::from_str("0/1696070").unwrap(), + 14, + ).with_recalculated_checksum().unwrap(), + deleted_at: None, + lineage: Default::default(), + gc_blocking: Some(GcBlocking { + started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"), + reasons: enumset::EnumSet::from_iter([GcBlockingReason::DetachAncestor]), + }), + last_aux_file_policy: Default::default(), + archived_at: None, + }; + + let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap(); + assert_eq!(part, expected); + } + fn parse_naive_datetime(s: &str) -> NaiveDateTime { chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S.%f").unwrap() } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 05bf4eac8b..79bfd1ebb2 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -5698,6 +5698,22 @@ impl Timeline { } } + /// Persistently blocks gc for `Manual` reason. + /// + /// Returns true if no such block existed before, false otherwise. + pub(crate) async fn block_gc(&self, tenant: &super::Tenant) -> anyhow::Result { + use crate::tenant::remote_timeline_client::index::GcBlockingReason; + assert_eq!(self.tenant_shard_id, tenant.tenant_shard_id); + tenant.gc_block.insert(self, GcBlockingReason::Manual).await + } + + /// Persistently unblocks gc for `Manual` reason. + pub(crate) async fn unblock_gc(&self, tenant: &super::Tenant) -> anyhow::Result<()> { + use crate::tenant::remote_timeline_client::index::GcBlockingReason; + assert_eq!(self.tenant_shard_id, tenant.tenant_shard_id); + tenant.gc_block.remove(self, GcBlockingReason::Manual).await + } + #[cfg(test)] pub(super) fn force_advance_lsn(self: &Arc, new_lsn: Lsn) { self.last_record_lsn.advance(new_lsn); diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index 05178c38b4..b03dbb092e 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -230,6 +230,8 @@ impl DeleteTimelineFlow { // Now that the Timeline is in Stopping state, request all the related tasks to shut down. 
timeline.shutdown(super::ShutdownMode::Hard).await; + tenant.gc_block.before_delete(&timeline); + fail::fail_point!("timeline-delete-before-index-deleted-at", |_| { Err(anyhow::anyhow!( "failpoint: timeline-delete-before-index-deleted-at" diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 192324f086..61e2204b23 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -556,6 +556,22 @@ class PageserverHttpClient(requests.Session, MetricsGetter): assert isinstance(res_json, dict) return res_json + def timeline_block_gc(self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId): + res = self.post( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/block_gc", + ) + log.info(f"Got GC request response code: {res.status_code}") + self.verbose_error(res) + + def timeline_unblock_gc( + self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId + ): + res = self.post( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/unblock_gc", + ) + log.info(f"Got GC request response code: {res.status_code}") + self.verbose_error(res) + def timeline_compact( self, tenant_id: Union[TenantId, TenantShardId], diff --git a/test_runner/regress/test_timeline_gc_blocking.py b/test_runner/regress/test_timeline_gc_blocking.py new file mode 100644 index 0000000000..24de894687 --- /dev/null +++ b/test_runner/regress/test_timeline_gc_blocking.py @@ -0,0 +1,67 @@ +import time + +from fixtures.neon_fixtures import ( + NeonEnvBuilder, +) +from fixtures.pageserver.utils import wait_timeline_detail_404 + + +def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder): + env = neon_env_builder.init_start( + initial_tenant_conf={"gc_period": "1s", "lsn_lease_length": "0s"} + ) + ps = env.pageserver + http = ps.http_client() + + foo_branch = env.neon_cli.create_branch("foo", "main", env.initial_tenant) + + gc_active_line = ".* gc_loop.*: [12] timelines need GC" + gc_skipped_line = ".* gc_loop.*: Skipping GC: .*" + init_gc_skipped = ".*: initialized with gc blocked.*" + + tenant_before = http.tenant_status(env.initial_tenant) + + wait_for_another_gc_round() + _, offset = ps.assert_log_contains(gc_active_line) + + assert ps.log_contains(gc_skipped_line, offset) is None + + http.timeline_block_gc(env.initial_tenant, foo_branch) + + tenant_after = http.tenant_status(env.initial_tenant) + assert tenant_before != tenant_after + gc_blocking = tenant_after["gc_blocking"] + assert gc_blocking == "BlockingReasons { timelines: 1, reasons: EnumSet(Manual) }" + + wait_for_another_gc_round() + _, offset = ps.assert_log_contains(gc_skipped_line, offset) + + ps.restart() + ps.quiesce_tenants() + + _, offset = env.pageserver.assert_log_contains(init_gc_skipped, offset) + + wait_for_another_gc_round() + _, offset = ps.assert_log_contains(gc_skipped_line, offset) + + # deletion unblocks gc + http.timeline_delete(env.initial_tenant, foo_branch) + wait_timeline_detail_404(http, env.initial_tenant, foo_branch, 10, 1.0) + + wait_for_another_gc_round() + _, offset = ps.assert_log_contains(gc_active_line, offset) + + http.timeline_block_gc(env.initial_tenant, env.initial_timeline) + + wait_for_another_gc_round() + _, offset = ps.assert_log_contains(gc_skipped_line, offset) + + # removing the manual block also unblocks gc + http.timeline_unblock_gc(env.initial_tenant, env.initial_timeline) + + wait_for_another_gc_round() + _, offset = ps.assert_log_contains(gc_active_line, 
offset) + + +def wait_for_another_gc_round(): + time.sleep(2)
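
Usage note (not part of the diff): a minimal sketch of how a test could hold the new persistent GC block
around work that must not race with GC, using the timeline_block_gc/timeline_unblock_gc helpers added above.
The test name and tenant config values here are illustrative assumptions, not part of the patch.

    from fixtures.neon_fixtures import NeonEnvBuilder


    def test_block_gc_around_maintenance(neon_env_builder: NeonEnvBuilder):
        env = neon_env_builder.init_start(initial_tenant_conf={"gc_period": "1s"})
        http = env.pageserver.http_client()

        # POST .../block_gc persists a Manual blocking reason in index_part.json,
        # so the block survives pageserver restarts until explicitly removed.
        http.timeline_block_gc(env.initial_tenant, env.initial_timeline)
        try:
            ...  # run the operation that must not race with GC
        finally:
            # POST .../unblock_gc removes the Manual reason; GC resumes on the next gc_loop round.
            http.timeline_unblock_gc(env.initial_tenant, env.initial_timeline)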