From fc9f38dd2d9727836bb6328258c53b97fdd021bb Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 4 Jun 2025 22:48:21 +0200 Subject: [PATCH] continue --- libs/sk_ps_discovery/src/lib.rs | 55 +++++++++++++++++++++++---------- 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/libs/sk_ps_discovery/src/lib.rs b/libs/sk_ps_discovery/src/lib.rs index 62e82198bc..6868b3ce96 100644 --- a/libs/sk_ps_discovery/src/lib.rs +++ b/libs/sk_ps_discovery/src/lib.rs @@ -117,15 +117,22 @@ impl World { match self.remote_consistent_lsns.entry(attachment) { hash_map::Entry::Occupied(mut occupied_entry) => { let current = occupied_entry.get_mut(); - if !(*current <= remote_consistent_lsn) { - warn!( - "ignoring advertisement because remote_consistent_lsn is moving backwards" - ); - } else { - *current = remote_consistent_lsn; + use std::cmp::Ordering::*; + match (*current).cmp(&remote_consistent_lsn) { + Less => { + *current = remote_consistent_lsn; + } + Equal => { + info!("ignoring no-op update, likely duplicate delivery"); + } + Greater => { + warn!( + "ignoring advertisement because remote_consistent_lsn is moving backwards" + ); + } } } - hash_map::Entry::Vacant(vacant_attachment_entry) => { + hash_map::Entry::Vacant(entry) => { let ttid = attachment.tenant_timeline_id(); match self.quiesced_timelines.get(&ttid).cloned() { Some(quiesced_lsn) if quiesced_lsn == remote_consistent_lsn => { @@ -138,33 +145,49 @@ impl World { } None => { info!("first time hearing about timeline attachment"); - vacant_attachment_entry.insert(remote_consistent_lsn); + entry.insert(remote_consistent_lsn); } } } } self.check_invariants(); } - pub fn handle_commit_lsn_advancement(&mut self, ttid: TenantTimelineId, commit_lsn: Lsn) { + pub fn handle_commit_lsn_advancement(&mut self, ttid: TenantTimelineId, update: Lsn) { self.check_invariants(); match self.commit_lsns.entry(ttid) { - hash_map::Entry::Occupied(mut occupied_entry) => { - assert!(*occupied_entry.get() <= commit_lsn); - *occupied_entry.get_mut() = commit_lsn; + hash_map::Entry::Occupied(mut entry) => { + let current = entry.get_mut(); + use std::cmp::Ordering::*; + match (*current).cmp(&update) { + Less => { + *current = update; + } + Equal => { + // This code runs in safekeeper impl, no reason why there would be duplicate delivery. + warn!("ignoring no-op update; why is this happening?"); + } + Greater => { + panic!( + "proposed commit_lsn would move it backwards: current={} update={}", + current, update + ); + } + } } - hash_map::Entry::Vacant(vacant_commit_lsns_entry) => { + + hash_map::Entry::Vacant(entry) => { match self.quiesced_timelines.get(&ttid).cloned() { - Some(quiesced_lsn) if quiesced_lsn == commit_lsn => { + Some(quiesced_lsn) if quiesced_lsn == update => { info!("ignoring no-op update for quiesced timeline"); } Some(_) => { self.activate_timeline(ttid); // recurse one level, guarnateed to hit `Occupied` case above - self.handle_commit_lsn_advancement(ttid, commit_lsn); + self.handle_commit_lsn_advancement(ttid, update); } None => { info!("first time hearing about this commit_lsn"); - vacant_commit_lsns_entry.insert(commit_lsn); + entry.insert(update); } } }