From a4b9335b73edd7abbbdab1188618a64e75729d4e Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 4 Jun 2025 20:38:55 +0200 Subject: [PATCH] dabble around with effect-style system --- libs/sk_ps_discovery/src/completion.rs | 11 +++ libs/sk_ps_discovery/src/lib.rs | 93 ++++++++++++++++++++++---- libs/sk_ps_discovery/src/storage.rs | 1 - 3 files changed, 90 insertions(+), 15 deletions(-) create mode 100644 libs/sk_ps_discovery/src/completion.rs diff --git a/libs/sk_ps_discovery/src/completion.rs b/libs/sk_ps_discovery/src/completion.rs new file mode 100644 index 0000000000..45d95214b5 --- /dev/null +++ b/libs/sk_ps_discovery/src/completion.rs @@ -0,0 +1,11 @@ +pub struct Waiter { + inner: Arc, +} + +struct Inner { + next: Box, +} + +impl Waiter { + pub fn +} diff --git a/libs/sk_ps_discovery/src/lib.rs b/libs/sk_ps_discovery/src/lib.rs index c0debfae09..014e4299f7 100644 --- a/libs/sk_ps_discovery/src/lib.rs +++ b/libs/sk_ps_discovery/src/lib.rs @@ -2,6 +2,7 @@ mod tests; mod storage; +mod completion; use std::collections::{HashMap, HashSet, hash_map}; @@ -18,7 +19,7 @@ pub struct World { attachments: HashMap, commit_lsns: HashMap, remote_consistent_lsns: HashMap, - paged_out: HashSet, + offloaded_timelines: HashMap, storage: Box, } @@ -50,6 +51,30 @@ pub struct RemoteConsistentLsnAdv { pub remote_consistent_lsn: Lsn, } +pub enum Effect { + UnoffloadTimeline { + tenant_timeline_id: TenantTimelineId, + }, + OffloadTimeline { + tenant_timeline_id: TenantTimelineId, + }, +} + +pub struct EffectCompletionUnoffloadTimeline { + pub tenant_timeline_id: TenantTimelineId, + pub loaded: storage::Timeline, +} + +pub struct EffectCompletionOffloadTimeline { + pub tenant_timeline_id: TenantTimelineId, +} + +enum OffloadState { + Unoffloading(completion::Waiter), + Offloading(completion::Waiter), + Offloaded, +} + impl World { pub fn update_attachment(&mut self, upd: AttachmentUpdate) { use AttachmentUpdateAction::*; @@ -76,12 +101,15 @@ impl World { } } } - pub fn handle_remote_consistent_lsn_advertisement(&mut self, adv: RemoteConsistentLsnAdv) { + pub fn handle_remote_consistent_lsn_advertisement( + &mut self, + adv: RemoteConsistentLsnAdv, + ) -> Result<(), Effect> { let RemoteConsistentLsnAdv { attachment, remote_consistent_lsn, } = adv; - self.page_in_ttid(attachment.tenant_timeline_id()); + self.unoffload_timeline(attachment.tenant_timeline_id())?; match self.remote_consistent_lsns.entry(attachment) { hash_map::Entry::Occupied(mut occupied_entry) => { let current = occupied_entry.get_mut(); @@ -89,7 +117,7 @@ impl World { warn!( "ignoring advertisement because remote_consistent_lsn is moving backwards" ); - return; + return Ok(()); } *current = remote_consistent_lsn; } @@ -98,9 +126,9 @@ impl World { vacant_entry.insert(remote_consistent_lsn); } } + Ok(()) } pub fn handle_commit_lsn_advancement(&mut self, ttid: TenantTimelineId, commit_lsn: Lsn) { - self.page_in_ttid(ttid); match self.commit_lsns.entry(ttid) { hash_map::Entry::Occupied(mut occupied_entry) => { assert!(*occupied_entry.get() <= commit_lsn); @@ -144,16 +172,27 @@ impl World { commit_lsn_advertisements_by_node } - fn page_in_ttid(&mut self, ttid: TenantTimelineId) { - if !self.paged_out.remove(&ttid) { - return; + fn unoffload_timeline(&mut self, ttid: TenantTimelineId) -> Result<(), Effect> { + let Some(state) = self.offloaded_timelines.get(&ttid) else { + return Ok(()); }; - // below is infallible; if we ever make it fallible, need to - // rollback the removal from paged_out in case we bail with error - let storage::Timeline { - commit_lsns, - remote_consistent_lsns, - } = self.storage.get_timeline(ttid); + match state { + OffloadState::InProgress(receiver) => return Err(Effect::WaitForOffload), + OffloadState::Offloaded => todo!(), + } + Err(Effect::UnoffloadTimeline { + tenant_timeline_id: ttid, + }) + } + + pub fn complete_unoffload_timeline(&mut self, completion: EffectCompletionUnoffloadTimeline) { + let EffectCompletionUnoffloadTimeline { + tenant_timeline_id, + loaded: storage, + } = completion; + if !self.offloaded_timelines.remove(&tenant_timeline_id) { + return; + } for (tenant_timeline_id, commit_lsn) in commit_lsns { match self.commit_lsns.entry(tenant_timeline_id) { hash_map::Entry::Occupied(occupied_entry) => { @@ -183,6 +222,32 @@ impl World { } } } + + pub fn offload_timeline(&mut self, ttid: TenantTimelineId) -> Result<(), Effect> { + match self.offloaded_timelines.entry(ttid) { + hash_map::Entry::Occupied(occupied_entry) => { + match occupied_entry.get() { + OffloadState::Unoffloading | + OffloadState::Offloading => + OffloadState::Offloaded => todo!(), + } + }, + hash_map::Entry::Vacant(vacant_entry) => todo!(), + } + Err(Effect::OffloadTimeline { + tenant_timeline_id: ttid, + }) + } + + pub fn complete_offload_timeline(&mut self, completion: EffectCompletionOffloadTimeline) { + let EffectCompletionOffloadTimeline { tenant_timeline_id } = completion; + match self.offloaded_timelines.entry(tenant_timeline_id) { + hash_map::Entry::Occupied(occupied_entry) => { + occupied_entry. + }, + hash_map::Entry::Vacant(vacant_entry) => todo!(), + } + } } impl TimelineAttachmentId { diff --git a/libs/sk_ps_discovery/src/storage.rs b/libs/sk_ps_discovery/src/storage.rs index f19ef9f471..8c017c14ef 100644 --- a/libs/sk_ps_discovery/src/storage.rs +++ b/libs/sk_ps_discovery/src/storage.rs @@ -10,6 +10,5 @@ pub trait Storage { } pub struct Timeline { - pub commit_lsns: HashMap, pub remote_consistent_lsns: HashMap, }