From 556211b701f9b2142fb45e655805555b72b7b523 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 14 Sep 2023 14:40:12 +0200 Subject: [PATCH] review comments --- pageserver/src/deletion_queue.rs | 1 + pageserver/src/deletion_queue/backend.rs | 23 +++++++++++++++- pageserver/src/deletion_queue/frontend.rs | 32 ++++++++++++++++++++++- 3 files changed, 54 insertions(+), 2 deletions(-) diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 0f36808b4a..a6b5a5de0e 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -126,6 +126,7 @@ struct FlushOp { } impl FlushOp { + // better name: wake_waiters fn fire(self) { if self.tx.send(()).is_err() { // oneshot channel closed. This is legal: a client could be destroyed while waiting for a flush. diff --git a/pageserver/src/deletion_queue/backend.rs b/pageserver/src/deletion_queue/backend.rs index 88334f5f88..06ca4c4b74 100644 --- a/pageserver/src/deletion_queue/backend.rs +++ b/pageserver/src/deletion_queue/backend.rs @@ -88,6 +88,8 @@ where } } + // these lists have been drained, all we need from them is the list path. + // See my comments in the caller caller. async fn cleanup_lists(&mut self, lists: Vec) { for list in lists { let list_path = self.conf.deletion_list_path(list.sequence); @@ -98,6 +100,13 @@ where // be touching these files. We will leave the file behind. Subsequent // pageservers will try and load it again: hopefully whatever storage // issue (probably permissions) has been fixed by then. + // + // Hm, the recover() function would load these again as `validate=true`, right? + // So, we'd retry the deletions, even though we know that by the time this + // function gets called, the deletions were successful. + // That seems wasteful. Can't the header keep an additional pointer/sequence number + // that tracks the deletions lists that were already fully executed? + // (... I guess I need to read the code that handles failure to write out the header) tracing::error!("Failed to delete {}: {e:#}", list_path.display()); break; } @@ -256,7 +265,7 @@ where // After successful validation, nothing is pending: any lists that // made it through validation will be in validated_lists. assert!(self.pending_lists.is_empty()); - self.pending_key_count = 0; + self.pending_key_count = 0; // why can't we just pending_lists.len() in all places we use pending_key_count tracing::debug!( "Validation complete, have {} validated lists", @@ -272,6 +281,12 @@ where // Drain `validated_lists` into the executor let mut executing_lists = Vec::new(); for mut list in self.validated_lists.drain(..) { + // again, this weird C++-esque &mut self drain function. + // I think a consuming `into_drain_paths()` would be more idiomatic. + // + // I see you need the drained list for the `cleanup_lists` call below. + // Make an `into_...()` function that returns a tuple, then. + // Less mutable state, more values = better. let objects = list.drain_paths(); self.tx .send(ExecutorMessage::Delete(objects)) @@ -288,6 +303,7 @@ where Ok(()) } + // better name: flush_to_executor_and_wait async fn flush_executor(&mut self) -> Result<(), DeletionQueueError> { // Flush the executor, so that all the keys referenced by these deletion lists // are actually removed from remote storage. This is a precondition to deleting @@ -326,6 +342,7 @@ where match msg { BackendQueueMessage::Delete(list) => { if list.validated { + // this only happens during recovery self.validated_lists.push(list) } else { self.pending_key_count += list.len(); @@ -334,10 +351,14 @@ where if self.pending_key_count > AUTOFLUSH_KEY_COUNT { // Drop possible shutdown error, because we will just fall out of loop if that happens + // TODO: log the error at least? + // Alternatively, put a match here to ensure that it's really the shutdown error, + // and not some other error variant we might add in the future. drop(self.flush().await); } } BackendQueueMessage::Flush(op) => { + // would prefer an exhaustive match of the Ok() and each Err(...) variant here. if let Ok(()) = self.flush().await { // If we fail due to shutting down, we will just drop `op` to propagate that status. op.fire(); diff --git a/pageserver/src/deletion_queue/frontend.rs b/pageserver/src/deletion_queue/frontend.rs index b9abd1dc99..5e287bf55e 100644 --- a/pageserver/src/deletion_queue/frontend.rs +++ b/pageserver/src/deletion_queue/frontend.rs @@ -1,3 +1,11 @@ +//! The frontend batches deletion requests into DeletionLists and once batched, +//! passes them to the backend for validation & execution. +//! +//! Durability: the frontend persists the DeletionLists to disk, and the backend persists +//! the header file that keeps track of which deletion lists have been validated yet. +//! The split responsiblity is a bit headache-inducing, but, it allows us to persist +//! intention to delete in the frontend, even if the backend is down/full/slow. + use super::BackendQueueMessage; use super::DeletionHeader; use super::DeletionList; @@ -130,6 +138,8 @@ impl FrontendQueueWorker { f.fire(); } + // the .drain() is like a C++ move constructor; unidiomatics. + // Use std::mem::replace (together with the line below) instead. let onward_list = self.pending.drain(); // We have consumed out of pending: reset it for the next incoming deletions to accumulate there @@ -189,6 +199,8 @@ impl FrontendQueueWorker { } } + /// 1. There are no safeguards that this function isn't called more than once. + /// 2. Why isn't this part of the DeletionQueue::new() function? async fn recover( &mut self, attached_tenants: HashMap, @@ -204,6 +216,7 @@ impl FrontendQueueWorker { // Start our next deletion list from after the last location validated by // previous process lifetime, or after the last location found (it is updated // below after enumerating the deletion lists) + // isn't self.pending.sequence always 0 at this point? self.pending.sequence = std::cmp::max(self.pending.sequence, validated_sequence + 1); let deletion_directory = self.conf.deletion_prefix(); @@ -234,6 +247,9 @@ impl FrontendQueueWorker { let file_name = dentry.file_name().to_owned(); let basename = file_name.to_string_lossy(); let seq_part = if let Some(m) = list_name_pattern.captures(&basename) { + // named capture group would help readability here. + // Also, I see two capture groups, what is the second one for? version number, right? + // Would have been self-documenting through named capture groups. m.get(1) .expect("Non optional group should be present") .as_str() @@ -261,6 +277,9 @@ impl FrontendQueueWorker { } for s in seqs { + // why doesn't the header simply store the set of DeletionList names / sequence numbers? + // Then we could save ourselves the directory enumeration above, along with the + // regex parsing and cmp::max stuff. let list_path = self.conf.deletion_list_path(s); let list_bytes = tokio::fs::read(&list_path).await?; @@ -302,13 +321,14 @@ impl FrontendQueueWorker { // We will drop out of recovery if this fails: it indicates that we are shutting down // or the backend has panicked + // we already counted these before we crashed, mustn't count them again DELETION_QUEUE_SUBMITTED.inc_by(deletion_list.len() as u64); self.tx .send(BackendQueueMessage::Delete(deletion_list)) .await?; } - info!(next_sequence = self.pending.sequence, "Replay complete"); + info!(next_sequence = self.pending.sequence, "Recovery complete"); Ok(()) } @@ -383,6 +403,8 @@ impl FrontendQueueWorker { // Unexpected: after we flush, we should have // drained self.pending, so a conflict on // generation numbers should be impossible. + // Wondering whether we should have a counter metric that we bump each time we know we might be leaking objects. + // Better / easier / cheaper to monitor than scraping logs. tracing::error!( "Failed to enqueue deletions, leaking objects. This is a bug." ); @@ -413,6 +435,14 @@ impl FrontendQueueWorker { // This should only happen in truly unrecoverable cases, like the recovery finding that the backend // queue receiver has been dropped, or something is critically broken with // the local filesystem holding deletion lists. + // + // Hmm, so, if we fail to recover, we return from the frontend queue worker. + // This means subsequent submissions will fail, right? + // Which means each tenant's compaction loop affected by failed submission will enter the "retry in 2" seconds loop. + // => IMO we should move recovery to the DeletionQueue::new() constructor function, + // and if it fails, refuse to start the pageserver. Avoids a whole bunch of pain. + // If recovery fails because the on-disk state is garbage, can always `rm -rf` it and then + // restart the PS. info!( "Deletion queue recover aborted, deletion queue will not proceed ({e})" );