From 2f416267dce84aedff22a2d81b118f1721bd13ec Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 5 Jun 2025 02:41:56 +0200 Subject: [PATCH] finish implementing auto-quiescing; needs more tests --- libs/sk_ps_discovery/src/lib.rs | 94 ++++++++++++++++++++++++++++++--- 1 file changed, 87 insertions(+), 7 deletions(-) diff --git a/libs/sk_ps_discovery/src/lib.rs b/libs/sk_ps_discovery/src/lib.rs index 0fbf21ab24..2ac87f9f42 100644 --- a/libs/sk_ps_discovery/src/lib.rs +++ b/libs/sk_ps_discovery/src/lib.rs @@ -71,7 +71,46 @@ impl World { if !cfg!(debug_assertions) { return; } - // quiescing + + // caught_up_count maintenance + { + for (tenant_timeline_id, caught_up_count) in + self.caught_up_count.iter().map(|(k, v)| (*k, *v)) + { + let attachment_count = *self + .attachment_count + .get(&tenant_timeline_id.tenant_id) + .unwrap(); + assert!(caught_up_count <= attachment_count); + if caught_up_count == attachment_count { + self.quiesced_timelines.contains_key(&tenant_timeline_id); + // remote_consistent_lsn and commit_lsns is empty, checked by "quiescing XOR ..." below + } else { + let commit_lsn = self.commit_lsns[&&tenant_timeline_id]; + let mut validate_caught_up = 0; + let mut validate_not_caught_up = 0; + for (_, r_c_lsn) in self + .remote_consistent_lsns + .range(TimelineAttachmentId::timeline_range(tenant_timeline_id)) + .map(|(k, v)| (*k, *v)) + { + if r_c_lsn == commit_lsn { + validate_caught_up += 1; + } else { + assert!(r_c_lsn < commit_lsn); + validate_not_caught_up += 1; + } + } + assert_eq!(validate_caught_up, caught_up_count); + assert_eq!( + validate_caught_up + validate_not_caught_up, + attachment_count + ); + } + } + } + + // quiescing XOR ... { let quiesced_timelines: HashSet = self.quiesced_timelines.keys().cloned().collect(); @@ -82,7 +121,6 @@ impl World { .keys() .map(|tlaid: &TimelineAttachmentId| tlaid.tenant_timeline_id) .collect(); - // quiesced \cap (commit_lsn \cup remote_consistent_lsns) #[rustfmt::skip] assert_eq!(0, quiesced_timelines.intersection(&commit_lsn_timelines).count()); #[rustfmt::skip] @@ -174,7 +212,7 @@ impl World { } } } - btree_map::Entry::Vacant(entry) => { + btree_map::Entry::Vacant(_) => { let ttid = attachment.tenant_timeline_id; match self.quiesced_timelines.get(&ttid).cloned() { Some(quiesced_lsn) if quiesced_lsn == remote_consistent_lsn => { @@ -186,8 +224,7 @@ impl World { self.handle_remote_consistent_lsn_advertisement(adv); } None => { - info!("first time hearing about timeline attachment"); - entry.insert(remote_consistent_lsn); + info!("ignoring advertisement because timeline is not known"); } } } @@ -232,8 +269,22 @@ impl World { self.handle_commit_lsn_advancement(ttid, update); } None => { - info!("first time hearing about this commit_lsn"); + info!("first time hearing about this timeline, initializing"); entry.insert(update); + let replaced = self.caught_up_count.insert(ttid, 0); + // only commit_lsn advancement makes timelines known to world + assert_eq!(None, replaced); + for (attachment, _) in self + .attachments + .range(TenantShardAttachmentId::tenant_range(ttid.tenant_id)) + { + let replaced = self.remote_consistent_lsns.insert( + attachment.timeline_attachment_id(ttid.timeline_id), + Lsn(0), + ); + // only commit_lsn advancement makes timelines known to World + assert_eq!(None, replaced); + } } } } @@ -318,10 +369,39 @@ impl World { } fn quiesce_timeline(&mut self, tenant_timeline_id: TenantTimelineId) { + self.check_invariants(); if self.quiesced_timelines.contains_key(&tenant_timeline_id) { panic!("only call this function on active timelines"); } - todo!(); + let quiesced_lsn = self + .commit_lsns + .remove(&tenant_timeline_id) + .expect("inconsistent: we checked it's not in quiesced_timelines, so, must be active"); + let caught_up_count = self + .caught_up_count + .remove(&tenant_timeline_id) + .expect("inconsistent: we checked it's not in quiesced_timleines, so, must be active"); + let mut remove_remote_consistent_lsns = Vec::new(); + for (k, remote_consistent_lsn) in self + .remote_consistent_lsns + .range(TimelineAttachmentId::timeline_range(tenant_timeline_id)) + { + assert_eq!(*remote_consistent_lsn, quiesced_lsn); + remove_remote_consistent_lsns.push(*k); + } + assert_eq!( + caught_up_count, + u16::try_from(remove_remote_consistent_lsns.len()).unwrap() + ); + for k in remove_remote_consistent_lsns { + let removed = self.remote_consistent_lsns.remove(&k); + assert!(removed.is_some(), "we just added"); + } + let replaced = self + .quiesced_timelines + .insert(tenant_timeline_id, quiesced_lsn); + assert_eq!(None, replaced); // we checked at function entry + self.check_invariants(); } }