finish implementing auto-quiescing; needs more tests

This commit is contained in:
Christian Schwarz
2025-06-05 02:41:56 +02:00
parent d52d560f16
commit 2f416267dc

View File

@@ -71,7 +71,46 @@ impl World {
if !cfg!(debug_assertions) {
return;
}
// quiescing
// caught_up_count maintenance
{
for (tenant_timeline_id, caught_up_count) in
self.caught_up_count.iter().map(|(k, v)| (*k, *v))
{
let attachment_count = *self
.attachment_count
.get(&tenant_timeline_id.tenant_id)
.unwrap();
assert!(caught_up_count <= attachment_count);
if caught_up_count == attachment_count {
self.quiesced_timelines.contains_key(&tenant_timeline_id);
// remote_consistent_lsn and commit_lsns is empty, checked by "quiescing XOR ..." below
} else {
let commit_lsn = self.commit_lsns[&&tenant_timeline_id];
let mut validate_caught_up = 0;
let mut validate_not_caught_up = 0;
for (_, r_c_lsn) in self
.remote_consistent_lsns
.range(TimelineAttachmentId::timeline_range(tenant_timeline_id))
.map(|(k, v)| (*k, *v))
{
if r_c_lsn == commit_lsn {
validate_caught_up += 1;
} else {
assert!(r_c_lsn < commit_lsn);
validate_not_caught_up += 1;
}
}
assert_eq!(validate_caught_up, caught_up_count);
assert_eq!(
validate_caught_up + validate_not_caught_up,
attachment_count
);
}
}
}
// quiescing XOR ...
{
let quiesced_timelines: HashSet<TenantTimelineId> =
self.quiesced_timelines.keys().cloned().collect();
@@ -82,7 +121,6 @@ impl World {
.keys()
.map(|tlaid: &TimelineAttachmentId| tlaid.tenant_timeline_id)
.collect();
// quiesced \cap (commit_lsn \cup remote_consistent_lsns)
#[rustfmt::skip]
assert_eq!(0, quiesced_timelines.intersection(&commit_lsn_timelines).count());
#[rustfmt::skip]
@@ -174,7 +212,7 @@ impl World {
}
}
}
btree_map::Entry::Vacant(entry) => {
btree_map::Entry::Vacant(_) => {
let ttid = attachment.tenant_timeline_id;
match self.quiesced_timelines.get(&ttid).cloned() {
Some(quiesced_lsn) if quiesced_lsn == remote_consistent_lsn => {
@@ -186,8 +224,7 @@ impl World {
self.handle_remote_consistent_lsn_advertisement(adv);
}
None => {
info!("first time hearing about timeline attachment");
entry.insert(remote_consistent_lsn);
info!("ignoring advertisement because timeline is not known");
}
}
}
@@ -232,8 +269,22 @@ impl World {
self.handle_commit_lsn_advancement(ttid, update);
}
None => {
info!("first time hearing about this commit_lsn");
info!("first time hearing about this timeline, initializing");
entry.insert(update);
let replaced = self.caught_up_count.insert(ttid, 0);
// only commit_lsn advancement makes timelines known to world
assert_eq!(None, replaced);
for (attachment, _) in self
.attachments
.range(TenantShardAttachmentId::tenant_range(ttid.tenant_id))
{
let replaced = self.remote_consistent_lsns.insert(
attachment.timeline_attachment_id(ttid.timeline_id),
Lsn(0),
);
// only commit_lsn advancement makes timelines known to World
assert_eq!(None, replaced);
}
}
}
}
@@ -318,10 +369,39 @@ impl World {
}
fn quiesce_timeline(&mut self, tenant_timeline_id: TenantTimelineId) {
self.check_invariants();
if self.quiesced_timelines.contains_key(&tenant_timeline_id) {
panic!("only call this function on active timelines");
}
todo!();
let quiesced_lsn = self
.commit_lsns
.remove(&tenant_timeline_id)
.expect("inconsistent: we checked it's not in quiesced_timelines, so, must be active");
let caught_up_count = self
.caught_up_count
.remove(&tenant_timeline_id)
.expect("inconsistent: we checked it's not in quiesced_timleines, so, must be active");
let mut remove_remote_consistent_lsns = Vec::new();
for (k, remote_consistent_lsn) in self
.remote_consistent_lsns
.range(TimelineAttachmentId::timeline_range(tenant_timeline_id))
{
assert_eq!(*remote_consistent_lsn, quiesced_lsn);
remove_remote_consistent_lsns.push(*k);
}
assert_eq!(
caught_up_count,
u16::try_from(remove_remote_consistent_lsns.len()).unwrap()
);
for k in remove_remote_consistent_lsns {
let removed = self.remote_consistent_lsns.remove(&k);
assert!(removed.is_some(), "we just added");
}
let replaced = self
.quiesced_timelines
.insert(tenant_timeline_id, quiesced_lsn);
assert_eq!(None, replaced); // we checked at function entry
self.check_invariants();
}
}