diff --git a/pageserver/src/deletion_queue/validator.rs b/pageserver/src/deletion_queue/validator.rs index 72bdbdefd6..bf06c78e67 100644 --- a/pageserver/src/deletion_queue/validator.rs +++ b/pageserver/src/deletion_queue/validator.rs @@ -178,7 +178,14 @@ where .unwrap_or(false); if valid && *validated_generation == tenant_lsn_state.generation { - for (_timeline_id, pending_lsn) in tenant_lsn_state.timelines { + for (timeline_id, pending_lsn) in tenant_lsn_state.timelines { + tracing::debug!( + %tenant_id, + %timeline_id, + current = %pending_lsn.result_slot.load(), + projected = %pending_lsn.projected, + "advancing validated remote_consistent_lsn", + ); pending_lsn.result_slot.store(pending_lsn.projected); } } else { diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 24f47df92e..97d731bf49 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -138,6 +138,14 @@ pub struct GcResult { #[serde(serialize_with = "serialize_duration_as_millis")] pub elapsed: Duration, + + /// The layers which were garbage collected. + /// + /// Used in `/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc` to wait for the layers to be + /// dropped in tests. + #[cfg(feature = "testing")] + #[serde(skip)] + pub(crate) doomed_layers: Vec, } // helper function for `GcResult`, serializing a `Duration` as an integer number of milliseconds @@ -158,5 +166,11 @@ impl AddAssign for GcResult { self.layers_removed += other.layers_removed; self.elapsed += other.elapsed; + + #[cfg(feature = "testing")] + { + let mut other = other; + self.doomed_layers.append(&mut other.doomed_layers); + } } } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 0260905bd2..d75f61d8b3 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -2638,14 +2638,12 @@ impl Tenant { // Perform GC for each timeline. // - // Note that we don't hold the GC lock here because we don't want - // to delay the branch creation task, which requires the GC lock. - // A timeline GC iteration can be slow because it may need to wait for - // compaction (both require `layer_removal_cs` lock), - // but the GC iteration can run concurrently with branch creation. + // Note that we don't hold the `Tenant::gc_cs` lock here because we don't want to delay the + // branch creation task, which requires the GC lock. A GC iteration can run concurrently + // with branch creation. // - // See comments in [`Tenant::branch_timeline`] for more information - // about why branch creation task can run concurrently with timeline's GC iteration. + // See comments in [`Tenant::branch_timeline`] for more information about why branch + // creation task can run concurrently with timeline's GC iteration. for timeline in gc_timelines { if task_mgr::is_shutdown_requested() || cancel.is_cancelled() { // We were requested to shut down. Stop and return with the progress we diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 3ff7425bc2..5988dcd45f 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -792,8 +792,6 @@ pub(crate) async fn set_new_tenant_config( impl TenantManager { /// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or is not fitting to the query. /// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants. - /// - /// This method is cancel-safe. 
pub(crate) fn get_attached_tenant_shard( &self, tenant_shard_id: TenantShardId, @@ -1961,6 +1959,7 @@ pub(crate) async fn immediate_gc( // Run in task_mgr to avoid race with tenant_detach operation let ctx = ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download); let (task_done, wait_task_done) = tokio::sync::oneshot::channel(); + // TODO: spawning is redundant now, need to hold the gate task_mgr::spawn( &tokio::runtime::Handle::current(), TaskKind::GarbageCollector, @@ -1970,12 +1969,40 @@ pub(crate) async fn immediate_gc( false, async move { fail::fail_point!("immediate_gc_task_pre"); - let result = tenant + + #[allow(unused_mut)] + let mut result = tenant .gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx) .instrument(info_span!("manual_gc", %tenant_id, %timeline_id)) .await; // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it // better once the types support it. + + #[cfg(feature = "testing")] + { + if let Ok(result) = result.as_mut() { + // why not FuturesUnordered? it would need much the same task structure, + // but everything would run on a single task. + let mut js = tokio::task::JoinSet::new(); + for layer in std::mem::take(&mut result.doomed_layers) { + js.spawn(layer.wait_drop()); + } + tracing::info!(total = js.len(), "starting to wait for the gc'd layers to be dropped"); + while let Some(res) = js.join_next().await { + res.expect("wait_drop should not panic"); + } + } + + let timeline = tenant.get_timeline(timeline_id, false).ok(); + let rtc = timeline.as_ref().and_then(|x| x.remote_client.as_ref()); + + if let Some(rtc) = rtc { + // layer drops schedule actions on the remote timeline client to actually do the + // deletions; we don't care about a shutdown error here, just exit fast + drop(rtc.wait_completion().await); + } + } + match task_done.send(result) { Ok(_) => (), Err(result) => error!("failed to send gc result: {result:?}"), diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 962a0fa795..01c60ca8f8 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -827,10 +827,8 @@ impl RemoteTimelineClient { Ok(()) } - /// /// Wait for all previously scheduled uploads/deletions to complete - /// - pub async fn wait_completion(self: &Arc<Self>) -> anyhow::Result<()> { + pub(crate) async fn wait_completion(self: &Arc<Self>) -> anyhow::Result<()> { let mut receiver = { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; @@ -840,6 +838,7 @@ impl RemoteTimelineClient { if receiver.changed().await.is_err() { anyhow::bail!("wait_completion aborted because upload queue was stopped"); } + Ok(()) } @@ -866,6 +865,56 @@ impl RemoteTimelineClient { receiver } + /// Wait for all previously scheduled operations to complete, and then stop. + /// + /// Not cancellation safe. + pub(crate) async fn shutdown(self: &Arc<Self>) -> Result<(), StopError> { + // On cancellation the queue is left in an awkward state of refusing new operations even + // though a proper stop is yet to be called. On cancellation the original or some later + // task must call `stop` or `shutdown`. 
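The test-only block in `immediate_gc` above pushes one `wait_drop()` future per doomed layer into a `tokio::task::JoinSet` and then drains it. A minimal, standalone sketch of that draining pattern, with sleeping futures standing in for `layer.wait_drop()` (not part of the patch):

```rust
use std::time::Duration;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut js = JoinSet::new();
    for i in 0..4u64 {
        // in the patch this is `js.spawn(layer.wait_drop())`
        js.spawn(async move {
            tokio::time::sleep(Duration::from_millis(10 * i)).await;
        });
    }
    // join_next() yields one result per spawned future and None once all have
    // finished, so the loop only ends when every "layer" is gone.
    while let Some(res) = js.join_next().await {
        res.expect("wait_drop should not panic");
    }
}
```

Compared to `FuturesUnordered`, the `JoinSet` runs each wait on its own task, which is the trade-off the comment above alludes to.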
+ let sg = scopeguard::guard((), |_| { + tracing::error!("RemoteTimelineClient::shutdown was cancelled; this should not happen, do not make this into an allowed_error") + }); + + let fut = { + let mut guard = self.upload_queue.lock().unwrap(); + let upload_queue = match &mut *guard { + UploadQueue::Stopped(_) => return Ok(()), + UploadQueue::Uninitialized => return Err(StopError::QueueUninitialized), + UploadQueue::Initialized(ref mut init) => init, + }; + + // if the queue is already stuck due to a shutdown operation which was cancelled, then + // just don't add more of these as they would never complete. + // + // TODO: if launch_queued_tasks were to be refactored to accept a &mut UploadQueue + // in every place we would not have to jump through this hoop, and this method could be + // made cancellable. + if !upload_queue.shutting_down { + upload_queue.shutting_down = true; + upload_queue.queued_operations.push_back(UploadOp::Shutdown); + // this operation is not counted similar to Barrier + + self.launch_queued_tasks(upload_queue); + } + + upload_queue.shutdown_ready.clone().acquire_owned() + }; + + let res = fut.await; + + scopeguard::ScopeGuard::into_inner(sg); + + match res { + Ok(_permit) => unreachable!("shutdown_ready should not have been added permits"), + Err(_closed) => { + // expected + } + } + + self.stop() + } + /// Set the deleted_at field in the remote index file. /// /// This fails if the upload queue has not been `stop()`ed. @@ -1103,7 +1152,9 @@ impl RemoteTimelineClient { upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len() } - UploadOp::Barrier(_) => upload_queue.inprogress_tasks.is_empty(), + UploadOp::Barrier(_) | UploadOp::Shutdown => { + upload_queue.inprogress_tasks.is_empty() + } }; // If we cannot launch this task, don't look any further. @@ -1116,6 +1167,13 @@ impl RemoteTimelineClient { break; } + if let UploadOp::Shutdown = next_op { + // leave the op in the queue but do not start more tasks; it will be dropped when + // the stop is called. + upload_queue.shutdown_ready.close(); + break; + } + // We can launch this task. Remove it from the queue first. let next_op = upload_queue.queued_operations.pop_front().unwrap(); @@ -1136,6 +1194,7 @@ impl RemoteTimelineClient { sender.send_replace(()); continue; } + UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"), }; // Assign unique ID to this task @@ -1274,10 +1333,10 @@ impl RemoteTimelineClient { .await .map_err(|e| anyhow::anyhow!(e)) } - UploadOp::Barrier(_) => { + unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => { // unreachable. Barrier operations are handled synchronously in // launch_queued_tasks - warn!("unexpected Barrier operation in perform_upload_task"); + warn!("unexpected {unexpected:?} operation in perform_upload_task"); break; } }; @@ -1371,7 +1430,7 @@ impl RemoteTimelineClient { upload_queue.num_inprogress_deletions -= 1; None } - UploadOp::Barrier(_) => unreachable!(), + UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(), }; // Launch any queued tasks that were unblocked by this one. @@ -1426,7 +1485,7 @@ impl RemoteTimelineClient { reason: "should we track deletes? positive or negative sign?", }, ), - UploadOp::Barrier(_) => { + UploadOp::Barrier(..) | UploadOp::Shutdown => { // we do not account these return None; } @@ -1452,10 +1511,13 @@ impl RemoteTimelineClient { } /// Close the upload queue for new operations and cancel queued operations. 
+ /// + /// Use [`RemoteTimelineClient::shutdown`] for graceful stop. + /// /// In-progress operations will still be running after this function returns. /// Use `task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id))` /// to wait for them to complete, after calling this function. - pub fn stop(&self) -> Result<(), StopError> { + pub(crate) fn stop(&self) -> Result<(), StopError> { // Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue // into stopped state, thereby dropping all off the queued *ops* which haven't become *tasks* yet. // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business. @@ -1493,6 +1555,8 @@ impl RemoteTimelineClient { queued_operations: VecDeque::default(), #[cfg(feature = "testing")] dangling_files: HashMap::default(), + shutting_down: false, + shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), }; let upload_queue = std::mem::replace( diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 703fb7db24..c27c3e69ed 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -326,6 +326,24 @@ impl Layer { Ok(()) } + + /// Waits until this layer has been dropped (and if needed, local garbage collection and remote + /// deletion scheduling have completed). + /// + /// Does not start garbage collection, use [`Self::garbage_collect_on_drop`] for that + /// separately. + #[cfg(feature = "testing")] + pub(crate) fn wait_drop(&self) -> impl std::future::Future<Output = ()> + 'static { + let mut rx = self.0.status.subscribe(); + + async move { + loop { + if let Err(tokio::sync::broadcast::error::RecvError::Closed) = rx.recv().await { + break; + } + } + } + } } /// The download-ness ([`DownloadedLayer`]) can be either resident or wanted evicted. @@ -475,10 +493,14 @@ impl Drop for LayerInner { let file_size = self.layer_desc().file_size; let timeline = self.timeline.clone(); let meta = self.metadata(); + let status = self.status.clone(); crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(move || { let _g = span.entered(); + // carry this until we are finished, to support [`Layer::wait_drop`] + let _status = status; + let removed = match std::fs::remove_file(path) { Ok(()) => true, Err(e) if e.kind() == std::io::ErrorKind::NotFound => { @@ -1416,6 +1438,7 @@ impl Default for LayerImplMetrics { ) .unwrap(); + // reminder: this will be pageserver_layer_gcs_count_total with "_total" suffix let gcs = metrics::register_int_counter_vec!( "pageserver_layer_gcs_count", "Garbage collections started and completed in the Layer implementation", diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 9e204e1258..a7cf427de5 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -251,14 +251,6 @@ pub struct Timeline { /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>, - /// Layer removal lock. - /// A lock to ensure that no layer of the timeline is removed concurrently by other tasks. - /// This lock is acquired in [`Timeline::gc`] and [`Timeline::compact`]. - /// This is an `Arc` lock because we need an owned - /// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`). - /// Note that [`DeleteTimelineFlow`] uses `delete_progress` field. 
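The `Layer::wait_drop` / `LayerInner::drop` pair above works because the spawned cleanup closure keeps a clone of the `status` broadcast sender alive, so a subscriber only sees `RecvError::Closed` once the layer and its cleanup are gone. A small sketch of that signalling idea, using an illustrative `Resource` type rather than the pageserver's `Layer`:

```rust
use tokio::sync::broadcast::{self, error::RecvError};

struct Resource {
    status: broadcast::Sender<()>,
}

impl Resource {
    // Mirrors the shape of `Layer::wait_drop`: a 'static future that resolves
    // once every sender clone (i.e. the resource itself) has been dropped.
    fn wait_drop(&self) -> impl std::future::Future<Output = ()> + 'static {
        let mut rx = self.status.subscribe();
        async move {
            loop {
                if let Err(RecvError::Closed) = rx.recv().await {
                    break;
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (status, rx0) = broadcast::channel::<()>(1);
    drop(rx0); // the initial receiver is not needed, only the sender's lifetime matters

    let resource = Resource { status };
    let waiter = tokio::spawn(resource.wait_drop());

    drop(resource); // last sender gone -> channel closed -> waiter resolves
    waiter.await.unwrap();
}
```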
- pub(super) layer_removal_cs: Arc>, - // Needed to ensure that we can't create a branch at a point that was already garbage collected pub latest_gc_cutoff_lsn: Rcu, @@ -319,6 +311,24 @@ pub struct Timeline { /// Cancellation token scoped to this timeline: anything doing long-running work relating /// to the timeline should drop out when this token fires. pub(crate) cancel: CancellationToken, + + /// Make sure we only have one running compaction at a time in tests. + /// + /// Must only be taken in two places: + /// - [`Timeline::compact`] (this file) + /// - [`delete::delete_local_layer_files`] + /// + /// Timeline deletion will acquire both compaction and gc locks in whatever order. + compaction_lock: tokio::sync::Mutex<()>, + + /// Make sure we only have one running gc at a time. + /// + /// Must only be taken in two places: + /// - [`Timeline::gc`] (this file) + /// - [`delete::delete_local_layer_files`] + /// + /// Timeline deletion will acquire both compaction and gc locks in whatever order. + gc_lock: tokio::sync::Mutex<()>, } pub struct WalReceiverInfo { @@ -704,6 +714,8 @@ impl Timeline { flags: EnumSet, ctx: &RequestContext, ) -> Result<(), CompactionError> { + let _g = self.compaction_lock.lock().await; + // this wait probably never needs any "long time spent" logging, because we already nag if // compaction task goes over it's period (20s) which is quite often in production. let _permit = match super::tasks::concurrent_background_tasks_rate_limit( @@ -758,7 +770,7 @@ impl Timeline { // Below are functions compact_level0() and create_image_layers() // but they are a bit ad hoc and don't quite work like it's explained // above. Rewrite it. - let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await); + // Is the timeline being deleted? if self.is_stopping() { trace!("Dropping out of compaction on timeline shutdown"); @@ -799,8 +811,7 @@ impl Timeline { // 3. Compact let timer = self.metrics.compact_time_histo.start_timer(); - self.compact_level0(layer_removal_cs.clone(), target_file_size, ctx) - .await?; + self.compact_level0(target_file_size, ctx).await?; timer.stop_and_record(); if let Some(remote_client) = &self.remote_client { @@ -946,7 +957,7 @@ impl Timeline { // what is problematic is the shutting down of RemoteTimelineClient, because // obviously it does not make sense to stop while we wait for it, but what // about corner cases like s3 suddenly hanging up? - if let Err(e) = client.wait_completion().await { + if let Err(e) = client.shutdown().await { // Non-fatal. Shutdown is infallible. Failures to flush just mean that // we have some extra WAL replay to do next time the timeline starts. 
warn!("failed to flush to remote storage: {e:#}"); @@ -1201,16 +1212,6 @@ impl Timeline { remote_client: &Arc, layers_to_evict: &[Layer], ) -> anyhow::Result>>> { - // ensure that the layers have finished uploading - // (don't hold the layer_removal_cs while we do it, we're not removing anything yet) - remote_client - .wait_completion() - .await - .context("wait for layer upload ops to complete")?; - - // now lock out layer removal (compaction, gc, timeline deletion) - let _layer_removal_guard = self.layer_removal_cs.lock().await; - { // to avoid racing with detach and delete_timeline let state = self.current_state(); @@ -1421,7 +1422,6 @@ impl Timeline { layer_flush_done_tx, write_lock: tokio::sync::Mutex::new(()), - layer_removal_cs: Default::default(), gc_info: std::sync::RwLock::new(GcInfo { retain_lsns: Vec::new(), @@ -1460,6 +1460,9 @@ impl Timeline { initial_logical_size_attempt: Mutex::new(initial_logical_size_attempt), cancel, gate: Gate::new(format!("Timeline<{tenant_id}/{timeline_id}>")), + + compaction_lock: tokio::sync::Mutex::default(), + gc_lock: tokio::sync::Mutex::default(), }; result.repartition_threshold = result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE; @@ -3150,13 +3153,8 @@ impl TryFrom for CompactLevel0Phase1Stats { impl Timeline { /// Level0 files first phase of compaction, explained in the [`Self::compact`] comment. - /// - /// This method takes the `_layer_removal_cs` guard to highlight it required downloads are - /// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the - /// start of level0 files compaction, the on-demand download should be revisited as well. async fn compact_level0_phase1( self: &Arc, - _layer_removal_cs: Arc>, guard: tokio::sync::OwnedRwLockReadGuard, mut stats: CompactLevel0Phase1StatsBuilder, target_file_size: u64, @@ -3243,8 +3241,6 @@ impl Timeline { let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end; let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len()); - // FIXME: downloading while holding layer_removal_cs is not great, but we will remove that - // soon deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?); for l in level0_deltas_iter { let lsn_range = &l.layer_desc().lsn_range; @@ -3594,7 +3590,6 @@ impl Timeline { /// async fn compact_level0( self: &Arc, - layer_removal_cs: Arc>, target_file_size: u64, ctx: &RequestContext, ) -> Result<(), CompactionError> { @@ -3616,16 +3611,9 @@ impl Timeline { let now = tokio::time::Instant::now(); stats.read_lock_acquisition_micros = DurationRecorder::Recorded(RecordedDuration(now - begin), now); - let layer_removal_cs = layer_removal_cs.clone(); - self.compact_level0_phase1( - layer_removal_cs, - phase1_layers_locked, - stats, - target_file_size, - &ctx, - ) - .instrument(phase1_span) - .await? + self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx) + .instrument(phase1_span) + .await? }; if new_layers.is_empty() && deltas_to_compact.is_empty() { @@ -3633,17 +3621,6 @@ impl Timeline { return Ok(()); } - // Before deleting any layers, we need to wait for their upload ops to finish. - // See remote_timeline_client module level comment on consistency. - // Do it here because we don't want to hold self.layers.write() while waiting. 
- if let Some(remote_client) = &self.remote_client { - debug!("waiting for upload ops to complete"); - remote_client - .wait_completion() - .await - .context("wait for layer upload ops to complete")?; - } - let mut guard = self.layers.write().await; let mut duplicated_layers = HashSet::new(); @@ -3675,12 +3652,7 @@ impl Timeline { }; // deletion will happen later, the layer file manager calls garbage_collect_on_drop - guard.finish_compact_l0( - &layer_removal_cs, - &remove_layers, - &insert_layers, - &self.metrics, - ); + guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics); if let Some(remote_client) = self.remote_client.as_ref() { remote_client.schedule_compaction_update(&remove_layers, &new_layers)?; @@ -3791,19 +3763,17 @@ impl Timeline { Ok(()) } - /// /// Garbage collect layer files on a timeline that are no longer needed. /// /// Currently, we don't make any attempt at removing unneeded page versions /// within a layer file. We can only remove the whole file if it's fully /// obsolete. - /// pub(super) async fn gc(&self) -> anyhow::Result { + let _g = self.gc_lock.lock().await; let timer = self.metrics.garbage_collect_histo.start_timer(); fail_point!("before-timeline-gc"); - let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await); // Is the timeline being deleted? if self.is_stopping() { anyhow::bail!("timeline is Stopping"); @@ -3821,13 +3791,7 @@ impl Timeline { let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff); let res = self - .gc_timeline( - layer_removal_cs.clone(), - horizon_cutoff, - pitr_cutoff, - retain_lsns, - new_gc_cutoff, - ) + .gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff) .instrument( info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff), ) @@ -3841,7 +3805,6 @@ impl Timeline { async fn gc_timeline( &self, - layer_removal_cs: Arc>, horizon_cutoff: Lsn, pitr_cutoff: Lsn, retain_lsns: Vec, @@ -3879,17 +3842,6 @@ impl Timeline { debug!("retain_lsns: {:?}", retain_lsns); - // Before deleting any layers, we need to wait for their upload ops to finish. - // See storage_sync module level comment on consistency. - // Do it here because we don't want to hold self.layers.write() while waiting. - if let Some(remote_client) = &self.remote_client { - debug!("waiting for upload ops to complete"); - remote_client - .wait_completion() - .await - .context("wait for layer upload ops to complete")?; - } - let mut layers_to_remove = Vec::new(); let mut wanted_image_layers = KeySpaceRandomAccum::default(); @@ -4005,6 +3957,11 @@ impl Timeline { // // This does not in fact have any effect as we no longer consider local metadata unless // running without remote storage. + // + // This unconditionally schedules also an index_part.json update, even though, we will + // be doing one a bit later with the unlinked gc'd layers. + // + // TODO: remove when implementing . self.update_metadata_file(self.disk_consistent_lsn.load(), None) .await?; @@ -4019,11 +3976,16 @@ impl Timeline { remote_client.schedule_gc_update(&gc_layers)?; } - guard.finish_gc_timeline(&layer_removal_cs, gc_layers); + guard.finish_gc_timeline(&gc_layers); if result.layers_removed != 0 { fail_point!("after-timeline-gc-removed-layers"); } + + #[cfg(feature = "testing")] + { + result.doomed_layers = gc_layers; + } } info!( @@ -4035,9 +3997,7 @@ impl Timeline { Ok(result) } - /// /// Reconstruct a value, using the given base image and WAL records in 'data'. 
- /// async fn reconstruct_value( &self, key: Key, diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index 429b3347eb..fefeafb7d3 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -110,11 +110,11 @@ async fn set_deleted_in_remote_index(timeline: &Timeline) -> Result<(), DeleteTi Ok(()) } -/// Grab the layer_removal_cs lock, and actually perform the deletion. +/// Grab the compaction and gc locks, and actually perform the deletion. /// -/// This lock prevents prevents GC or compaction from running at the same time. -/// The GC task doesn't register itself with the timeline it's operating on, -/// so it might still be running even though we called `shutdown_tasks`. +/// The locks prevent GC or compaction from running at the same time. The background tasks do not +/// register themselves with the timeline they operate on, so they might still be running even +/// though we called `shutdown_tasks`. /// /// Note that there are still other race conditions between /// GC, compaction and timeline deletion. See /// /// No timeout here, GC & Compaction should be responsive to the /// `TimelineState::Stopping` change. -async fn delete_local_layer_files( +// pub(super): documentation link +pub(super) async fn delete_local_layer_files( conf: &PageServerConf, tenant_id: TenantId, timeline: &Timeline, ) -> anyhow::Result<()> { - info!("waiting for layer_removal_cs.lock()"); - let layer_removal_guard = timeline.layer_removal_cs.lock().await; - info!("got layer_removal_cs.lock(), deleting layer files"); + let guards = async { tokio::join!(timeline.gc_lock.lock(), timeline.compaction_lock.lock()) }; + let guards = crate::timed( + guards, + "acquire gc and compaction locks", + std::time::Duration::from_secs(5), + ) + .await; // NB: storage_sync upload tasks that reference these layers have been cancelled // by the caller. @@ -150,8 +155,8 @@ async fn delete_local_layer_files( // because of a previous failure/cancellation at/after // failpoint timeline-delete-after-rm. // - // It can also happen if we race with tenant detach, because, - // it doesn't grab the layer_removal_cs lock. + // ErrorKind::NotFound can also happen if we race with tenant detach, because + // no locks are shared between deletion and detach. // // For now, log and continue. // warn! level is technically not appropriate for the @@ -219,8 +224,8 @@ async fn delete_local_layer_files( .with_context(|| format!("Failed to remove: {}", entry.path().display()))?; } - info!("finished deleting layer files, releasing layer_removal_cs.lock()"); - drop(layer_removal_guard); + info!("finished deleting layer files, releasing locks"); + drop(guards); fail::fail_point!("timeline-delete-after-rm", |_| { Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))? 
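`delete_local_layer_files` above acquires both per-timeline locks with `tokio::join!` and holds the resulting pair of guards for the whole deletion; gc and compaction each take only their own lock, so the unordered acquisition is harmless in practice. A minimal sketch of the pattern, using illustrative types rather than Neon's `Timeline` (and omitting the `crate::timed` logging wrapper):

```rust
use tokio::sync::Mutex;

struct Timeline {
    gc_lock: Mutex<()>,
    compaction_lock: Mutex<()>,
}

#[tokio::main]
async fn main() {
    let timeline = Timeline {
        gc_lock: Mutex::new(()),
        compaction_lock: Mutex::new(()),
    };

    // join! completes once both guards are held, so neither gc nor compaction
    // can run while the local layer files are being deleted.
    let guards = tokio::join!(timeline.gc_lock.lock(), timeline.compaction_lock.lock());

    // ... delete local layer files here ...

    drop(guards); // dropping the tuple releases both locks
}
```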
diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index f4a4c26c06..2d0f1c609b 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -296,7 +296,6 @@ impl Timeline { stats.evicted += 1; } Some(Err(EvictionError::NotFound | EvictionError::Downloaded)) => { - // compaction/gc removed the file while we were waiting on layer_removal_cs stats.not_evictable += 1; } } diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index e4991e0865..7e1aa279d3 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -190,7 +190,6 @@ impl LayerManager { /// Called when compaction is completed. pub(crate) fn finish_compact_l0( &mut self, - layer_removal_cs: &Arc>, compact_from: &[Layer], compact_to: &[ResidentLayer], metrics: &TimelineMetrics, @@ -201,25 +200,16 @@ impl LayerManager { metrics.record_new_file_metrics(l.layer_desc().file_size); } for l in compact_from { - Self::delete_historic_layer(layer_removal_cs, l, &mut updates, &mut self.layer_fmgr); + Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr); } updates.flush(); } - /// Called when garbage collect the timeline. Returns a guard that will apply the updates to the layer map. - pub(crate) fn finish_gc_timeline( - &mut self, - layer_removal_cs: &Arc>, - gc_layers: Vec, - ) { + /// Called when garbage collect has selected the layers to be removed. + pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) { let mut updates = self.layer_map.batch_update(); for doomed_layer in gc_layers { - Self::delete_historic_layer( - layer_removal_cs, - &doomed_layer, - &mut updates, - &mut self.layer_fmgr, - ); + Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr); } updates.flush() } @@ -238,7 +228,6 @@ impl LayerManager { /// Remote storage is not affected by this operation. fn delete_historic_layer( // we cannot remove layers otherwise, since gc and compaction will race - _layer_removal_cs: &Arc>, layer: &Layer, updates: &mut BatchedUpdates<'_>, mapping: &mut LayerFileManager, diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index 5eaed1d5ca..32f14f40c5 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -90,6 +90,14 @@ pub(crate) struct UploadQueueInitialized { /// bug causing leaks, then it's better to not leave this enabled for production builds. #[cfg(feature = "testing")] pub(crate) dangling_files: HashMap, + + /// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`. + pub(crate) shutting_down: bool, + + /// Permitless semaphore on which any number of `RemoteTimelineClient::shutdown` futures can + /// wait on until one of them stops the queue. The semaphore is closed when + /// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`. 
+ pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>, } impl UploadQueueInitialized { @@ -148,6 +156,8 @@ impl UploadQueue { queued_operations: VecDeque::new(), #[cfg(feature = "testing")] dangling_files: HashMap::new(), + shutting_down: false, + shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), }; *self = UploadQueue::Initialized(state); @@ -195,6 +205,8 @@ impl UploadQueue { queued_operations: VecDeque::new(), #[cfg(feature = "testing")] dangling_files: HashMap::new(), + shutting_down: false, + shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), }; *self = UploadQueue::Initialized(state); @@ -206,7 +218,13 @@ impl UploadQueue { UploadQueue::Uninitialized | UploadQueue::Stopped(_) => { anyhow::bail!("queue is in state {}", self.as_str()) } - UploadQueue::Initialized(x) => Ok(x), + UploadQueue::Initialized(x) => { + if !x.shutting_down { + Ok(x) + } else { + anyhow::bail!("queue is shutting down") + } + } } } @@ -250,6 +268,10 @@ pub(crate) enum UploadOp { /// Barrier. When the barrier operation is reached, Barrier(tokio::sync::watch::Sender<()>), + + /// Shutdown; upon encountering this operation no new operations will be spawned, otherwise + /// this is the same as a Barrier. + Shutdown, } impl std::fmt::Display for UploadOp { @@ -271,6 +293,7 @@ impl std::fmt::Display for UploadOp { write!(f, "Delete({} layers)", delete.layers.len()) } UploadOp::Barrier(_) => write!(f, "Barrier"), + UploadOp::Shutdown => write!(f, "Shutdown"), } } } diff --git a/s3_scrubber/src/checks.rs b/s3_scrubber/src/checks.rs index 64702fca3d..0686225d1b 100644 --- a/s3_scrubber/src/checks.rs +++ b/s3_scrubber/src/checks.rs @@ -94,11 +94,10 @@ pub(crate) async fn branch_cleanup_and_check_errors( != index_part.get_disk_consistent_lsn() { result.errors.push(format!( - "Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})", - index_part.metadata.disk_consistent_lsn(), - index_part.get_disk_consistent_lsn(), - - )) + "Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})", + index_part.metadata.disk_consistent_lsn(), + index_part.get_disk_consistent_lsn(), + )) } if index_part.layer_metadata.is_empty() { @@ -109,8 +108,8 @@ pub(crate) async fn branch_cleanup_and_check_errors( for (layer, metadata) in index_part.layer_metadata { if metadata.file_size == 0 { result.errors.push(format!( - "index_part.json contains a layer {} that has 0 size in its layer metadata", layer.file_name(), - )) + "index_part.json contains a layer {} that has 0 size in its layer metadata", layer.file_name(), + )) } let layer_map_key = (layer, metadata.generation); @@ -136,7 +135,7 @@ pub(crate) async fn branch_cleanup_and_check_errors( // a new generation that didn't upload an index yet. // // Even so, a layer that is not referenced by the index could just - // be something enqueued for deletion, so while this check is valid + // be something enqueued for deletion, so while this check is valid // for indicating that a layer is garbage, it is not an indicator // of a problem. 
gen < &index_part_generation) diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 2f1d68b92c..76edf45496 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -4,7 +4,7 @@ import json import time from collections import defaultdict from dataclasses import dataclass -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Set, Tuple import requests from requests.adapters import HTTPAdapter @@ -100,6 +100,15 @@ class LayerMapInfo: counts[hist_layer.kind] += 1 return counts + def delta_layers(self) -> List[HistoricLayerInfo]: + return [x for x in self.historic_layers if x.kind == "Delta"] + + def image_layers(self) -> List[HistoricLayerInfo]: + return [x for x in self.historic_layers if x.kind == "Image"] + + def historic_by_name(self) -> Set[str]: + return set(x.layer_file_name for x in self.historic_layers) + @dataclass class TenantConfig: @@ -416,6 +425,10 @@ class PageserverHttpClient(requests.Session): def timeline_gc( self, tenant_id: TenantId, timeline_id: TimelineId, gc_horizon: Optional[int] ) -> dict[str, Any]: + """ + Unlike most handlers, this will wait for the layers to be actually + complete registering themselves to the deletion queue. + """ self.is_testing_enabled_or_skip() log.info( diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 31bc97703e..4da8dd957d 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -763,9 +763,7 @@ def test_compaction_waits_for_upload( neon_env_builder: NeonEnvBuilder, ): """ - Compaction waits for outstanding uploads to complete, so that it avoids deleting layers - files that have not yet been uploaded. This test forces a race between upload and - compaction. + This test forces a race between upload and compaction. """ neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) @@ -784,6 +782,16 @@ def test_compaction_waits_for_upload( timeline_id = env.initial_timeline client = env.pageserver.http_client() + layers_at_creation = client.layer_map_info(tenant_id, timeline_id) + deltas_at_creation = len(layers_at_creation.delta_layers()) + assert ( + deltas_at_creation == 1 + ), "are you fixing #5863? make sure we end up with 2 deltas at the end of endpoint lifecycle" + + # Make new layer uploads get stuck. + # Note that timeline creation waits for the initial layers to reach remote storage. + # So at this point, the `layers_at_creation` are in remote storage. + client.configure_failpoints(("before-upload-layer-pausable", "pause")) with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint: # Build two tables with some data inside @@ -791,85 +799,71 @@ def test_compaction_waits_for_upload( wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) client.timeline_checkpoint(tenant_id, timeline_id) + deltas_at_first = len(client.layer_map_info(tenant_id, timeline_id).delta_layers()) + assert ( + deltas_at_first == 2 + ), "are you fixing #5863? just add one more checkpoint after 'CREATE TABLE bar ...' statement." 
endpoint.safe_psql("CREATE TABLE bar AS SELECT x FROM generate_series(1, 10000) g(x)") - wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) - - # Now make the flushing hang and update one small piece of data - client.configure_failpoints(("before-upload-layer-pausable", "pause")) - endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1") - wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) - checkpoint_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue() - compact_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue() - compact_barrier = threading.Barrier(2) + layers_before_last_checkpoint = client.layer_map_info(tenant_id, timeline_id).historic_by_name() + upload_stuck_layers = layers_before_last_checkpoint - layers_at_creation.historic_by_name() - def checkpoint_in_background(): - try: - log.info("Checkpoint starting") - client.timeline_checkpoint(tenant_id, timeline_id) - log.info("Checkpoint complete") - checkpoint_result.put(None) - except PageserverApiException as e: - log.info("Checkpoint errored: {e}") - checkpoint_result.put(e) + assert len(upload_stuck_layers) > 0 - def compact_in_background(): - compact_barrier.wait() - try: - log.info("Compaction starting") - client.timeline_compact(tenant_id, timeline_id) - log.info("Compaction complete") - compact_result.put(None) - except PageserverApiException as e: - log.info("Compaction errored: {e}") - compact_result.put(e) + for name in upload_stuck_layers: + path = env.pageserver.timeline_dir(tenant_id, timeline_id) / name + assert path.exists(), "while uploads are stuck the layers should be present on disk" - checkpoint_thread = threading.Thread(target=checkpoint_in_background) - checkpoint_thread.start() + # now this will do the L0 => L1 compaction and want to remove + # upload_stuck_layers and the original initdb L0 + client.timeline_checkpoint(tenant_id, timeline_id) - compact_thread = threading.Thread(target=compact_in_background) - compact_thread.start() + # as uploads are paused, the the upload_stuck_layers should still be with us + for name in upload_stuck_layers: + path = env.pageserver.timeline_dir(tenant_id, timeline_id) / name + assert path.exists(), "uploads are stuck still over compaction" - try: - # Start the checkpoint, see that it blocks - log.info("Waiting to see checkpoint hang...") - time.sleep(5) - assert checkpoint_result.empty() + compacted_layers = client.layer_map_info(tenant_id, timeline_id).historic_by_name() + overlap = compacted_layers.intersection(upload_stuck_layers) + assert len(overlap) == 0, "none of the L0's should remain after L0 => L1 compaction" + assert ( + len(compacted_layers) == 1 + ), "there should be one L1 after L0 => L1 compaction (without #5863 being fixed)" - # Start the compaction, see that it finds work to do but blocks - compact_barrier.wait() - log.info("Waiting to see compaction hang...") - time.sleep(5) - assert compact_result.empty() + def layer_deletes_completed(): + m = client.get_metric_value("pageserver_layer_gcs_count_total", {"state": "completed"}) + if m is None: + return 0 + return int(m) - # This is logged once compaction is started, but before we wait for operations to complete - assert env.pageserver.log_contains("compact_level0_phase1 stats available.") + # if initdb created an initial delta layer, it might already be gc'd + # because it was uploaded before the failpoint was enabled. however, the + # deletion is not guaranteed to be complete. 
+ assert layer_deletes_completed() <= 1 - # Once we unblock uploads the compaction should complete successfully - log.info("Disabling failpoint") - client.configure_failpoints(("before-upload-layer-pausable", "off")) - log.info("Awaiting compaction result") - assert compact_result.get(timeout=10) is None - log.info("Awaiting checkpoint result") - assert checkpoint_result.get(timeout=10) is None - - except Exception: - # Log the actual failure's backtrace here, before we proceed to join threads - log.exception("Failure, cleaning up...") - raise - finally: - compact_barrier.abort() - - checkpoint_thread.join() - compact_thread.join() + client.configure_failpoints(("before-upload-layer-pausable", "off")) # Ensure that this actually terminates wait_upload_queue_empty(client, tenant_id, timeline_id) - # We should not have hit the error handling path in uploads where the remote file is gone + def until_layer_deletes_completed(): + deletes = layer_deletes_completed() + log.info(f"layer_deletes: {deletes}") + # ensure that the initdb delta layer AND the previously stuck layers are now deleted + assert deletes >= len(upload_stuck_layers) + 1 + + wait_until(10, 1, until_layer_deletes_completed) + + for name in upload_stuck_layers: + path = env.pageserver.timeline_dir(tenant_id, timeline_id) / name + assert ( + not path.exists() + ), "l0 should now be removed because of L0 => L1 compaction and completed uploads" + + # We should not have hit the error handling path in uploads where an uploaded file is gone assert not env.pageserver.log_contains( "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more." )
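As a closing note on the upload-queue change: `shutdown_ready` is a semaphore that never gains permits, `launch_queued_tasks` closes it once it reaches `UploadOp::Shutdown`, and every `shutdown()` caller parked on `acquire_owned()` is released by the closure and proceeds to `stop()`. A standalone sketch of that signalling pattern, with stand-in names rather than the pageserver types:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let shutdown_ready = Arc::new(Semaphore::new(0)); // zero permits, and none are ever added

    let mut waiters = Vec::new();
    for i in 0..3 {
        let sem = shutdown_ready.clone();
        waiters.push(tokio::spawn(async move {
            // With no permits available this can only return once the semaphore is
            // closed, which stands in for "the queue reached the Shutdown op".
            let res = sem.acquire_owned().await;
            assert!(res.is_err(), "no permits should ever be added");
            println!("waiter {i} may now call stop()");
        }));
    }

    // The queue processor reaches UploadOp::Shutdown and closes the semaphore,
    // waking every waiting shutdown() caller at once.
    shutdown_ready.close();

    for w in waiters {
        w.await.unwrap();
    }
}
```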