pageserver: send timeline deletions through the deletion queue
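For orientation before the hunks: this change routes remote deletions during timeline deletion through a deletion queue client (`push_layers`, `push_objects`, `flush_execute`) instead of deleting objects inline with `backoff::retry`. The sketch below is a minimal, synchronous mock of that ordering, not the pageserver's real API: `MockDeletionQueue` and `delete_all_sketch` are illustrative stand-ins, and the real methods are async and operate on tenant/timeline IDs and richer types such as `LayerFileName` and remote paths.

use std::collections::HashSet;

/// Stand-in for the pageserver's deletion queue client (illustrative only).
#[derive(Default)]
struct MockDeletionQueue {
    pending: Vec<String>,
    executed: HashSet<String>,
}

impl MockDeletionQueue {
    /// Corresponds to push_layers: enqueue a timeline's layer files for deferred deletion.
    fn push_layers(&mut self, layers: Vec<String>) {
        self.pending.extend(layers);
    }

    /// Corresponds to push_objects: enqueue arbitrary remote paths (leaked objects, index_part.json).
    fn push_objects(&mut self, objects: Vec<String>) {
        self.pending.extend(objects);
    }

    /// Corresponds to flush_execute: wait until everything enqueued so far has been deleted.
    fn flush_execute(&mut self) {
        self.executed.extend(self.pending.drain(..));
    }
}

/// Mirrors the ordering of the new delete_all: enqueue known layers, flush, list the
/// prefix for leaked objects, enqueue those, enqueue the index file last, flush again.
fn delete_all_sketch(
    queue: &mut MockDeletionQueue,
    known_layers: Vec<String>,
    list_prefix: impl Fn() -> Vec<String>,
    index_file: String,
) {
    let layer_deletion_count = known_layers.len();
    queue.push_layers(known_layers);

    // Flush first so the listing below does not re-report layers we already enqueued.
    queue.flush_execute();

    // Anything still present under the prefix was leaked by earlier failures.
    let remaining = list_prefix();
    let not_referenced_count = remaining.len();
    if !remaining.is_empty() {
        queue.push_objects(remaining);
    }

    // The index file goes last, so a retried deletion can still find the timeline's metadata.
    queue.push_objects(vec![index_file]);
    queue.flush_execute();

    println!("referenced={layer_deletion_count} not_referenced={not_referenced_count}");
}

fn main() {
    let mut queue = MockDeletionQueue::default();
    delete_all_sketch(
        &mut queue,
        vec!["layer-A".to_string(), "layer-B".to_string()],
        || vec!["leaked-C".to_string()],
        "index_part.json".to_string(),
    );
    assert_eq!(queue.executed.len(), 4);
}

The ordering property this models, visible again in the diff below, is that known layers are flushed before the prefix listing (so the listing only surfaces leaked objects), and the index file is enqueued last with a final flush so deletion is durable before the function returns.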
@@ -665,7 +665,7 @@ impl RemoteTimelineClient
         // Enqueue deletions
         deletion_queue_client
-            .push(self.tenant_id, self.timeline_id, names.to_vec())
+            .push_layers(self.tenant_id, self.timeline_id, names.to_vec())
             .await;

         Ok(())
     }
@@ -791,12 +791,13 @@ impl RemoteTimelineClient
     /// Prerequisites: UploadQueue should be in stopped state and deleted_at should be successfully set.
     /// The function deletes layer files one by one, then lists the prefix to see if we leaked something,
     /// deletes leaked files if any, and proceeds with deletion of the index file at the end.
-    pub(crate) async fn delete_all(self: &Arc<Self>) -> anyhow::Result<()> {
+    pub(crate) async fn delete_all(
+        self: &Arc<Self>,
+        deletion_queue: &DeletionQueueClient,
+    ) -> anyhow::Result<()> {
         debug_assert_current_span_has_tenant_and_timeline_id();

-        let (mut receiver, deletions_queued) = {
-            let mut deletions_queued = 0;
-
+        let layers: Vec<LayerFileName> = {
             let mut locked = self.upload_queue.lock().unwrap();
             let stopped = locked.stopped_mut()?;
@@ -808,41 +809,27 @@ impl RemoteTimelineClient

             stopped
                 .upload_queue_for_deletion
-                .queued_operations
-                .reserve(stopped.upload_queue_for_deletion.latest_files.len());
-
-            // schedule the actual deletions
-            for name in stopped.upload_queue_for_deletion.latest_files.keys() {
-                let op = UploadOp::Delete(Delete {
-                    file_kind: RemoteOpFileKind::Layer,
-                    layer_file_name: name.clone(),
-                    scheduled_from_timeline_delete: true,
-                });
-                self.calls_unfinished_metric_begin(&op);
-                stopped
-                    .upload_queue_for_deletion
-                    .queued_operations
-                    .push_back(op);
-
-                info!("scheduled layer file deletion {name}");
-                deletions_queued += 1;
-            }
-
-            self.launch_queued_tasks(&mut stopped.upload_queue_for_deletion);
-
-            (
-                self.schedule_barrier(&mut stopped.upload_queue_for_deletion),
-                deletions_queued,
-            )
+                .latest_files
+                .drain()
+                .map(|kv| kv.0)
+                .collect()
         };

-        receiver.changed().await.context("upload queue shut down")?;
+        let layer_deletion_count = layers.len();
+
+        deletion_queue
+            .push_layers(self.tenant_id, self.timeline_id, layers)
+            .await;

         // Do not delete the index part yet, it is needed for a possible retry. If we remove it first
         // and the retry arrives at a different pageserver, there won't be any traces of it on remote storage.
         let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id);
         let timeline_storage_path = self.conf.remote_path(&timeline_path)?;

+        // Execute all pending deletions, so that when we proceed to do a list_prefixes below, we aren't
+        // taking the burden of listing all the layers that we already know we should delete.
+        deletion_queue.flush_execute().await;
+
         let remaining = backoff::retry(
             || async {
                 self.storage_impl
@@ -869,16 +856,11 @@ impl RemoteTimelineClient
             })
             .collect();

+        let not_referenced_count = remaining.len();
         if !remaining.is_empty() {
-            backoff::retry(
-                || async { self.storage_impl.delete_objects(&remaining).await },
-                |_e| false,
-                FAILED_UPLOAD_WARN_THRESHOLD,
-                FAILED_REMOTE_OP_RETRIES,
-                "delete_objects",
-            )
-            .await
-            .context("delete_objects")?;
+            deletion_queue
+                .push_objects(self.tenant_id, self.timeline_id, remaining)
+                .await;
         }

         fail::fail_point!("timeline-delete-before-index-delete", |_| {
@@ -889,17 +871,14 @@ impl RemoteTimelineClient

         let index_file_path = timeline_storage_path.join(Path::new(IndexPart::FILE_NAME));

-        debug!("deleting index part");
+        debug!("enqueuing index part deletion");
+        deletion_queue
+            .push_objects(self.tenant_id, self.timeline_id, [index_file_path].to_vec())
+            .await;

-        backoff::retry(
-            || async { self.storage_impl.delete(&index_file_path).await },
-            |_e| false,
-            FAILED_UPLOAD_WARN_THRESHOLD,
-            FAILED_REMOTE_OP_RETRIES,
-            "delete_index",
-        )
-        .await
-        .context("delete_index")?;
+        // Timeline deletion is rare and we have probably emitted a reasonable number of objects: wait
+        // for a flush to a persistent deletion list so that we may be sure deletion will occur.
+        deletion_queue.flush_execute().await;

         fail::fail_point!("timeline-delete-after-index-delete", |_| {
             Err(anyhow::anyhow!(
@@ -907,7 +886,7 @@ impl RemoteTimelineClient
             ))?
         });

-        info!(prefix=%timeline_storage_path, referenced=deletions_queued, not_referenced=%remaining.len(), "done deleting in timeline prefix, including index_part.json");
+        info!(prefix=%timeline_storage_path, referenced=layer_deletion_count, not_referenced=%not_referenced_count, "done deleting in timeline prefix, including index_part.json");

         Ok(())
     }
@@ -1262,6 +1262,18 @@ impl Timeline
         Ok(())
     }
+
+    async fn delete_all_remote(&self) -> anyhow::Result<()> {
+        if let Some(remote_client) = &self.remote_client {
+            if let Some(deletion_queue_client) = &self.deletion_queue_client {
+                remote_client.delete_all(&deletion_queue_client).await
+            } else {
+                Ok(())
+            }
+        } else {
+            Ok(())
+        }
+    }
 }

 #[derive(Debug, thiserror::Error)]
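A note on the new Timeline::delete_all_remote above: remote deletion is attempted only when the timeline has both a remote client and a deletion queue client attached, and is otherwise reported as success. Below is a self-contained sketch of that guard; RemoteClient and TimelineSketch are placeholder types, not the pageserver's, the real calls are async, and the anyhow crate is assumed. It also shows that the nested if-lets are equivalent to a single match on the pair.

struct RemoteClient;
struct DeletionQueueClient;

impl RemoteClient {
    // Stand-in for RemoteTimelineClient::delete_all, which is async in the real code.
    fn delete_all(&self, _deletion_queue: &DeletionQueueClient) -> anyhow::Result<()> {
        Ok(())
    }
}

struct TimelineSketch {
    remote_client: Option<RemoteClient>,
    deletion_queue_client: Option<DeletionQueueClient>,
}

impl TimelineSketch {
    // Remote deletion is a no-op unless both handles are configured.
    fn delete_all_remote(&self) -> anyhow::Result<()> {
        match (&self.remote_client, &self.deletion_queue_client) {
            (Some(remote_client), Some(deletion_queue_client)) => {
                remote_client.delete_all(deletion_queue_client)
            }
            _ => Ok(()),
        }
    }
}

fn main() -> anyhow::Result<()> {
    // With no remote storage configured, deletion of remote data is skipped.
    let timeline = TimelineSketch {
        remote_client: None,
        deletion_queue_client: None,
    };
    timeline.delete_all_remote()
}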
@@ -239,15 +239,6 @@ async fn delete_local_layer_files(
     Ok(())
 }

-/// Removes remote layers and an index file after them.
-async fn delete_remote_layers_and_index(timeline: &Timeline) -> anyhow::Result<()> {
-    if let Some(remote_client) = &timeline.remote_client {
-        remote_client.delete_all().await.context("delete_all")?
-    };
-
-    Ok(())
-}
-
 // This function removes remaining traces of a timeline on disk.
 // Namely: metadata file, timeline directory, delete mark.
 // Note: io::ErrorKind::NotFound are ignored for metadata and timeline dir.
@@ -562,7 +553,7 @@ impl DeleteTimelineFlow
     ) -> Result<(), DeleteTimelineError> {
         delete_local_layer_files(conf, tenant.tenant_id, timeline).await?;

-        delete_remote_layers_and_index(timeline).await?;
+        timeline.delete_all_remote().await?;

         pausable_failpoint!("in_progress_delete");