deletion queue: future-proof DeletionList format

It needs places to put generation numbers
This commit is contained in:
John Spray
2023-08-23 10:06:47 +01:00
parent 696b49eeba
commit c9a007d05b
3 changed files with 75 additions and 41 deletions

View File

@@ -2,6 +2,8 @@ mod backend;
mod executor;
mod frontend;
use std::collections::HashMap;
use crate::metrics::DELETION_QUEUE_SUBMITTED;
use remote_storage::{GenericRemoteStorage, RemotePath};
use serde::Deserialize;
@@ -11,7 +13,7 @@ use thiserror::Error;
use tokio;
use tokio_util::sync::CancellationToken;
use tracing::{self, debug, error};
use utils::id::{TenantId, TimelineId};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
pub(crate) use self::backend::BackendQueueWorker;
use self::executor::ExecutorWorker;
@@ -88,6 +90,14 @@ pub struct DeletionQueueClient {
executor_tx: tokio::sync::mpsc::Sender<ExecutorMessage>,
}
#[derive(Debug, Serialize, Deserialize)]
struct TimelineDeletionList {
objects: Vec<RemotePath>,
// TODO: Tenant attachment generation will go here
// (see https://github.com/neondatabase/neon/pull/4919)
// attach_gen: u32,
}
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
struct DeletionList {
@@ -97,9 +107,12 @@ struct DeletionList {
/// Used for constructing a unique key for each deletion list we write out.
sequence: u64,
/// These objects are elegible for deletion: they are unlinked from timeline metadata, and
/// we are free to delete them at any time from their presence in this data structure onwards.
objects: Vec<RemotePath>,
/// To avoid repeating tenant/timeline IDs in every key, we store keys in
/// nested HashMaps by TenantTimelineID
objects: HashMap<TenantTimelineId, TimelineDeletionList>,
// TODO: Node generation will go here
// (see https://github.com/neondatabase/neon/pull/4919)
// node_gen: u32,
}
#[serde_as]
@@ -134,9 +147,42 @@ impl DeletionList {
Self {
version: Self::VERSION_LATEST,
sequence,
objects: Vec::new(),
objects: HashMap::new(),
}
}
fn is_empty(&self) -> bool {
self.objects.is_empty()
}
fn len(&self) -> usize {
self.objects.values().map(|v| v.objects.len()).sum()
}
fn push(&mut self, tenant: &TenantId, timeline: &TimelineId, mut objects: Vec<RemotePath>) {
if objects.is_empty() {
// Avoid inserting an empty TimelineDeletionList: this preserves the property
// that if we have no keys, then self.objects is empty (used in Self::is_empty)
return;
}
let key = TenantTimelineId::new(tenant.clone(), timeline.clone());
let entry = self
.objects
.entry(key)
.or_insert_with(|| TimelineDeletionList {
objects: Vec::new(),
});
entry.objects.append(&mut objects)
}
fn take_paths(&mut self) -> Vec<RemotePath> {
self.objects
.drain()
.map(|(_k, v)| v.objects.into_iter())
.flatten()
.collect()
}
}
#[derive(Error, Debug)]

View File

@@ -131,30 +131,34 @@ impl BackendQueueWorker {
}
pub async fn flush(&mut self) {
let mut onward_lists: Vec<DeletionList> = Vec::new();
std::mem::swap(&mut onward_lists, &mut self.pending_lists);
for list in onward_lists {
let objects = list.objects.clone();
// TODO: a take_objects method
self.executed_lists.push(list);
self.pending_key_count = 0;
// Submit all keys from pending DeletionLists into the executor
for list in &mut self.pending_lists {
let objects = list.take_paths();
if let Err(_e) = self.tx.send(ExecutorMessage::Delete(objects)).await {
warn!("Shutting down");
return;
};
}
// Flush the executor to ensure all the operations we just submitted have been executed
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
let flush_op = FlushOp { tx };
if let Err(_e) = self.tx.send(ExecutorMessage::Flush(flush_op)).await {
warn!("Shutting down");
return;
};
if rx.await.is_err() {
warn!("Shutting down");
return;
}
// After flush, we are assured that all contents of the pending lists
// are executed
self.executed_lists.append(&mut self.pending_lists);
// Erase the lists we executed
self.cleanup_lists().await;
}
@@ -184,15 +188,6 @@ impl BackendQueueWorker {
match msg {
BackendQueueMessage::Delete(list) => {
if list.objects.is_empty() {
// This shouldn't happen, but is harmless. warn so that
// tests will fail if we have such a bug, but proceed with
// processing subsequent messages.
warn!("Empty DeletionList passed to deletion backend");
self.executed_lists.push(list);
continue;
}
self.pending_key_count += list.objects.len();
self.pending_lists.push(list);

View File

@@ -122,9 +122,7 @@ impl FrontendQueueWorker {
/// This does not return errors, because on failure to flush we do not lose
/// any state: flushing will be retried implicitly on the next deadline
async fn flush(&mut self) {
if self.pending.objects.is_empty() {
// We do not expect to be called in this state, but handle it so that later
// logging code can be assured that therre is always a first+last key to print
if self.pending.is_empty() {
for f in self.pending_flushes.drain(..) {
f.fire();
}
@@ -133,18 +131,7 @@ impl FrontendQueueWorker {
match self.upload_pending_list().await {
Ok(_) => {
info!(
sequence = self.pending.sequence,
"Stored deletion list ({0}..{1})",
self.pending
.objects
.first()
.expect("list should be non-empty"),
self.pending
.objects
.last()
.expect("list should be non-empty"),
);
info!(sequence = self.pending.sequence, "Stored deletion list");
for f in self.pending_flushes.drain(..) {
f.fire();
@@ -300,7 +287,7 @@ impl FrontendQueueWorker {
// We will drop out of recovery if this fails: it indicates that we are shutting down
// or the backend has panicked
DELETION_QUEUE_SUBMITTED.inc_by(deletion_list.objects.len() as u64);
DELETION_QUEUE_SUBMITTED.inc_by(deletion_list.len() as u64);
self.tx
.send(BackendQueueMessage::Delete(deletion_list))
.await?;
@@ -358,12 +345,13 @@ impl FrontendQueueWorker {
match msg {
FrontendQueueMessage::Delete(op) => {
debug!(
"Deletion enqueue {0} layers, {1} other objects",
"Delete: ingesting {0} layers, {1} other objects",
op.layers.len(),
op.objects.len()
);
let timeline_path = self.conf.timeline_path(&op.tenant_id, &op.timeline_id);
let mut layer_paths = Vec::new();
for layer in op.layers {
// TODO go directly to remote path without composing local path
let local_path = timeline_path.join(layer.file_name());
@@ -373,22 +361,27 @@ impl FrontendQueueWorker {
panic!("Can't make a timeline path! {e}");
}
};
self.pending.objects.push(path);
layer_paths.push(path);
}
self.pending.objects.extend(op.objects.into_iter())
self.pending
.push(&op.tenant_id, &op.timeline_id, layer_paths);
self.pending
.push(&op.tenant_id, &op.timeline_id, op.objects);
}
FrontendQueueMessage::Flush(op) => {
if self.pending.objects.is_empty() {
// Execute immediately
debug!("No pending objects, flushing immediately");
debug!("Flush: No pending objects, flushing immediately");
op.fire()
} else {
// Execute next time we flush
debug!("Flush: adding to pending flush list for next deadline flush");
self.pending_flushes.push(op);
}
}
FrontendQueueMessage::FlushExecute(op) => {
debug!("FlushExecute: passing through to backend");
// We do not flush to a deletion list here: the client sends a Flush before the FlushExecute
if let Err(e) = self.tx.send(BackendQueueMessage::Flush(op)).await {
info!("Can't flush, shutting down ({e})");