diff --git a/libs/sk_ps_discovery/src/lib.rs b/libs/sk_ps_discovery/src/lib.rs index 622239638b..e47d3de947 100644 --- a/libs/sk_ps_discovery/src/lib.rs +++ b/libs/sk_ps_discovery/src/lib.rs @@ -1,9 +1,8 @@ #[cfg(test)] mod tests; -mod storage; -use std::collections::{HashMap, HashSet, hash_map}; +use std::collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet}; use tracing::{info, warn}; use utils::{ @@ -16,10 +15,11 @@ use utils::{ #[derive(Debug, Default)] pub struct World { attachments: HashMap, + + quiesced_timelines: BTreeMap, + commit_lsns: HashMap, remote_consistent_lsns: HashMap, - offloaded_timelines: HashMap, - storage: Box, } #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] @@ -50,26 +50,6 @@ pub struct RemoteConsistentLsnAdv { pub remote_consistent_lsn: Lsn, } -pub struct WaitForStorage { - -} - - -pub struct EffectCompletionUnoffloadTimeline { - pub tenant_timeline_id: TenantTimelineId, - pub loaded: storage::Timeline, -} - -pub struct EffectCompletionOffloadTimeline { - pub tenant_timeline_id: TenantTimelineId, -} - -enum OffloadState { - Unoffloading(storage::Waiter), - Offloading(storage::Waiter), - Offloaded, -} - impl World { pub fn update_attachment(&mut self, upd: AttachmentUpdate) { use AttachmentUpdateAction::*; @@ -87,6 +67,25 @@ impl World { } (Attach { ps_id }, Vacant(e)) => { e.insert(ps_id); + // New shards may start at an older LSN than where we quiesced => activate all quiesced timelines. + let activate_range = TenantTimelineId::tenant_range(tenant_shard_attachment_id.tenant_id); + let activate: HashSet = self.quiesced_timelines.range(activate_range).map(|(ttid, _quiesced_lsn)| ttid.timeline_id).collect(); + for timeline_id in activate { + let quiesced_lsn = self.quiesced_timelines.remove(&tenant_timeline_id).expect("we just saw it in the .range()"); + for attachment in self.attachments.iter() + let timeline_attachment_id = TimelineAttachmentId { + tenant_shard_attachment_id, + timeline_id, + }; + match self.remote_consistent_lsns.entry(timeline_attachment_id) { + Occupied(entry) => { + panic!("inconsistency; did an activation from remote_consistent_lsn adv handling not clean up quiesced_timelines?"); + }, + Vacant(entry) => { + entry.insert(quiesced_lsn); + }, + } + } } (Detach, Occupied(e)) => { e.remove(); @@ -99,12 +98,12 @@ impl World { pub fn handle_remote_consistent_lsn_advertisement( &mut self, adv: RemoteConsistentLsnAdv, - ) -> Result<(), storage::Waiter> { + ) { let RemoteConsistentLsnAdv { attachment, remote_consistent_lsn, } = adv; - self.unoffload_timeline(attachment.tenant_timeline_id())?; + match self.remote_consistent_lsns.entry(attachment) { hash_map::Entry::Occupied(mut occupied_entry) => { let current = occupied_entry.get_mut(); @@ -112,16 +111,35 @@ impl World { warn!( "ignoring advertisement because remote_consistent_lsn is moving backwards" ); - return Ok(()); + return; } *current = remote_consistent_lsn; } - hash_map::Entry::Vacant(vacant_entry) => { - info!("first time hearing from timeline attachment"); - vacant_entry.insert(remote_consistent_lsn); + hash_map::Entry::Vacant(vacant_attachment_entry) => { + match (self.quiesced_timelines.entry(attachment.tenant_timeline_id()), remote_consistent_lsn) { + (btree_map::Entry::Occupied(entry), remote_consistent_lsn) if *entry.get() == remote_consistent_lsn => { + info!("ignoring no-op update for quiesced timeline"); + } + (btree_map::Entry::Occupied(entry), remote_consistent_lsn) => { + info!("update for quiesced timeline -> activating"); + let quiesced_lsn = entry.remove(); + let reconstruct_remote_consistent_lsn_entries = self.attachments.keys().map(|tenant_shard_attachment_id| { + (TimelineAttachmentId { + tenant_shard_attachment_id, + timeline_id: attachment.timeline_id, + }, quiesced_lsn)}); + self.remote_consistent_lsns.reserve(reconstruct_remote_consistent_lsn_entries.len()); + for (key, value) in reconstruct_remote_consistent_lsn_entries { + self.remote_consistent_lsns.insert(key, value); + } + } + (hash_map::Entry::Vacant(entry), remote_consistent_lsn) => { + info!("first time hearing about timeline attachment"); + entry.insert(remote_consistent_lsn); + } + } } } - Ok(()) } pub fn handle_commit_lsn_advancement(&mut self, ttid: TenantTimelineId, commit_lsn: Lsn) { match self.commit_lsns.entry(ttid) { @@ -129,9 +147,21 @@ impl World { assert!(*occupied_entry.get() <= commit_lsn); *occupied_entry.get_mut() = commit_lsn; } - hash_map::Entry::Vacant(vacant_entry) => { - info!("first time learning about sk timeline"); - vacant_entry.insert(commit_lsn); + hash_map::Entry::Vacant(vacant_commit_lsns_entry) => { + match (self.quiesced_timelines.entry(ttid), commit_lsn) { + (btree_map::Entry::Occupied(entry), commit_lsn) if *entry.get() == commit_lsn => { + info!("ignoring no-op update for quiesced timeline"); + }, + (btree_map::Entry::Occupied(entry), commit_lsn) => { + info!("update for quiesced timeline -> activating"); + let quiesced_lsn = entry.remove(); + vacant_commit_lsns_entry.insert(quiesced_lsn); + }, + (btree_map::Entry::Vacant(entry), commit_lsn) => { + info!("first time hearing about commit_lsn for this timeline"); + entry.insert(commit_lsn); + } + } } } } @@ -166,86 +196,6 @@ impl World { } commit_lsn_advertisements_by_node } - - fn unoffload_timeline(&mut self, ttid: TenantTimelineId) -> Result<(), WaitForStorage> { - let Some(state) = self.offloaded_timelines.get_mut(&ttid) else { - return Ok(()); - }; - match state { - OffloadState::Unoffloading(waiter) | - OffloadState::Offloading(waiter) => waiter.clone(), - OffloadState::Offloaded => { - *state = &mut OffloadState::Unoffloading(); - }, - } - Err(Effect::UnoffloadTimeline { - tenant_timeline_id: ttid, - }) - } - - pub fn complete_unoffload_timeline(&mut self, completion: EffectCompletionUnoffloadTimeline) { - let EffectCompletionUnoffloadTimeline { - tenant_timeline_id, - loaded: storage, - } = completion; - if !self.offloaded_timelines.remove(&tenant_timeline_id) { - return; - } - for (tenant_timeline_id, commit_lsn) in commit_lsns { - match self.commit_lsns.entry(tenant_timeline_id) { - hash_map::Entry::Occupied(occupied_entry) => { - panic!( - "inconsistent: entry is supposed to be paged_out:\ninmem={:?}\nondisk={:?}", - occupied_entry.get(), - (tenant_timeline_id, commit_lsn) - ) - } - hash_map::Entry::Vacant(vacant_entry) => { - vacant_entry.insert(commit_lsn); - } - } - } - for (timeline_attachment_id, remote_consistent_lsn) in remote_consistent_lsns { - match self.remote_consistent_lsns.entry(timeline_attachment_id) { - hash_map::Entry::Occupied(occupied_entry) => { - panic!( - "inconsistent: entry is supposed to be paged_out:\ninmem={:?}\nondisk={:?}", - occupied_entry.get(), - (timeline_attachment_id, remote_consistent_lsn) - ) - } - hash_map::Entry::Vacant(vacant_entry) => { - vacant_entry.insert(remote_consistent_lsn); - } - } - } - } - - pub fn offload_timeline(&mut self, ttid: TenantTimelineId) -> Result<(), Effect> { - match self.offloaded_timelines.entry(ttid) { - hash_map::Entry::Occupied(occupied_entry) => { - match occupied_entry.get() { - OffloadState::Unoffloading | - OffloadState::Offloading => - OffloadState::Offloaded => todo!(), - } - }, - hash_map::Entry::Vacant(vacant_entry) => todo!(), - } - Err(Effect::OffloadTimeline { - tenant_timeline_id: ttid, - }) - } - - pub fn complete_offload_timeline(&mut self, completion: EffectCompletionOffloadTimeline) { - let EffectCompletionOffloadTimeline { tenant_timeline_id } = completion; - match self.offloaded_timelines.entry(tenant_timeline_id) { - hash_map::Entry::Occupied(occupied_entry) => { - occupied_entry. - }, - hash_map::Entry::Vacant(vacant_entry) => todo!(), - } - } } impl TimelineAttachmentId { diff --git a/libs/sk_ps_discovery/src/storage.rs b/libs/sk_ps_discovery/src/storage.rs deleted file mode 100644 index 12c885dcaa..0000000000 --- a/libs/sk_ps_discovery/src/storage.rs +++ /dev/null @@ -1,19 +0,0 @@ -use std::collections::HashMap; - -use utils::id::TenantTimelineId; - -use crate::TimelineAttachmentId; - -pub trait Storage { - fn get_timeline(&self, ttid: TenantTimelineId) -> Timeline; - fn store_timeline(&self, ttid: TenantTimelineId, timeline: Timeline); -} - -#[derive(Clone)] -pub struct Waiter { - -} - -pub struct Timeline { - pub remote_consistent_lsns: HashMap, -} diff --git a/libs/utils/src/id.rs b/libs/utils/src/id.rs index 68cb1f0209..22a5c2c4f2 100644 --- a/libs/utils/src/id.rs +++ b/libs/utils/src/id.rs @@ -1,5 +1,6 @@ use std::fmt; use std::num::ParseIntError; +use std::ops::RangeInclusive; use std::str::FromStr; use anyhow::Context; @@ -320,6 +321,19 @@ impl TenantTimelineId { pub fn empty() -> Self { Self::new(TenantId::from([0u8; 16]), TimelineId::from([0u8; 16])) } + + pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive { + RangeInclusive::new( + Self { + tenant_id, + timeline_id: TimelineId::from_array([u8::MIN; 16]), + }, + Self { + tenant_id, + timeline_id: TimelineId::from_array([u8::MAX; 16]), + }, + ) + } } impl fmt::Display for TenantTimelineId {