diff --git a/libs/sk_ps_discovery/src/lib.rs b/libs/sk_ps_discovery/src/lib.rs index e47d3de947..62e82198bc 100644 --- a/libs/sk_ps_discovery/src/lib.rs +++ b/libs/sk_ps_discovery/src/lib.rs @@ -1,8 +1,7 @@ #[cfg(test)] mod tests; - -use std::collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet, hash_map}; use tracing::{info, warn}; use utils::{ @@ -51,7 +50,27 @@ pub struct RemoteConsistentLsnAdv { } impl World { + fn check_invariants(&self) { + // quiescing + { + let quiesced_timelines: HashSet = + self.quiesced_timelines.keys().cloned().collect(); + let commit_lsn_timelines: HashSet = + self.commit_lsns.keys().cloned().collect(); + let remote_consistent_lsn_timelines: HashSet = self + .remote_consistent_lsns + .keys() + .map(|tlaid: &TimelineAttachmentId| tlaid.tenant_timeline_id()) + .collect(); + // quiesced \cap (commit_lsn \cup remote_consistent_lsns) + #[rustfmt::skip] + assert_eq!(0, quiesced_timelines.intersection(&commit_lsn_timelines).count()); + #[rustfmt::skip] + assert_eq!(0, quiesced_timelines.intersection(&remote_consistent_lsn_timelines).count()); + } + } pub fn update_attachment(&mut self, upd: AttachmentUpdate) { + self.check_invariants(); use AttachmentUpdateAction::*; use hash_map::Entry::*; let AttachmentUpdate { @@ -68,23 +87,15 @@ impl World { (Attach { ps_id }, Vacant(e)) => { e.insert(ps_id); // New shards may start at an older LSN than where we quiesced => activate all quiesced timelines. - let activate_range = TenantTimelineId::tenant_range(tenant_shard_attachment_id.tenant_id); - let activate: HashSet = self.quiesced_timelines.range(activate_range).map(|(ttid, _quiesced_lsn)| ttid.timeline_id).collect(); - for timeline_id in activate { - let quiesced_lsn = self.quiesced_timelines.remove(&tenant_timeline_id).expect("we just saw it in the .range()"); - for attachment in self.attachments.iter() - let timeline_attachment_id = TimelineAttachmentId { - tenant_shard_attachment_id, - timeline_id, - }; - match self.remote_consistent_lsns.entry(timeline_attachment_id) { - Occupied(entry) => { - panic!("inconsistency; did an activation from remote_consistent_lsn adv handling not clean up quiesced_timelines?"); - }, - Vacant(entry) => { - entry.insert(quiesced_lsn); - }, - } + let activate_range = + TenantTimelineId::tenant_range(tenant_shard_attachment_id.tenant_id); + let activate: HashSet = self + .quiesced_timelines + .range(activate_range) + .map(|(ttid, _quiesced_lsn)| *ttid) + .collect(); + for tenant_timeline_id in activate { + self.activate_timeline(tenant_timeline_id); } } (Detach, Occupied(e)) => { @@ -94,11 +105,10 @@ impl World { info!("detachment is already known"); } } + self.check_invariants(); } - pub fn handle_remote_consistent_lsn_advertisement( - &mut self, - adv: RemoteConsistentLsnAdv, - ) { + pub fn handle_remote_consistent_lsn_advertisement(&mut self, adv: RemoteConsistentLsnAdv) { + self.check_invariants(); let RemoteConsistentLsnAdv { attachment, remote_consistent_lsn, @@ -111,59 +121,55 @@ impl World { warn!( "ignoring advertisement because remote_consistent_lsn is moving backwards" ); - return; + } else { + *current = remote_consistent_lsn; } - *current = remote_consistent_lsn; } hash_map::Entry::Vacant(vacant_attachment_entry) => { - match (self.quiesced_timelines.entry(attachment.tenant_timeline_id()), remote_consistent_lsn) { - (btree_map::Entry::Occupied(entry), remote_consistent_lsn) if *entry.get() == remote_consistent_lsn => { + let ttid = attachment.tenant_timeline_id(); + match self.quiesced_timelines.get(&ttid).cloned() { + Some(quiesced_lsn) if quiesced_lsn == remote_consistent_lsn => { info!("ignoring no-op update for quiesced timeline"); } - (btree_map::Entry::Occupied(entry), remote_consistent_lsn) => { - info!("update for quiesced timeline -> activating"); - let quiesced_lsn = entry.remove(); - let reconstruct_remote_consistent_lsn_entries = self.attachments.keys().map(|tenant_shard_attachment_id| { - (TimelineAttachmentId { - tenant_shard_attachment_id, - timeline_id: attachment.timeline_id, - }, quiesced_lsn)}); - self.remote_consistent_lsns.reserve(reconstruct_remote_consistent_lsn_entries.len()); - for (key, value) in reconstruct_remote_consistent_lsn_entries { - self.remote_consistent_lsns.insert(key, value); - } + Some(_) => { + self.activate_timeline(ttid); + // recurse one level, guarnateed to hit `Occupied` case above + self.handle_remote_consistent_lsn_advertisement(adv); } - (hash_map::Entry::Vacant(entry), remote_consistent_lsn) => { + None => { info!("first time hearing about timeline attachment"); - entry.insert(remote_consistent_lsn); + vacant_attachment_entry.insert(remote_consistent_lsn); } } } } + self.check_invariants(); } pub fn handle_commit_lsn_advancement(&mut self, ttid: TenantTimelineId, commit_lsn: Lsn) { + self.check_invariants(); match self.commit_lsns.entry(ttid) { hash_map::Entry::Occupied(mut occupied_entry) => { assert!(*occupied_entry.get() <= commit_lsn); *occupied_entry.get_mut() = commit_lsn; } hash_map::Entry::Vacant(vacant_commit_lsns_entry) => { - match (self.quiesced_timelines.entry(ttid), commit_lsn) { - (btree_map::Entry::Occupied(entry), commit_lsn) if *entry.get() == commit_lsn => { + match self.quiesced_timelines.get(&ttid).cloned() { + Some(quiesced_lsn) if quiesced_lsn == commit_lsn => { info!("ignoring no-op update for quiesced timeline"); - }, - (btree_map::Entry::Occupied(entry), commit_lsn) => { - info!("update for quiesced timeline -> activating"); - let quiesced_lsn = entry.remove(); - vacant_commit_lsns_entry.insert(quiesced_lsn); - }, - (btree_map::Entry::Vacant(entry), commit_lsn) => { - info!("first time hearing about commit_lsn for this timeline"); - entry.insert(commit_lsn); + } + Some(_) => { + self.activate_timeline(ttid); + // recurse one level, guarnateed to hit `Occupied` case above + self.handle_commit_lsn_advancement(ttid, commit_lsn); + } + None => { + info!("first time hearing about this commit_lsn"); + vacant_commit_lsns_entry.insert(commit_lsn); } } } } + self.check_invariants(); } pub fn get_commit_lsn_advertisements(&self) -> HashMap> { @@ -196,6 +202,34 @@ impl World { } commit_lsn_advertisements_by_node } + + fn activate_timeline(&mut self, tenant_timeline_id: TenantTimelineId) { + let quiesced_lsn = self + .quiesced_timelines + .remove(&tenant_timeline_id) + .expect("must call this function only on quiesced tenant_timeline_id"); + let replaced = self.commit_lsns.insert(tenant_timeline_id, quiesced_lsn); + assert_eq!(None, replaced); + let reconstruct_remote_consistent_lsn_entries = + self.attachments + .keys() + .cloned() + .map(|tenant_shard_attachment_id| { + ( + TimelineAttachmentId { + tenant_shard_attachment_id, + timeline_id: tenant_timeline_id.timeline_id, + }, + quiesced_lsn, + ) + }); + self.remote_consistent_lsns + .reserve(reconstruct_remote_consistent_lsn_entries.len()); + for (key, value) in reconstruct_remote_consistent_lsn_entries { + let replaced = self.remote_consistent_lsns.insert(key, value); + assert_eq!(None, replaced); + } + } } impl TimelineAttachmentId {