fix: remove layer_removal_cs (#5108)

Quest: https://github.com/neondatabase/neon/issues/4745. Follow-up to
#4938.

- add locks for compaction and gc, so we don't have multiple concurrent
  executions in tests (see the sketch below)
- remove layer_removal_cs
- remove waiting for uploads in eviction/gc/compaction
    - #4938 will keep the file resident until upload completes

Co-authored-by: Christian Schwarz <christian@neon.tech>
Joonas Koivunen
2023-11-28 19:15:21 +02:00
committed by GitHub
parent 8625466144
commit 105edc265c
14 changed files with 321 additions and 206 deletions
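As an illustration of the locking shape this change moves to — per-timeline compaction and gc mutexes instead of a single `layer_removal_cs` — here is a minimal, hypothetical sketch (names simplified; not the actual pageserver types):

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

/// Hypothetical, stripped-down stand-in for the pageserver `Timeline`.
struct Timeline {
    /// Taken for the whole compaction iteration (and by timeline deletion).
    compaction_lock: Mutex<()>,
    /// Taken for the whole gc iteration (and by timeline deletion).
    gc_lock: Mutex<()>,
}

impl Timeline {
    async fn compact(&self) {
        let _guard = self.compaction_lock.lock().await;
        // ... plan and execute L0 => L1 compaction while holding the guard ...
    }

    async fn gc(&self) {
        let _guard = self.gc_lock.lock().await;
        // ... select and remove obsolete layers while holding the guard ...
    }
}

#[tokio::main]
async fn main() {
    let timeline = Arc::new(Timeline {
        compaction_lock: Mutex::new(()),
        gc_lock: Mutex::new(()),
    });

    // A second compaction started concurrently simply queues on the mutex,
    // which is what the tests rely on; gc is serialized independently.
    let t = timeline.clone();
    let gc = tokio::spawn(async move { t.gc().await });
    timeline.compact().await;
    gc.await.unwrap();
}
```

Timeline deletion takes both locks (see `delete_local_layer_files` below), so it cannot race with either background task.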

View File

@@ -178,7 +178,14 @@ where
.unwrap_or(false);
if valid && *validated_generation == tenant_lsn_state.generation {
for (_timeline_id, pending_lsn) in tenant_lsn_state.timelines {
for (timeline_id, pending_lsn) in tenant_lsn_state.timelines {
tracing::debug!(
%tenant_id,
%timeline_id,
current = %pending_lsn.result_slot.load(),
projected = %pending_lsn.projected,
"advancing validated remote_consistent_lsn",
);
pending_lsn.result_slot.store(pending_lsn.projected);
}
} else {

View File

@@ -138,6 +138,14 @@ pub struct GcResult {
#[serde(serialize_with = "serialize_duration_as_millis")]
pub elapsed: Duration,
/// The layers which were garbage collected.
///
/// Used in `/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc` to wait for the layers to be
/// dropped in tests.
#[cfg(feature = "testing")]
#[serde(skip)]
pub(crate) doomed_layers: Vec<crate::tenant::storage_layer::Layer>,
}
// helper function for `GcResult`, serializing a `Duration` as an integer number of milliseconds
@@ -158,5 +166,11 @@ impl AddAssign for GcResult {
self.layers_removed += other.layers_removed;
self.elapsed += other.elapsed;
#[cfg(feature = "testing")]
{
let mut other = other;
self.doomed_layers.append(&mut other.doomed_layers);
}
}
}
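For illustration, a hedged, stand-alone sketch of how the `AddAssign` impl above lets per-timeline results be folded into a tenant-wide total; `GcResult` is stripped down here and the `testing`-only `doomed_layers` field is modeled with plain strings:

```rust
use std::ops::AddAssign;
use std::time::Duration;

#[derive(Default, Debug)]
struct GcResult {
    layers_removed: u64,
    elapsed: Duration,
    // stand-in for the `testing`-only `doomed_layers: Vec<Layer>`
    doomed_layers: Vec<String>,
}

impl AddAssign for GcResult {
    fn add_assign(&mut self, mut other: Self) {
        self.layers_removed += other.layers_removed;
        self.elapsed += other.elapsed;
        self.doomed_layers.append(&mut other.doomed_layers);
    }
}

fn main() {
    let mut total = GcResult::default();
    let per_timeline = [
        GcResult {
            layers_removed: 2,
            elapsed: Duration::from_millis(10),
            doomed_layers: vec!["layer-a".into(), "layer-b".into()],
        },
        GcResult {
            layers_removed: 1,
            elapsed: Duration::from_millis(4),
            doomed_layers: vec!["layer-c".into()],
        },
    ];
    for timeline_result in per_timeline {
        total += timeline_result;
    }
    assert_eq!(total.layers_removed, 3);
    assert_eq!(total.doomed_layers.len(), 3);
    println!("{total:?}");
}
```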

View File

@@ -2638,14 +2638,12 @@ impl Tenant {
// Perform GC for each timeline.
//
// Note that we don't hold the GC lock here because we don't want
// to delay the branch creation task, which requires the GC lock.
// A timeline GC iteration can be slow because it may need to wait for
// compaction (both require `layer_removal_cs` lock),
// but the GC iteration can run concurrently with branch creation.
// Note that we don't hold the `Tenant::gc_cs` lock here because we don't want to delay the
// branch creation task, which requires the GC lock. A GC iteration can run concurrently
// with branch creation.
//
// See comments in [`Tenant::branch_timeline`] for more information
// about why branch creation task can run concurrently with timeline's GC iteration.
// See comments in [`Tenant::branch_timeline`] for more information about why branch
// creation task can run concurrently with timeline's GC iteration.
for timeline in gc_timelines {
if task_mgr::is_shutdown_requested() || cancel.is_cancelled() {
// We were requested to shut down. Stop and return with the progress we

View File

@@ -792,8 +792,6 @@ pub(crate) async fn set_new_tenant_config(
impl TenantManager {
/// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or does not match the query.
/// `active_only = true` restricts the query to tenants that are ready for operations, erroring on other kinds of tenants.
///
/// This method is cancel-safe.
pub(crate) fn get_attached_tenant_shard(
&self,
tenant_shard_id: TenantShardId,
@@ -1961,6 +1959,7 @@ pub(crate) async fn immediate_gc(
// Run in task_mgr to avoid race with tenant_detach operation
let ctx = ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
// TODO: spawning is redundant now, need to hold the gate
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::GarbageCollector,
@@ -1970,12 +1969,40 @@ pub(crate) async fn immediate_gc(
false,
async move {
fail::fail_point!("immediate_gc_task_pre");
let result = tenant
#[allow(unused_mut)]
let mut result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
.instrument(info_span!("manual_gc", %tenant_id, %timeline_id))
.await;
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
// better once the types support it.
#[cfg(feature = "testing")]
{
if let Ok(result) = result.as_mut() {
// why not FuturesUnordered? it would need much the same structure, but it would
// run everything on a single task.
let mut js = tokio::task::JoinSet::new();
for layer in std::mem::take(&mut result.doomed_layers) {
js.spawn(layer.wait_drop());
}
tracing::info!(total = js.len(), "starting to wait for the gc'd layers to be dropped");
while let Some(res) = js.join_next().await {
res.expect("wait_drop should not panic");
}
}
let timeline = tenant.get_timeline(timeline_id, false).ok();
let rtc = timeline.as_ref().and_then(|x| x.remote_client.as_ref());
if let Some(rtc) = rtc {
// layer drops schedule operations on the remote timeline client to actually do the
// deletions; we don't care about a shutdown error here, just exit fast
drop(rtc.wait_completion().await);
}
}
match task_done.send(result) {
Ok(_) => (),
Err(result) => error!("failed to send gc result: {result:?}"),
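On the "why not FuturesUnordered?" note above: a `FuturesUnordered` polls all of the `wait_drop` futures on the current task, while `JoinSet` spawns each one onto the runtime. A rough sketch of the alternative, with a hypothetical stand-in for `wait_drop`:

```rust
use futures::stream::{FuturesUnordered, StreamExt};

/// Hypothetical stand-in for the future returned by `Layer::wait_drop()`.
async fn wait_drop(layer_name: &'static str) {
    // in the real code this resolves once the layer has been dropped
    println!("{layer_name} dropped");
}

#[tokio::main]
async fn main() {
    // FuturesUnordered variant: every future is polled on this one task,
    // whereas a JoinSet spawns each future as its own runtime task.
    let mut pending: FuturesUnordered<_> = ["layer-a", "layer-b", "layer-c"]
        .into_iter()
        .map(wait_drop)
        .collect();

    while let Some(()) = pending.next().await {
        // one more doomed layer has been dropped
    }
}
```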

View File

@@ -827,10 +827,8 @@ impl RemoteTimelineClient {
Ok(())
}
///
/// Wait for all previously scheduled uploads/deletions to complete
///
pub async fn wait_completion(self: &Arc<Self>) -> anyhow::Result<()> {
pub(crate) async fn wait_completion(self: &Arc<Self>) -> anyhow::Result<()> {
let mut receiver = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
@@ -840,6 +838,7 @@ impl RemoteTimelineClient {
if receiver.changed().await.is_err() {
anyhow::bail!("wait_completion aborted because upload queue was stopped");
}
Ok(())
}
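`wait_completion` is built on a watch-channel barrier: a `Barrier` op carrying a `watch::Sender` is queued, and when the queue reaches it, `send_replace` wakes every waiter. A self-contained sketch of that pattern (simplified; not the real upload queue):

```rust
use std::collections::VecDeque;
use tokio::sync::watch;

enum UploadOp {
    // the real queue also has UploadLayer, UploadMetadata, Delete and Shutdown variants
    Barrier(watch::Sender<()>),
}

#[tokio::main]
async fn main() {
    let mut queue: VecDeque<UploadOp> = VecDeque::new();

    // caller side: enqueue a barrier and keep the receiver around
    let (tx, mut rx) = watch::channel(());
    queue.push_back(UploadOp::Barrier(tx));

    // queue-processing side: when the barrier is reached, notify every waiter
    let worker = tokio::spawn(async move {
        while let Some(op) = queue.pop_front() {
            match op {
                UploadOp::Barrier(sender) => {
                    sender.send_replace(());
                }
            }
        }
    });

    // wait_completion() boils down to waiting for the watch value to change;
    // an Err here means the queue was stopped before reaching the barrier.
    if rx.changed().await.is_err() {
        eprintln!("upload queue was stopped");
    }
    worker.await.unwrap();
}
```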
@@ -866,6 +865,56 @@ impl RemoteTimelineClient {
receiver
}
/// Wait for all previously scheduled operations to complete, and then stop.
///
/// Not cancellation safe
pub(crate) async fn shutdown(self: &Arc<Self>) -> Result<(), StopError> {
// On cancellation the queue is left in an awkward state: it refuses new operations, but a
// proper stop has not yet been performed. On cancellation the original task, or some later
// one, must call `stop` or `shutdown`.
let sg = scopeguard::guard((), |_| {
tracing::error!("RemoteTimelineClient::shutdown was cancelled; this should not happen, do not make this into an allowed_error")
});
let fut = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = match &mut *guard {
UploadQueue::Stopped(_) => return Ok(()),
UploadQueue::Uninitialized => return Err(StopError::QueueUninitialized),
UploadQueue::Initialized(ref mut init) => init,
};
// if the queue is already stuck due to a shutdown operation which was cancelled, then
// just don't add more of these as they would never complete.
//
// TODO: if launch_queued_tasks were to be refactored to accept a &mut UploadQueue
// in every place we would not have to jump through this hoop, and this method could be
// made cancellable.
if !upload_queue.shutting_down {
upload_queue.shutting_down = true;
upload_queue.queued_operations.push_back(UploadOp::Shutdown);
// like Barrier, this operation is not counted
self.launch_queued_tasks(upload_queue);
}
upload_queue.shutdown_ready.clone().acquire_owned()
};
let res = fut.await;
scopeguard::ScopeGuard::into_inner(sg);
match res {
Ok(_permit) => unreachable!("no permits are ever added to shutdown_ready"),
Err(_closed) => {
// expected
}
}
self.stop()
}
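The `shutdown_ready` semaphore acts as a one-shot, multi-waiter barrier: it is created with zero permits and never receives any, so `acquire_owned` can only return once `launch_queued_tasks` closes the semaphore upon reaching `UploadOp::Shutdown`, and the resulting `AcquireError` is the expected wake-up. A minimal sketch of that pattern:

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // No permits are ever added, so `acquire_owned` can only finish when the
    // semaphore is closed; closing it wakes every pending `shutdown()` caller at once.
    let shutdown_ready = Arc::new(Semaphore::new(0));

    let mut waiters = Vec::new();
    for i in 0..3 {
        let sem = Arc::clone(&shutdown_ready);
        waiters.push(tokio::spawn(async move {
            match sem.acquire_owned().await {
                Ok(_permit) => unreachable!("no permits are ever added"),
                Err(_closed) => println!("shutdown waiter {i} released"),
            }
        }));
    }

    // stand-in for launch_queued_tasks() reaching UploadOp::Shutdown
    tokio::time::sleep(Duration::from_millis(50)).await;
    shutdown_ready.close();

    for w in waiters {
        w.await.unwrap();
    }
}
```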
/// Set the deleted_at field in the remote index file.
///
/// This fails if the upload queue has not been `stop()`ed.
@@ -1103,7 +1152,9 @@ impl RemoteTimelineClient {
upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
}
UploadOp::Barrier(_) => upload_queue.inprogress_tasks.is_empty(),
UploadOp::Barrier(_) | UploadOp::Shutdown => {
upload_queue.inprogress_tasks.is_empty()
}
};
// If we cannot launch this task, don't look any further.
@@ -1116,6 +1167,13 @@ impl RemoteTimelineClient {
break;
}
if let UploadOp::Shutdown = next_op {
// leave the op in the queue but do not start more tasks; it will be dropped when
// `stop` is called.
upload_queue.shutdown_ready.close();
break;
}
// We can launch this task. Remove it from the queue first.
let next_op = upload_queue.queued_operations.pop_front().unwrap();
@@ -1136,6 +1194,7 @@ impl RemoteTimelineClient {
sender.send_replace(());
continue;
}
UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"),
};
// Assign unique ID to this task
@@ -1274,10 +1333,10 @@ impl RemoteTimelineClient {
.await
.map_err(|e| anyhow::anyhow!(e))
}
UploadOp::Barrier(_) => {
unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
// unreachable. Barrier and Shutdown operations are handled synchronously in
// launch_queued_tasks
warn!("unexpected Barrier operation in perform_upload_task");
warn!("unexpected {unexpected:?} operation in perform_upload_task");
break;
}
};
@@ -1371,7 +1430,7 @@ impl RemoteTimelineClient {
upload_queue.num_inprogress_deletions -= 1;
None
}
UploadOp::Barrier(_) => unreachable!(),
UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
};
// Launch any queued tasks that were unblocked by this one.
@@ -1426,7 +1485,7 @@ impl RemoteTimelineClient {
reason: "should we track deletes? positive or negative sign?",
},
),
UploadOp::Barrier(_) => {
UploadOp::Barrier(..) | UploadOp::Shutdown => {
// we do not account these
return None;
}
@@ -1452,10 +1511,13 @@ impl RemoteTimelineClient {
}
/// Close the upload queue for new operations and cancel queued operations.
///
/// Use [`RemoteTimelineClient::shutdown`] for graceful stop.
///
/// In-progress operations will still be running after this function returns.
/// Use `task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id))`
/// to wait for them to complete, after calling this function.
pub fn stop(&self) -> Result<(), StopError> {
pub(crate) fn stop(&self) -> Result<(), StopError> {
// Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
// into stopped state, thereby dropping all off the queued *ops* which haven't become *tasks* yet.
// The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
@@ -1493,6 +1555,8 @@ impl RemoteTimelineClient {
queued_operations: VecDeque::default(),
#[cfg(feature = "testing")]
dangling_files: HashMap::default(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
};
let upload_queue = std::mem::replace(

View File

@@ -326,6 +326,24 @@ impl Layer {
Ok(())
}
/// Waits until this layer has been dropped (and, if needed, local garbage collection and remote
/// deletion scheduling have completed).
///
/// Does not start garbage collection; use [`Self::garbage_collect_on_drop`] for that
/// separately.
#[cfg(feature = "testing")]
pub(crate) fn wait_drop(&self) -> impl std::future::Future<Output = ()> + 'static {
let mut rx = self.0.status.subscribe();
async move {
loop {
if let Err(tokio::sync::broadcast::error::RecvError::Closed) = rx.recv().await {
break;
}
}
}
}
}
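`wait_drop` relies on the layer's `status` broadcast channel: the blocking task spawned in `Drop` holds a clone of the sender, so receivers only observe `RecvError::Closed` after file removal (and deletion scheduling) has finished. A hedged, self-contained sketch of that pattern:

```rust
use tokio::sync::broadcast;
use tokio::task;

struct LayerInner {
    // in the real code this channel also carries eviction/download status events
    status: broadcast::Sender<()>,
}

impl LayerInner {
    fn wait_drop(&self) -> impl std::future::Future<Output = ()> + 'static {
        let mut rx = self.status.subscribe();
        async move {
            loop {
                // Closed fires once every sender clone (including the one held by
                // the blocking file-removal task) has been dropped.
                if let Err(broadcast::error::RecvError::Closed) = rx.recv().await {
                    break;
                }
            }
        }
    }
}

impl Drop for LayerInner {
    fn drop(&mut self) {
        let status = self.status.clone();
        task::spawn_blocking(move || {
            // stand-in for std::fs::remove_file(...) and remote deletion scheduling
            drop(status); // only now do wait_drop() futures complete
        });
    }
}

#[tokio::main]
async fn main() {
    let layer = LayerInner { status: broadcast::channel(8).0 };
    let wait = layer.wait_drop();
    drop(layer);
    wait.await;
    println!("layer dropped and cleanup finished");
}
```

In the test path this is what `immediate_gc` awaits via a `JoinSet` before returning.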
/// The download-ness ([`DownloadedLayer`]) can be either resident or wanted evicted.
@@ -475,10 +493,14 @@ impl Drop for LayerInner {
let file_size = self.layer_desc().file_size;
let timeline = self.timeline.clone();
let meta = self.metadata();
let status = self.status.clone();
crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(move || {
let _g = span.entered();
// keep this alive until we are finished, to support [`Layer::wait_drop`]
let _status = status;
let removed = match std::fs::remove_file(path) {
Ok(()) => true,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
@@ -1416,6 +1438,7 @@ impl Default for LayerImplMetrics {
)
.unwrap();
// reminder: this will be pageserver_layer_gcs_count_total with "_total" suffix
let gcs = metrics::register_int_counter_vec!(
"pageserver_layer_gcs_count",
"Garbage collections started and completed in the Layer implementation",

View File

@@ -251,14 +251,6 @@ pub struct Timeline {
/// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
/// Layer removal lock.
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
/// This lock is acquired in [`Timeline::gc`] and [`Timeline::compact`].
/// This is an `Arc<Mutex>` lock because we need an owned
/// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`).
/// Note that [`DeleteTimelineFlow`] uses `delete_progress` field.
pub(super) layer_removal_cs: Arc<tokio::sync::Mutex<()>>,
// Needed to ensure that we can't create a branch at a point that was already garbage collected
pub latest_gc_cutoff_lsn: Rcu<Lsn>,
@@ -319,6 +311,24 @@ pub struct Timeline {
/// Cancellation token scoped to this timeline: anything doing long-running work relating
/// to the timeline should drop out when this token fires.
pub(crate) cancel: CancellationToken,
/// Make sure we only have one running compaction at a time in tests.
///
/// Must only be taken in two places:
/// - [`Timeline::compact`] (this file)
/// - [`delete::delete_local_layer_files`]
///
/// Timeline deletion acquires both the compaction and gc locks, in either order.
compaction_lock: tokio::sync::Mutex<()>,
/// Make sure we only have one running gc at a time.
///
/// Must only be taken in two places:
/// - [`Timeline::gc`] (this file)
/// - [`delete::delete_local_layer_files`]
///
/// Timeline deletion acquires both the compaction and gc locks, in either order.
gc_lock: tokio::sync::Mutex<()>,
}
pub struct WalReceiverInfo {
@@ -704,6 +714,8 @@ impl Timeline {
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
let _g = self.compaction_lock.lock().await;
// this wait probably never needs any "long time spent" logging, because we already nag if
// the compaction task goes over its period (20s), which happens quite often in production.
let _permit = match super::tasks::concurrent_background_tasks_rate_limit(
@@ -758,7 +770,7 @@ impl Timeline {
// Below are functions compact_level0() and create_image_layers()
// but they are a bit ad hoc and don't quite work like it's explained
// above. Rewrite it.
let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
// Is the timeline being deleted?
if self.is_stopping() {
trace!("Dropping out of compaction on timeline shutdown");
@@ -799,8 +811,7 @@ impl Timeline {
// 3. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(layer_removal_cs.clone(), target_file_size, ctx)
.await?;
self.compact_level0(target_file_size, ctx).await?;
timer.stop_and_record();
if let Some(remote_client) = &self.remote_client {
@@ -946,7 +957,7 @@ impl Timeline {
// what is problematic is the shutting down of RemoteTimelineClient, because
// obviously it does not make sense to stop while we wait for it, but what
// about corner cases like s3 suddenly hanging up?
if let Err(e) = client.wait_completion().await {
if let Err(e) = client.shutdown().await {
// Non-fatal. Shutdown is infallible. Failures to flush just mean that
// we have some extra WAL replay to do next time the timeline starts.
warn!("failed to flush to remote storage: {e:#}");
@@ -1201,16 +1212,6 @@ impl Timeline {
remote_client: &Arc<RemoteTimelineClient>,
layers_to_evict: &[Layer],
) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
// ensure that the layers have finished uploading
// (don't hold the layer_removal_cs while we do it, we're not removing anything yet)
remote_client
.wait_completion()
.await
.context("wait for layer upload ops to complete")?;
// now lock out layer removal (compaction, gc, timeline deletion)
let _layer_removal_guard = self.layer_removal_cs.lock().await;
{
// to avoid racing with detach and delete_timeline
let state = self.current_state();
@@ -1421,7 +1422,6 @@ impl Timeline {
layer_flush_done_tx,
write_lock: tokio::sync::Mutex::new(()),
layer_removal_cs: Default::default(),
gc_info: std::sync::RwLock::new(GcInfo {
retain_lsns: Vec::new(),
@@ -1460,6 +1460,9 @@ impl Timeline {
initial_logical_size_attempt: Mutex::new(initial_logical_size_attempt),
cancel,
gate: Gate::new(format!("Timeline<{tenant_id}/{timeline_id}>")),
compaction_lock: tokio::sync::Mutex::default(),
gc_lock: tokio::sync::Mutex::default(),
};
result.repartition_threshold =
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
@@ -3150,13 +3153,8 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
impl Timeline {
/// Level0 files first phase of compaction, explained in the [`Self::compact`] comment.
///
/// This method takes the `_layer_removal_cs` guard to highlight that required downloads are
/// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken at the
/// start of level0 files compaction, the on-demand download should be revisited as well.
async fn compact_level0_phase1(
self: &Arc<Self>,
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
mut stats: CompactLevel0Phase1StatsBuilder,
target_file_size: u64,
@@ -3243,8 +3241,6 @@ impl Timeline {
let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
// FIXME: downloading while holding layer_removal_cs is not great, but we will remove that
// soon
deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
for l in level0_deltas_iter {
let lsn_range = &l.layer_desc().lsn_range;
@@ -3594,7 +3590,6 @@ impl Timeline {
///
async fn compact_level0(
self: &Arc<Self>,
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
target_file_size: u64,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
@@ -3616,16 +3611,9 @@ impl Timeline {
let now = tokio::time::Instant::now();
stats.read_lock_acquisition_micros =
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
let layer_removal_cs = layer_removal_cs.clone();
self.compact_level0_phase1(
layer_removal_cs,
phase1_layers_locked,
stats,
target_file_size,
&ctx,
)
.instrument(phase1_span)
.await?
self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
.instrument(phase1_span)
.await?
};
if new_layers.is_empty() && deltas_to_compact.is_empty() {
@@ -3633,17 +3621,6 @@ impl Timeline {
return Ok(());
}
// Before deleting any layers, we need to wait for their upload ops to finish.
// See remote_timeline_client module level comment on consistency.
// Do it here because we don't want to hold self.layers.write() while waiting.
if let Some(remote_client) = &self.remote_client {
debug!("waiting for upload ops to complete");
remote_client
.wait_completion()
.await
.context("wait for layer upload ops to complete")?;
}
let mut guard = self.layers.write().await;
let mut duplicated_layers = HashSet::new();
@@ -3675,12 +3652,7 @@ impl Timeline {
};
// deletion will happen later, the layer file manager calls garbage_collect_on_drop
guard.finish_compact_l0(
&layer_removal_cs,
&remove_layers,
&insert_layers,
&self.metrics,
);
guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
if let Some(remote_client) = self.remote_client.as_ref() {
remote_client.schedule_compaction_update(&remove_layers, &new_layers)?;
@@ -3791,19 +3763,17 @@ impl Timeline {
Ok(())
}
///
/// Garbage collect layer files on a timeline that are no longer needed.
///
/// Currently, we don't make any attempt at removing unneeded page versions
/// within a layer file. We can only remove the whole file if it's fully
/// obsolete.
///
pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
let _g = self.gc_lock.lock().await;
let timer = self.metrics.garbage_collect_histo.start_timer();
fail_point!("before-timeline-gc");
let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
// Is the timeline being deleted?
if self.is_stopping() {
anyhow::bail!("timeline is Stopping");
@@ -3821,13 +3791,7 @@ impl Timeline {
let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
let res = self
.gc_timeline(
layer_removal_cs.clone(),
horizon_cutoff,
pitr_cutoff,
retain_lsns,
new_gc_cutoff,
)
.gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
.instrument(
info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff),
)
@@ -3841,7 +3805,6 @@ impl Timeline {
async fn gc_timeline(
&self,
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
horizon_cutoff: Lsn,
pitr_cutoff: Lsn,
retain_lsns: Vec<Lsn>,
@@ -3879,17 +3842,6 @@ impl Timeline {
debug!("retain_lsns: {:?}", retain_lsns);
// Before deleting any layers, we need to wait for their upload ops to finish.
// See storage_sync module level comment on consistency.
// Do it here because we don't want to hold self.layers.write() while waiting.
if let Some(remote_client) = &self.remote_client {
debug!("waiting for upload ops to complete");
remote_client
.wait_completion()
.await
.context("wait for layer upload ops to complete")?;
}
let mut layers_to_remove = Vec::new();
let mut wanted_image_layers = KeySpaceRandomAccum::default();
@@ -4005,6 +3957,11 @@ impl Timeline {
//
// This does not in fact have any effect as we no longer consider local metadata unless
// running without remote storage.
//
// This also unconditionally schedules an index_part.json update, even though we will
// be doing one a bit later with the unlinked gc'd layers.
//
// TODO: remove when implementing <https://github.com/neondatabase/neon/issues/4099>.
self.update_metadata_file(self.disk_consistent_lsn.load(), None)
.await?;
@@ -4019,11 +3976,16 @@ impl Timeline {
remote_client.schedule_gc_update(&gc_layers)?;
}
guard.finish_gc_timeline(&layer_removal_cs, gc_layers);
guard.finish_gc_timeline(&gc_layers);
if result.layers_removed != 0 {
fail_point!("after-timeline-gc-removed-layers");
}
#[cfg(feature = "testing")]
{
result.doomed_layers = gc_layers;
}
}
info!(
@@ -4035,9 +3997,7 @@ impl Timeline {
Ok(result)
}
///
/// Reconstruct a value, using the given base image and WAL records in 'data'.
///
async fn reconstruct_value(
&self,
key: Key,

View File

@@ -110,11 +110,11 @@ async fn set_deleted_in_remote_index(timeline: &Timeline) -> Result<(), DeleteTi
Ok(())
}
/// Grab the layer_removal_cs lock, and actually perform the deletion.
/// Grab the compaction and gc locks, and actually perform the deletion.
///
/// This lock prevents GC or compaction from running at the same time.
/// The GC task doesn't register itself with the timeline it's operating on,
/// so it might still be running even though we called `shutdown_tasks`.
/// The locks prevent GC or compaction from running at the same time. The background tasks do not
/// register themselves with the timeline they operate on, so they might still be running even
/// though we called `shutdown_tasks`.
///
/// Note that there are still other race conditions between
/// GC, compaction and timeline deletion. See
@@ -122,14 +122,19 @@ async fn set_deleted_in_remote_index(timeline: &Timeline) -> Result<(), DeleteTi
///
/// No timeout here, GC & Compaction should be responsive to the
/// `TimelineState::Stopping` change.
async fn delete_local_layer_files(
// pub(super) so that documentation links to this function resolve
pub(super) async fn delete_local_layer_files(
conf: &PageServerConf,
tenant_id: TenantId,
timeline: &Timeline,
) -> anyhow::Result<()> {
info!("waiting for layer_removal_cs.lock()");
let layer_removal_guard = timeline.layer_removal_cs.lock().await;
info!("got layer_removal_cs.lock(), deleting layer files");
let guards = async { tokio::join!(timeline.gc_lock.lock(), timeline.compaction_lock.lock()) };
let guards = crate::timed(
guards,
"acquire gc and compaction locks",
std::time::Duration::from_secs(5),
)
.await;
// NB: storage_sync upload tasks that reference these layers have been cancelled
// by the caller.
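A rough sketch of the lock acquisition above, assuming `crate::timed` simply awaits the wrapped future and logs when it exceeds the given warning threshold (the helper below is hypothetical, not the real implementation):

```rust
use std::time::Duration;
use tokio::sync::Mutex;

/// Hypothetical stand-in for `crate::timed`: awaits `fut`, warning if it is slow.
async fn timed<F: std::future::Future>(fut: F, what: &str, warn_after: Duration) -> F::Output {
    let started = std::time::Instant::now();
    let mut fut = std::pin::pin!(fut);
    match tokio::time::timeout(warn_after, &mut fut).await {
        Ok(out) => out,
        Err(_elapsed) => {
            eprintln!("still waiting to {what} after {warn_after:?}");
            let out = fut.await;
            eprintln!("{what} took {:?}", started.elapsed());
            out
        }
    }
}

#[tokio::main]
async fn main() {
    let gc_lock = Mutex::new(());
    let compaction_lock = Mutex::new(());

    // Deletion takes both per-timeline locks so it cannot race with gc/compaction.
    let guards = timed(
        async { tokio::join!(gc_lock.lock(), compaction_lock.lock()) },
        "acquire gc and compaction locks",
        Duration::from_secs(5),
    )
    .await;

    // ... delete local layer files while holding the guards ...
    drop(guards);
}
```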
@@ -150,8 +155,8 @@ async fn delete_local_layer_files(
// because of a previous failure/cancellation at/after
// failpoint timeline-delete-after-rm.
//
// It can also happen if we race with tenant detach, because,
// it doesn't grab the layer_removal_cs lock.
// ErrorKind::NotFound can also happen if we race with tenant detach, because
// no locks are shared.
//
// For now, log and continue.
// warn! level is technically not appropriate for the
@@ -219,8 +224,8 @@ async fn delete_local_layer_files(
.with_context(|| format!("Failed to remove: {}", entry.path().display()))?;
}
info!("finished deleting layer files, releasing layer_removal_cs.lock()");
drop(layer_removal_guard);
info!("finished deleting layer files, releasing locks");
drop(guards);
fail::fail_point!("timeline-delete-after-rm", |_| {
Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))?

View File

@@ -296,7 +296,6 @@ impl Timeline {
stats.evicted += 1;
}
Some(Err(EvictionError::NotFound | EvictionError::Downloaded)) => {
// compaction/gc removed the file while we were waiting on layer_removal_cs
stats.not_evictable += 1;
}
}

View File

@@ -190,7 +190,6 @@ impl LayerManager {
/// Called when compaction is completed.
pub(crate) fn finish_compact_l0(
&mut self,
layer_removal_cs: &Arc<tokio::sync::OwnedMutexGuard<()>>,
compact_from: &[Layer],
compact_to: &[ResidentLayer],
metrics: &TimelineMetrics,
@@ -201,25 +200,16 @@ impl LayerManager {
metrics.record_new_file_metrics(l.layer_desc().file_size);
}
for l in compact_from {
Self::delete_historic_layer(layer_removal_cs, l, &mut updates, &mut self.layer_fmgr);
Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
}
updates.flush();
}
/// Called when garbage collect the timeline. Returns a guard that will apply the updates to the layer map.
pub(crate) fn finish_gc_timeline(
&mut self,
layer_removal_cs: &Arc<tokio::sync::OwnedMutexGuard<()>>,
gc_layers: Vec<Layer>,
) {
/// Called when garbage collection has selected the layers to be removed.
pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
let mut updates = self.layer_map.batch_update();
for doomed_layer in gc_layers {
Self::delete_historic_layer(
layer_removal_cs,
&doomed_layer,
&mut updates,
&mut self.layer_fmgr,
);
Self::delete_historic_layer(doomed_layer, &mut updates, &mut self.layer_fmgr);
}
updates.flush()
}
@@ -238,7 +228,6 @@ impl LayerManager {
/// Remote storage is not affected by this operation.
fn delete_historic_layer(
// we cannot remove layers otherwise, since gc and compaction will race
_layer_removal_cs: &Arc<tokio::sync::OwnedMutexGuard<()>>,
layer: &Layer,
updates: &mut BatchedUpdates<'_>,
mapping: &mut LayerFileManager<Layer>,

View File

@@ -90,6 +90,14 @@ pub(crate) struct UploadQueueInitialized {
/// bug causing leaks, then it's better to not leave this enabled for production builds.
#[cfg(feature = "testing")]
pub(crate) dangling_files: HashMap<LayerFileName, Generation>,
/// Set to true when we have inserted the `UploadOp::Shutdown` into the `queued_operations`.
pub(crate) shutting_down: bool,
/// Permitless semaphore on which any number of `RemoteTimelineClient::shutdown` futures can
/// wait until one of them stops the queue. The semaphore is closed when
/// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`.
pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>,
}
impl UploadQueueInitialized {
@@ -148,6 +156,8 @@ impl UploadQueue {
queued_operations: VecDeque::new(),
#[cfg(feature = "testing")]
dangling_files: HashMap::new(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
};
*self = UploadQueue::Initialized(state);
@@ -195,6 +205,8 @@ impl UploadQueue {
queued_operations: VecDeque::new(),
#[cfg(feature = "testing")]
dangling_files: HashMap::new(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
};
*self = UploadQueue::Initialized(state);
@@ -206,7 +218,13 @@ impl UploadQueue {
UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
anyhow::bail!("queue is in state {}", self.as_str())
}
UploadQueue::Initialized(x) => Ok(x),
UploadQueue::Initialized(x) => {
if !x.shutting_down {
Ok(x)
} else {
anyhow::bail!("queue is shutting down")
}
}
}
}
@@ -250,6 +268,10 @@ pub(crate) enum UploadOp {
/// Barrier. When the barrier operation is reached,
Barrier(tokio::sync::watch::Sender<()>),
/// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
/// this is the same as a Barrier.
Shutdown,
}
impl std::fmt::Display for UploadOp {
@@ -271,6 +293,7 @@ impl std::fmt::Display for UploadOp {
write!(f, "Delete({} layers)", delete.layers.len())
}
UploadOp::Barrier(_) => write!(f, "Barrier"),
UploadOp::Shutdown => write!(f, "Shutdown"),
}
}
}

View File

@@ -94,11 +94,10 @@ pub(crate) async fn branch_cleanup_and_check_errors(
!= index_part.get_disk_consistent_lsn()
{
result.errors.push(format!(
"Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
index_part.metadata.disk_consistent_lsn(),
index_part.get_disk_consistent_lsn(),
))
"Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
index_part.metadata.disk_consistent_lsn(),
index_part.get_disk_consistent_lsn(),
))
}
if index_part.layer_metadata.is_empty() {
@@ -109,8 +108,8 @@ pub(crate) async fn branch_cleanup_and_check_errors(
for (layer, metadata) in index_part.layer_metadata {
if metadata.file_size == 0 {
result.errors.push(format!(
"index_part.json contains a layer {} that has 0 size in its layer metadata", layer.file_name(),
))
"index_part.json contains a layer {} that has 0 size in its layer metadata", layer.file_name(),
))
}
let layer_map_key = (layer, metadata.generation);
@@ -136,7 +135,7 @@ pub(crate) async fn branch_cleanup_and_check_errors(
// a new generation that didn't upload an index yet.
//
// Even so, a layer that is not referenced by the index could just
// be something enqueued for deletion, so while this check is valid
// be something enqueued for deletion, so while this check is valid
// for indicating that a layer is garbage, it is not an indicator
// of a problem.
gen < &index_part_generation)

View File

@@ -4,7 +4,7 @@ import json
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Set, Tuple
import requests
from requests.adapters import HTTPAdapter
@@ -100,6 +100,15 @@ class LayerMapInfo:
counts[hist_layer.kind] += 1
return counts
def delta_layers(self) -> List[HistoricLayerInfo]:
return [x for x in self.historic_layers if x.kind == "Delta"]
def image_layers(self) -> List[HistoricLayerInfo]:
return [x for x in self.historic_layers if x.kind == "Image"]
def historic_by_name(self) -> Set[str]:
return set(x.layer_file_name for x in self.historic_layers)
@dataclass
class TenantConfig:
@@ -416,6 +425,10 @@ class PageserverHttpClient(requests.Session):
def timeline_gc(
self, tenant_id: TenantId, timeline_id: TimelineId, gc_horizon: Optional[int]
) -> dict[str, Any]:
"""
Unlike most handlers, this will wait for the removed layers to actually finish
registering themselves with the deletion queue.
"""
self.is_testing_enabled_or_skip()
log.info(

View File

@@ -763,9 +763,7 @@ def test_compaction_waits_for_upload(
neon_env_builder: NeonEnvBuilder,
):
"""
Compaction waits for outstanding uploads to complete, so that it avoids deleting layers
files that have not yet been uploaded. This test forces a race between upload and
compaction.
This test forces a race between upload and compaction.
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
@@ -784,6 +782,16 @@ def test_compaction_waits_for_upload(
timeline_id = env.initial_timeline
client = env.pageserver.http_client()
layers_at_creation = client.layer_map_info(tenant_id, timeline_id)
deltas_at_creation = len(layers_at_creation.delta_layers())
assert (
deltas_at_creation == 1
), "are you fixing #5863? make sure we end up with 2 deltas at the end of endpoint lifecycle"
# Make new layer uploads get stuck.
# Note that timeline creation waits for the initial layers to reach remote storage.
# So at this point, the `layers_at_creation` are in remote storage.
client.configure_failpoints(("before-upload-layer-pausable", "pause"))
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
# Build two tables with some data inside
@@ -791,85 +799,71 @@ def test_compaction_waits_for_upload(
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
client.timeline_checkpoint(tenant_id, timeline_id)
deltas_at_first = len(client.layer_map_info(tenant_id, timeline_id).delta_layers())
assert (
deltas_at_first == 2
), "are you fixing #5863? just add one more checkpoint after 'CREATE TABLE bar ...' statement."
endpoint.safe_psql("CREATE TABLE bar AS SELECT x FROM generate_series(1, 10000) g(x)")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
# Now make the flushing hang and update one small piece of data
client.configure_failpoints(("before-upload-layer-pausable", "pause"))
endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
checkpoint_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
compact_result: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
compact_barrier = threading.Barrier(2)
layers_before_last_checkpoint = client.layer_map_info(tenant_id, timeline_id).historic_by_name()
upload_stuck_layers = layers_before_last_checkpoint - layers_at_creation.historic_by_name()
def checkpoint_in_background():
try:
log.info("Checkpoint starting")
client.timeline_checkpoint(tenant_id, timeline_id)
log.info("Checkpoint complete")
checkpoint_result.put(None)
except PageserverApiException as e:
log.info("Checkpoint errored: {e}")
checkpoint_result.put(e)
assert len(upload_stuck_layers) > 0
def compact_in_background():
compact_barrier.wait()
try:
log.info("Compaction starting")
client.timeline_compact(tenant_id, timeline_id)
log.info("Compaction complete")
compact_result.put(None)
except PageserverApiException as e:
log.info("Compaction errored: {e}")
compact_result.put(e)
for name in upload_stuck_layers:
path = env.pageserver.timeline_dir(tenant_id, timeline_id) / name
assert path.exists(), "while uploads are stuck the layers should be present on disk"
checkpoint_thread = threading.Thread(target=checkpoint_in_background)
checkpoint_thread.start()
# now this will do the L0 => L1 compaction and want to remove
# upload_stuck_layers and the original initdb L0
client.timeline_checkpoint(tenant_id, timeline_id)
compact_thread = threading.Thread(target=compact_in_background)
compact_thread.start()
# as uploads are paused, the upload_stuck_layers should still be with us
for name in upload_stuck_layers:
path = env.pageserver.timeline_dir(tenant_id, timeline_id) / name
assert path.exists(), "uploads are stuck still over compaction"
try:
# Start the checkpoint, see that it blocks
log.info("Waiting to see checkpoint hang...")
time.sleep(5)
assert checkpoint_result.empty()
compacted_layers = client.layer_map_info(tenant_id, timeline_id).historic_by_name()
overlap = compacted_layers.intersection(upload_stuck_layers)
assert len(overlap) == 0, "none of the L0's should remain after L0 => L1 compaction"
assert (
len(compacted_layers) == 1
), "there should be one L1 after L0 => L1 compaction (without #5863 being fixed)"
# Start the compaction, see that it finds work to do but blocks
compact_barrier.wait()
log.info("Waiting to see compaction hang...")
time.sleep(5)
assert compact_result.empty()
def layer_deletes_completed():
m = client.get_metric_value("pageserver_layer_gcs_count_total", {"state": "completed"})
if m is None:
return 0
return int(m)
# This is logged once compaction is started, but before we wait for operations to complete
assert env.pageserver.log_contains("compact_level0_phase1 stats available.")
# if initdb created an initial delta layer, it might already be gc'd
# because it was uploaded before the failpoint was enabled. however, the
# deletion is not guaranteed to be complete.
assert layer_deletes_completed() <= 1
# Once we unblock uploads the compaction should complete successfully
log.info("Disabling failpoint")
client.configure_failpoints(("before-upload-layer-pausable", "off"))
log.info("Awaiting compaction result")
assert compact_result.get(timeout=10) is None
log.info("Awaiting checkpoint result")
assert checkpoint_result.get(timeout=10) is None
except Exception:
# Log the actual failure's backtrace here, before we proceed to join threads
log.exception("Failure, cleaning up...")
raise
finally:
compact_barrier.abort()
checkpoint_thread.join()
compact_thread.join()
client.configure_failpoints(("before-upload-layer-pausable", "off"))
# Ensure that this actually terminates
wait_upload_queue_empty(client, tenant_id, timeline_id)
# We should not have hit the error handling path in uploads where the remote file is gone
def until_layer_deletes_completed():
deletes = layer_deletes_completed()
log.info(f"layer_deletes: {deletes}")
# ensure that initdb delta layer AND the previously stuck are now deleted
assert deletes >= len(upload_stuck_layers) + 1
wait_until(10, 1, until_layer_deletes_completed)
for name in upload_stuck_layers:
path = env.pageserver.timeline_dir(tenant_id, timeline_id) / name
assert (
not path.exists()
), "l0 should now be removed because of L0 => L1 compaction and completed uploads"
# We should not have hit the error handling path in uploads where an uploaded file is gone
assert not env.pageserver.log_contains(
"File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more."
)