WIP promising quiescing mechanism

This commit is contained in:
Christian Schwarz
2025-06-04 22:07:48 +02:00
parent d16e024d49
commit ba6abe203d
3 changed files with 78 additions and 133 deletions

View File

@@ -1,9 +1,8 @@
#[cfg(test)]
mod tests;
mod storage;
use std::collections::{HashMap, HashSet, hash_map};
use std::collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet};
use tracing::{info, warn};
use utils::{
@@ -16,10 +15,11 @@ use utils::{
#[derive(Debug, Default)]
pub struct World {
attachments: HashMap<TenantShardAttachmentId, NodeId>,
quiesced_timelines: BTreeMap<TenantTimelineId, Lsn>,
commit_lsns: HashMap<TenantTimelineId, Lsn>,
remote_consistent_lsns: HashMap<TimelineAttachmentId, Lsn>,
offloaded_timelines: HashMap<TenantTimelineId, OffloadState>,
storage: Box<dyn storage::Storage>,
}
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
@@ -50,26 +50,6 @@ pub struct RemoteConsistentLsnAdv {
pub remote_consistent_lsn: Lsn,
}
pub struct WaitForStorage {
}
pub struct EffectCompletionUnoffloadTimeline {
pub tenant_timeline_id: TenantTimelineId,
pub loaded: storage::Timeline,
}
pub struct EffectCompletionOffloadTimeline {
pub tenant_timeline_id: TenantTimelineId,
}
enum OffloadState {
Unoffloading(storage::Waiter),
Offloading(storage::Waiter),
Offloaded,
}
impl World {
pub fn update_attachment(&mut self, upd: AttachmentUpdate) {
use AttachmentUpdateAction::*;
@@ -87,6 +67,25 @@ impl World {
}
(Attach { ps_id }, Vacant(e)) => {
e.insert(ps_id);
// New shards may start at an older LSN than where we quiesced => activate all quiesced timelines.
let activate_range = TenantTimelineId::tenant_range(tenant_shard_attachment_id.tenant_id);
let activate: HashSet<TimelineId> = self.quiesced_timelines.range(activate_range).map(|(ttid, _quiesced_lsn)| ttid.timeline_id).collect();
for timeline_id in activate {
let quiesced_lsn = self.quiesced_timelines.remove(&tenant_timeline_id).expect("we just saw it in the .range()");
for attachment in self.attachments.iter()
let timeline_attachment_id = TimelineAttachmentId {
tenant_shard_attachment_id,
timeline_id,
};
match self.remote_consistent_lsns.entry(timeline_attachment_id) {
Occupied(entry) => {
panic!("inconsistency; did an activation from remote_consistent_lsn adv handling not clean up quiesced_timelines?");
},
Vacant(entry) => {
entry.insert(quiesced_lsn);
},
}
}
}
(Detach, Occupied(e)) => {
e.remove();
@@ -99,12 +98,12 @@ impl World {
pub fn handle_remote_consistent_lsn_advertisement(
&mut self,
adv: RemoteConsistentLsnAdv,
) -> Result<(), storage::Waiter> {
) {
let RemoteConsistentLsnAdv {
attachment,
remote_consistent_lsn,
} = adv;
self.unoffload_timeline(attachment.tenant_timeline_id())?;
match self.remote_consistent_lsns.entry(attachment) {
hash_map::Entry::Occupied(mut occupied_entry) => {
let current = occupied_entry.get_mut();
@@ -112,16 +111,35 @@ impl World {
warn!(
"ignoring advertisement because remote_consistent_lsn is moving backwards"
);
return Ok(());
return;
}
*current = remote_consistent_lsn;
}
hash_map::Entry::Vacant(vacant_entry) => {
info!("first time hearing from timeline attachment");
vacant_entry.insert(remote_consistent_lsn);
hash_map::Entry::Vacant(vacant_attachment_entry) => {
match (self.quiesced_timelines.entry(attachment.tenant_timeline_id()), remote_consistent_lsn) {
(btree_map::Entry::Occupied(entry), remote_consistent_lsn) if *entry.get() == remote_consistent_lsn => {
info!("ignoring no-op update for quiesced timeline");
}
(btree_map::Entry::Occupied(entry), remote_consistent_lsn) => {
info!("update for quiesced timeline -> activating");
let quiesced_lsn = entry.remove();
let reconstruct_remote_consistent_lsn_entries = self.attachments.keys().map(|tenant_shard_attachment_id| {
(TimelineAttachmentId {
tenant_shard_attachment_id,
timeline_id: attachment.timeline_id,
}, quiesced_lsn)});
self.remote_consistent_lsns.reserve(reconstruct_remote_consistent_lsn_entries.len());
for (key, value) in reconstruct_remote_consistent_lsn_entries {
self.remote_consistent_lsns.insert(key, value);
}
}
(hash_map::Entry::Vacant(entry), remote_consistent_lsn) => {
info!("first time hearing about timeline attachment");
entry.insert(remote_consistent_lsn);
}
}
}
}
Ok(())
}
pub fn handle_commit_lsn_advancement(&mut self, ttid: TenantTimelineId, commit_lsn: Lsn) {
match self.commit_lsns.entry(ttid) {
@@ -129,9 +147,21 @@ impl World {
assert!(*occupied_entry.get() <= commit_lsn);
*occupied_entry.get_mut() = commit_lsn;
}
hash_map::Entry::Vacant(vacant_entry) => {
info!("first time learning about sk timeline");
vacant_entry.insert(commit_lsn);
hash_map::Entry::Vacant(vacant_commit_lsns_entry) => {
match (self.quiesced_timelines.entry(ttid), commit_lsn) {
(btree_map::Entry::Occupied(entry), commit_lsn) if *entry.get() == commit_lsn => {
info!("ignoring no-op update for quiesced timeline");
},
(btree_map::Entry::Occupied(entry), commit_lsn) => {
info!("update for quiesced timeline -> activating");
let quiesced_lsn = entry.remove();
vacant_commit_lsns_entry.insert(quiesced_lsn);
},
(btree_map::Entry::Vacant(entry), commit_lsn) => {
info!("first time hearing about commit_lsn for this timeline");
entry.insert(commit_lsn);
}
}
}
}
}
@@ -166,86 +196,6 @@ impl World {
}
commit_lsn_advertisements_by_node
}
fn unoffload_timeline(&mut self, ttid: TenantTimelineId) -> Result<(), WaitForStorage> {
let Some(state) = self.offloaded_timelines.get_mut(&ttid) else {
return Ok(());
};
match state {
OffloadState::Unoffloading(waiter) |
OffloadState::Offloading(waiter) => waiter.clone(),
OffloadState::Offloaded => {
*state = &mut OffloadState::Unoffloading();
},
}
Err(Effect::UnoffloadTimeline {
tenant_timeline_id: ttid,
})
}
pub fn complete_unoffload_timeline(&mut self, completion: EffectCompletionUnoffloadTimeline) {
let EffectCompletionUnoffloadTimeline {
tenant_timeline_id,
loaded: storage,
} = completion;
if !self.offloaded_timelines.remove(&tenant_timeline_id) {
return;
}
for (tenant_timeline_id, commit_lsn) in commit_lsns {
match self.commit_lsns.entry(tenant_timeline_id) {
hash_map::Entry::Occupied(occupied_entry) => {
panic!(
"inconsistent: entry is supposed to be paged_out:\ninmem={:?}\nondisk={:?}",
occupied_entry.get(),
(tenant_timeline_id, commit_lsn)
)
}
hash_map::Entry::Vacant(vacant_entry) => {
vacant_entry.insert(commit_lsn);
}
}
}
for (timeline_attachment_id, remote_consistent_lsn) in remote_consistent_lsns {
match self.remote_consistent_lsns.entry(timeline_attachment_id) {
hash_map::Entry::Occupied(occupied_entry) => {
panic!(
"inconsistent: entry is supposed to be paged_out:\ninmem={:?}\nondisk={:?}",
occupied_entry.get(),
(timeline_attachment_id, remote_consistent_lsn)
)
}
hash_map::Entry::Vacant(vacant_entry) => {
vacant_entry.insert(remote_consistent_lsn);
}
}
}
}
pub fn offload_timeline(&mut self, ttid: TenantTimelineId) -> Result<(), Effect> {
match self.offloaded_timelines.entry(ttid) {
hash_map::Entry::Occupied(occupied_entry) => {
match occupied_entry.get() {
OffloadState::Unoffloading |
OffloadState::Offloading =>
OffloadState::Offloaded => todo!(),
}
},
hash_map::Entry::Vacant(vacant_entry) => todo!(),
}
Err(Effect::OffloadTimeline {
tenant_timeline_id: ttid,
})
}
pub fn complete_offload_timeline(&mut self, completion: EffectCompletionOffloadTimeline) {
let EffectCompletionOffloadTimeline { tenant_timeline_id } = completion;
match self.offloaded_timelines.entry(tenant_timeline_id) {
hash_map::Entry::Occupied(occupied_entry) => {
occupied_entry.
},
hash_map::Entry::Vacant(vacant_entry) => todo!(),
}
}
}
impl TimelineAttachmentId {

View File

@@ -1,19 +0,0 @@
use std::collections::HashMap;
use utils::id::TenantTimelineId;
use crate::TimelineAttachmentId;
pub trait Storage {
fn get_timeline(&self, ttid: TenantTimelineId) -> Timeline;
fn store_timeline(&self, ttid: TenantTimelineId, timeline: Timeline);
}
#[derive(Clone)]
pub struct Waiter {
}
pub struct Timeline {
pub remote_consistent_lsns: HashMap<TimelineAttachmentId, Lsn>,
}

View File

@@ -1,5 +1,6 @@
use std::fmt;
use std::num::ParseIntError;
use std::ops::RangeInclusive;
use std::str::FromStr;
use anyhow::Context;
@@ -320,6 +321,19 @@ impl TenantTimelineId {
pub fn empty() -> Self {
Self::new(TenantId::from([0u8; 16]), TimelineId::from([0u8; 16]))
}
pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
RangeInclusive::new(
Self {
tenant_id,
timeline_id: TimelineId::from_array([u8::MIN; 16]),
},
Self {
tenant_id,
timeline_id: TimelineId::from_array([u8::MAX; 16]),
},
)
}
}
impl fmt::Display for TenantTimelineId {