From 10e927ee3ea5a514850456b7c00ce9a4fd4f1480 Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 16 Aug 2023 17:21:10 +0100 Subject: [PATCH] Add encoding versions to deletion queue structs --- pageserver/src/deletion_queue.rs | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 9dadd264cf..bddf5cd805 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -110,6 +110,9 @@ pub struct DeletionQueueClient { #[serde_as] #[derive(Debug, Serialize, Deserialize)] struct DeletionList { + /// Serialization version, for future use + version: u8, + /// Used for constructing a unique key for each deletion list we write out. sequence: u64, @@ -121,6 +124,9 @@ struct DeletionList { #[serde_as] #[derive(Debug, Serialize, Deserialize)] struct DeletionHeader { + /// Serialization version, for future use + version: u8, + /// Enable determining the next sequence number even if there are no deletion lists present. /// If there _are_ deletion lists present, then their sequence numbers take precedence over /// this. @@ -130,9 +136,22 @@ struct DeletionHeader { // and are OK to execute. } +impl DeletionHeader { + const VERSION_LATEST: u8 = 1; + + fn new(last_deleted_list_seq: u64) -> Self { + Self { + version: Self::VERSION_LATEST, + last_deleted_list_seq, + } + } +} + impl DeletionList { + const VERSION_LATEST: u8 = 1; fn new(sequence: u64) -> Self { Self { + version: Self::VERSION_LATEST, sequence, objects: Vec::new(), } @@ -496,12 +515,11 @@ impl FrontendQueueWorker { f.fire(); } - let mut onward_list = DeletionList { - sequence: self.pending.sequence, - objects: Vec::new(), - }; + let mut onward_list = DeletionList::new(self.pending.sequence); std::mem::swap(&mut onward_list.objects, &mut self.pending.objects); - self.pending.sequence += 1; + + // We have consumed out of pending: reset it for the next incoming deletions to accumulate there + self.pending = DeletionList::new(self.pending.sequence + 1); if let Err(e) = self.tx.send(BackendQueueMessage::Delete(onward_list)).await { // This is allowed to fail: it will only happen if the backend worker is shut down,