review comments

This commit is contained in:
Christian Schwarz
2023-09-14 14:40:12 +02:00
parent dc1c6b28db
commit 556211b701
3 changed files with 54 additions and 2 deletions

View File

@@ -126,6 +126,7 @@ struct FlushOp {
}
impl FlushOp {
// better name: wake_waiters
fn fire(self) {
if self.tx.send(()).is_err() {
// oneshot channel closed. This is legal: a client could be destroyed while waiting for a flush.

View File

@@ -88,6 +88,8 @@ where
}
}
// these lists have been drained, all we need from them is the list path.
// See my comments in the caller caller.
async fn cleanup_lists(&mut self, lists: Vec<DeletionList>) {
for list in lists {
let list_path = self.conf.deletion_list_path(list.sequence);
@@ -98,6 +100,13 @@ where
// be touching these files. We will leave the file behind. Subsequent
// pageservers will try and load it again: hopefully whatever storage
// issue (probably permissions) has been fixed by then.
//
// Hm, the recover() function would load these again as `validate=true`, right?
// So, we'd retry the deletions, even though we know that by the time this
// function gets called, the deletions were successful.
// That seems wasteful. Can't the header keep an additional pointer/sequence number
// that tracks the deletions lists that were already fully executed?
// (... I guess I need to read the code that handles failure to write out the header)
tracing::error!("Failed to delete {}: {e:#}", list_path.display());
break;
}
@@ -256,7 +265,7 @@ where
// After successful validation, nothing is pending: any lists that
// made it through validation will be in validated_lists.
assert!(self.pending_lists.is_empty());
self.pending_key_count = 0;
self.pending_key_count = 0; // why can't we just pending_lists.len() in all places we use pending_key_count
tracing::debug!(
"Validation complete, have {} validated lists",
@@ -272,6 +281,12 @@ where
// Drain `validated_lists` into the executor
let mut executing_lists = Vec::new();
for mut list in self.validated_lists.drain(..) {
// again, this weird C++-esque &mut self drain function.
// I think a consuming `into_drain_paths()` would be more idiomatic.
//
// I see you need the drained list for the `cleanup_lists` call below.
// Make an `into_...()` function that returns a tuple, then.
// Less mutable state, more values = better.
let objects = list.drain_paths();
self.tx
.send(ExecutorMessage::Delete(objects))
@@ -288,6 +303,7 @@ where
Ok(())
}
// better name: flush_to_executor_and_wait
async fn flush_executor(&mut self) -> Result<(), DeletionQueueError> {
// Flush the executor, so that all the keys referenced by these deletion lists
// are actually removed from remote storage. This is a precondition to deleting
@@ -326,6 +342,7 @@ where
match msg {
BackendQueueMessage::Delete(list) => {
if list.validated {
// this only happens during recovery
self.validated_lists.push(list)
} else {
self.pending_key_count += list.len();
@@ -334,10 +351,14 @@ where
if self.pending_key_count > AUTOFLUSH_KEY_COUNT {
// Drop possible shutdown error, because we will just fall out of loop if that happens
// TODO: log the error at least?
// Alternatively, put a match here to ensure that it's really the shutdown error,
// and not some other error variant we might add in the future.
drop(self.flush().await);
}
}
BackendQueueMessage::Flush(op) => {
// would prefer an exhaustive match of the Ok() and each Err(...) variant here.
if let Ok(()) = self.flush().await {
// If we fail due to shutting down, we will just drop `op` to propagate that status.
op.fire();

View File

@@ -1,3 +1,11 @@
//! The frontend batches deletion requests into DeletionLists and once batched,
//! passes them to the backend for validation & execution.
//!
//! Durability: the frontend persists the DeletionLists to disk, and the backend persists
//! the header file that keeps track of which deletion lists have been validated yet.
//! The split responsiblity is a bit headache-inducing, but, it allows us to persist
//! intention to delete in the frontend, even if the backend is down/full/slow.
use super::BackendQueueMessage;
use super::DeletionHeader;
use super::DeletionList;
@@ -130,6 +138,8 @@ impl FrontendQueueWorker {
f.fire();
}
// the .drain() is like a C++ move constructor; unidiomatics.
// Use std::mem::replace (together with the line below) instead.
let onward_list = self.pending.drain();
// We have consumed out of pending: reset it for the next incoming deletions to accumulate there
@@ -189,6 +199,8 @@ impl FrontendQueueWorker {
}
}
/// 1. There are no safeguards that this function isn't called more than once.
/// 2. Why isn't this part of the DeletionQueue::new() function?
async fn recover(
&mut self,
attached_tenants: HashMap<TenantId, Generation>,
@@ -204,6 +216,7 @@ impl FrontendQueueWorker {
// Start our next deletion list from after the last location validated by
// previous process lifetime, or after the last location found (it is updated
// below after enumerating the deletion lists)
// isn't self.pending.sequence always 0 at this point?
self.pending.sequence = std::cmp::max(self.pending.sequence, validated_sequence + 1);
let deletion_directory = self.conf.deletion_prefix();
@@ -234,6 +247,9 @@ impl FrontendQueueWorker {
let file_name = dentry.file_name().to_owned();
let basename = file_name.to_string_lossy();
let seq_part = if let Some(m) = list_name_pattern.captures(&basename) {
// named capture group would help readability here.
// Also, I see two capture groups, what is the second one for? version number, right?
// Would have been self-documenting through named capture groups.
m.get(1)
.expect("Non optional group should be present")
.as_str()
@@ -261,6 +277,9 @@ impl FrontendQueueWorker {
}
for s in seqs {
// why doesn't the header simply store the set of DeletionList names / sequence numbers?
// Then we could save ourselves the directory enumeration above, along with the
// regex parsing and cmp::max stuff.
let list_path = self.conf.deletion_list_path(s);
let list_bytes = tokio::fs::read(&list_path).await?;
@@ -302,13 +321,14 @@ impl FrontendQueueWorker {
// We will drop out of recovery if this fails: it indicates that we are shutting down
// or the backend has panicked
// we already counted these before we crashed, mustn't count them again
DELETION_QUEUE_SUBMITTED.inc_by(deletion_list.len() as u64);
self.tx
.send(BackendQueueMessage::Delete(deletion_list))
.await?;
}
info!(next_sequence = self.pending.sequence, "Replay complete");
info!(next_sequence = self.pending.sequence, "Recovery complete");
Ok(())
}
@@ -383,6 +403,8 @@ impl FrontendQueueWorker {
// Unexpected: after we flush, we should have
// drained self.pending, so a conflict on
// generation numbers should be impossible.
// Wondering whether we should have a counter metric that we bump each time we know we might be leaking objects.
// Better / easier / cheaper to monitor than scraping logs.
tracing::error!(
"Failed to enqueue deletions, leaking objects. This is a bug."
);
@@ -413,6 +435,14 @@ impl FrontendQueueWorker {
// This should only happen in truly unrecoverable cases, like the recovery finding that the backend
// queue receiver has been dropped, or something is critically broken with
// the local filesystem holding deletion lists.
//
// Hmm, so, if we fail to recover, we return from the frontend queue worker.
// This means subsequent submissions will fail, right?
// Which means each tenant's compaction loop affected by failed submission will enter the "retry in 2" seconds loop.
// => IMO we should move recovery to the DeletionQueue::new() constructor function,
// and if it fails, refuse to start the pageserver. Avoids a whole bunch of pain.
// If recovery fails because the on-disk state is garbage, can always `rm -rf` it and then
// restart the PS.
info!(
"Deletion queue recover aborted, deletion queue will not proceed ({e})"
);