mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
deletion queue: add push for generic objects (layers and garbage)
This commit is contained in:
@@ -72,7 +72,11 @@ enum FrontendQueueMessage {
|
||||
struct DeletionOp {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
// `layers` and `objects` are both just lists of objects. `layers` is used if you do not
|
||||
// have a config object handy to project it to a remote key, and need the consuming worker
|
||||
// to do it for you.
|
||||
layers: Vec<LayerFileName>,
|
||||
objects: Vec<RemotePath>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -131,18 +135,35 @@ impl DeletionQueueClient {
|
||||
/// persistent, but it may be executed at any time after this function enters: do not push
|
||||
/// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
|
||||
/// references them).
|
||||
pub async fn push(
|
||||
pub async fn push_layers(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
layers: Vec<LayerFileName>,
|
||||
) {
|
||||
DELETION_QUEUE_SUBMITTED.inc_by(layers.len() as u64);
|
||||
info!("pushed!");
|
||||
self.do_push(FrontendQueueMessage::Delete(DeletionOp {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
layers,
|
||||
objects: Vec::new(),
|
||||
}))
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Just like push_layers, but using some already-known remote paths, instead of abstract layer names
|
||||
pub async fn push_objects(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
objects: Vec<RemotePath>,
|
||||
) {
|
||||
DELETION_QUEUE_SUBMITTED.inc_by(objects.len() as u64);
|
||||
self.do_push(FrontendQueueMessage::Delete(DeletionOp {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
layers: Vec::new(),
|
||||
objects: objects,
|
||||
}))
|
||||
.await;
|
||||
}
|
||||
@@ -481,6 +502,8 @@ impl FrontendQueueWorker {
|
||||
};
|
||||
self.pending.objects.push(path);
|
||||
}
|
||||
|
||||
self.pending.objects.extend(op.objects.into_iter())
|
||||
}
|
||||
FrontendQueueMessage::Flush(op) => {
|
||||
if self.pending.objects.is_empty() {
|
||||
@@ -686,7 +709,7 @@ mod test {
|
||||
|
||||
// File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
|
||||
info!("Pushing");
|
||||
ctx.runtime.block_on(client.push(
|
||||
ctx.runtime.block_on(client.push_layers(
|
||||
tenant_id,
|
||||
TIMELINE_ID,
|
||||
[layer_file_name_1.clone()].to_vec(),
|
||||
@@ -758,6 +781,7 @@ pub mod mock {
|
||||
timeline_id = %op.timeline_id,
|
||||
);
|
||||
|
||||
let mut objects = op.objects;
|
||||
for layer in op.layers {
|
||||
let local_path = timeline_path.join(layer.file_name());
|
||||
let path = match conf.remote_path(&local_path) {
|
||||
@@ -766,6 +790,10 @@ pub mod mock {
|
||||
panic!("Can't make a timeline path! {e}");
|
||||
}
|
||||
};
|
||||
objects.push(path);
|
||||
}
|
||||
|
||||
for path in objects {
|
||||
info!("Executing deletion {path}");
|
||||
match remote_storage.delete(&path).await {
|
||||
Ok(_) => {
|
||||
|
||||
Reference in New Issue
Block a user