This commit is contained in:
Christian Schwarz
2025-06-04 22:34:01 +02:00
parent ba6abe203d
commit c689110ad6

View File

@@ -1,8 +1,7 @@
#[cfg(test)]
mod tests;
use std::collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
use tracing::{info, warn};
use utils::{
@@ -51,7 +50,27 @@ pub struct RemoteConsistentLsnAdv {
}
impl World {
/// Asserts the quiescing invariant of the world state.
///
/// A timeline that has an entry in `quiesced_timelines` must not have an entry in
/// `commit_lsns`, nor any entry in `remote_consistent_lsns` (matched by the
/// tenant-timeline id of the attachment key).
///
/// # Panics
///
/// Panics if the quiesced set overlaps with either of the two other sets.
fn check_invariants(&self) {
    // Quiescing: quiesced \cap (commit_lsn \cup remote_consistent_lsns) must be empty.
    let quiesced: HashSet<TenantTimelineId> =
        self.quiesced_timelines.keys().copied().collect();
    let with_commit_lsn: HashSet<TenantTimelineId> =
        self.commit_lsns.keys().copied().collect();
    // Several attachments of the same timeline collapse into one set element here.
    let with_remote_consistent_lsn: HashSet<TenantTimelineId> = self
        .remote_consistent_lsns
        .keys()
        .map(|tlaid: &TimelineAttachmentId| tlaid.tenant_timeline_id())
        .collect();

    let overlap_with_commit_lsns = quiesced.intersection(&with_commit_lsn).count();
    assert_eq!(0, overlap_with_commit_lsns);

    let overlap_with_remote_consistent_lsns =
        quiesced.intersection(&with_remote_consistent_lsn).count();
    assert_eq!(0, overlap_with_remote_consistent_lsns);
}
pub fn update_attachment(&mut self, upd: AttachmentUpdate) {
self.check_invariants();
use AttachmentUpdateAction::*;
use hash_map::Entry::*;
let AttachmentUpdate {
@@ -68,23 +87,15 @@ impl World {
(Attach { ps_id }, Vacant(e)) => {
e.insert(ps_id);
// New shards may start at an older LSN than where we quiesced => activate all quiesced timelines.
let activate_range = TenantTimelineId::tenant_range(tenant_shard_attachment_id.tenant_id);
let activate: HashSet<TimelineId> = self.quiesced_timelines.range(activate_range).map(|(ttid, _quiesced_lsn)| ttid.timeline_id).collect();
for timeline_id in activate {
let quiesced_lsn = self.quiesced_timelines.remove(&tenant_timeline_id).expect("we just saw it in the .range()");
for attachment in self.attachments.iter()
let timeline_attachment_id = TimelineAttachmentId {
tenant_shard_attachment_id,
timeline_id,
};
match self.remote_consistent_lsns.entry(timeline_attachment_id) {
Occupied(entry) => {
panic!("inconsistency; did an activation from remote_consistent_lsn adv handling not clean up quiesced_timelines?");
},
Vacant(entry) => {
entry.insert(quiesced_lsn);
},
}
let activate_range =
TenantTimelineId::tenant_range(tenant_shard_attachment_id.tenant_id);
let activate: HashSet<TenantTimelineId> = self
.quiesced_timelines
.range(activate_range)
.map(|(ttid, _quiesced_lsn)| *ttid)
.collect();
for tenant_timeline_id in activate {
self.activate_timeline(tenant_timeline_id);
}
}
(Detach, Occupied(e)) => {
@@ -94,11 +105,10 @@ impl World {
info!("detachment is already known");
}
}
self.check_invariants();
}
pub fn handle_remote_consistent_lsn_advertisement(
&mut self,
adv: RemoteConsistentLsnAdv,
) {
pub fn handle_remote_consistent_lsn_advertisement(&mut self, adv: RemoteConsistentLsnAdv) {
self.check_invariants();
let RemoteConsistentLsnAdv {
attachment,
remote_consistent_lsn,
@@ -111,59 +121,55 @@ impl World {
warn!(
"ignoring advertisement because remote_consistent_lsn is moving backwards"
);
return;
} else {
*current = remote_consistent_lsn;
}
*current = remote_consistent_lsn;
}
hash_map::Entry::Vacant(vacant_attachment_entry) => {
match (self.quiesced_timelines.entry(attachment.tenant_timeline_id()), remote_consistent_lsn) {
(btree_map::Entry::Occupied(entry), remote_consistent_lsn) if *entry.get() == remote_consistent_lsn => {
let ttid = attachment.tenant_timeline_id();
match self.quiesced_timelines.get(&ttid).cloned() {
Some(quiesced_lsn) if quiesced_lsn == remote_consistent_lsn => {
info!("ignoring no-op update for quiesced timeline");
}
(btree_map::Entry::Occupied(entry), remote_consistent_lsn) => {
info!("update for quiesced timeline -> activating");
let quiesced_lsn = entry.remove();
let reconstruct_remote_consistent_lsn_entries = self.attachments.keys().map(|tenant_shard_attachment_id| {
(TimelineAttachmentId {
tenant_shard_attachment_id,
timeline_id: attachment.timeline_id,
}, quiesced_lsn)});
self.remote_consistent_lsns.reserve(reconstruct_remote_consistent_lsn_entries.len());
for (key, value) in reconstruct_remote_consistent_lsn_entries {
self.remote_consistent_lsns.insert(key, value);
}
Some(_) => {
self.activate_timeline(ttid);
// recurse one level, guaranteed to hit `Occupied` case above
self.handle_remote_consistent_lsn_advertisement(adv);
}
(hash_map::Entry::Vacant(entry), remote_consistent_lsn) => {
None => {
info!("first time hearing about timeline attachment");
entry.insert(remote_consistent_lsn);
vacant_attachment_entry.insert(remote_consistent_lsn);
}
}
}
}
self.check_invariants();
}
pub fn handle_commit_lsn_advancement(&mut self, ttid: TenantTimelineId, commit_lsn: Lsn) {
self.check_invariants();
match self.commit_lsns.entry(ttid) {
hash_map::Entry::Occupied(mut occupied_entry) => {
assert!(*occupied_entry.get() <= commit_lsn);
*occupied_entry.get_mut() = commit_lsn;
}
hash_map::Entry::Vacant(vacant_commit_lsns_entry) => {
match (self.quiesced_timelines.entry(ttid), commit_lsn) {
(btree_map::Entry::Occupied(entry), commit_lsn) if *entry.get() == commit_lsn => {
match self.quiesced_timelines.get(&ttid).cloned() {
Some(quiesced_lsn) if quiesced_lsn == commit_lsn => {
info!("ignoring no-op update for quiesced timeline");
},
(btree_map::Entry::Occupied(entry), commit_lsn) => {
info!("update for quiesced timeline -> activating");
let quiesced_lsn = entry.remove();
vacant_commit_lsns_entry.insert(quiesced_lsn);
},
(btree_map::Entry::Vacant(entry), commit_lsn) => {
info!("first time hearing about commit_lsn for this timeline");
entry.insert(commit_lsn);
}
Some(_) => {
self.activate_timeline(ttid);
// recurse one level, guaranteed to hit `Occupied` case above
self.handle_commit_lsn_advancement(ttid, commit_lsn);
}
None => {
info!("first time hearing about this commit_lsn");
vacant_commit_lsns_entry.insert(commit_lsn);
}
}
}
}
self.check_invariants();
}
pub fn get_commit_lsn_advertisements(&self) -> HashMap<NodeId, HashMap<TenantTimelineId, Lsn>> {
@@ -196,6 +202,34 @@ impl World {
}
commit_lsn_advertisements_by_node
}
/// Moves a quiesced timeline back into the actively tracked state.
///
/// The LSN remembered in `quiesced_timelines` is removed from there and re-installed
/// as the timeline's `commit_lsns` entry and as the `remote_consistent_lsns` entry of
/// every known attachment that belongs to the timeline's tenant.
///
/// # Panics
///
/// Panics if `tenant_timeline_id` is not present in `quiesced_timelines`, or if any of
/// the entries that are about to be reconstructed already exists; both cases mean the
/// quiescing invariant was violated before this call.
fn activate_timeline(&mut self, tenant_timeline_id: TenantTimelineId) {
    let quiesced_lsn = self
        .quiesced_timelines
        .remove(&tenant_timeline_id)
        .expect("must call this function only on quiesced tenant_timeline_id");

    let replaced = self.commit_lsns.insert(tenant_timeline_id, quiesced_lsn);
    assert_eq!(None, replaced);

    // Only attachments of the timeline's own tenant get an entry. Iterating over all
    // of `self.attachments` would fabricate entries that pair other tenants' shard
    // attachments with this timeline id.
    // NOTE(review): assumes `TenantTimelineId` exposes a `tenant_id` field like
    // `TenantShardAttachmentId` does — confirm against the type definitions.
    let tenant_attachments: Vec<_> = self
        .attachments
        .keys()
        .filter(|id| id.tenant_id == tenant_timeline_id.tenant_id)
        .cloned()
        .collect();

    self.remote_consistent_lsns.reserve(tenant_attachments.len());
    for tenant_shard_attachment_id in tenant_attachments {
        let key = TimelineAttachmentId {
            tenant_shard_attachment_id,
            timeline_id: tenant_timeline_id.timeline_id,
        };
        let replaced = self.remote_consistent_lsns.insert(key, quiesced_lsn);
        assert_eq!(None, replaced);
    }
}
}
impl TimelineAttachmentId {