mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-09 22:42:57 +00:00
dabble around with effect-style system
This commit is contained in:
11
libs/sk_ps_discovery/src/completion.rs
Normal file
11
libs/sk_ps_discovery/src/completion.rs
Normal file
@@ -0,0 +1,11 @@
|
||||
pub struct Waiter {
|
||||
inner: Arc<Inner>,
|
||||
}
|
||||
|
||||
struct Inner {
|
||||
next: Box<Inner>,
|
||||
}
|
||||
|
||||
impl Waiter {
|
||||
pub fn
|
||||
}
|
||||
@@ -2,6 +2,7 @@
|
||||
mod tests;
|
||||
|
||||
mod storage;
|
||||
mod completion;
|
||||
|
||||
use std::collections::{HashMap, HashSet, hash_map};
|
||||
|
||||
@@ -18,7 +19,7 @@ pub struct World {
|
||||
attachments: HashMap<TenantShardAttachmentId, NodeId>,
|
||||
commit_lsns: HashMap<TenantTimelineId, Lsn>,
|
||||
remote_consistent_lsns: HashMap<TimelineAttachmentId, Lsn>,
|
||||
paged_out: HashSet<TenantTimelineId>,
|
||||
offloaded_timelines: HashMap<TenantTimelineId, OffloadState>,
|
||||
storage: Box<dyn storage::Storage>,
|
||||
}
|
||||
|
||||
@@ -50,6 +51,30 @@ pub struct RemoteConsistentLsnAdv {
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
}
|
||||
|
||||
pub enum Effect {
|
||||
UnoffloadTimeline {
|
||||
tenant_timeline_id: TenantTimelineId,
|
||||
},
|
||||
OffloadTimeline {
|
||||
tenant_timeline_id: TenantTimelineId,
|
||||
},
|
||||
}
|
||||
|
||||
pub struct EffectCompletionUnoffloadTimeline {
|
||||
pub tenant_timeline_id: TenantTimelineId,
|
||||
pub loaded: storage::Timeline,
|
||||
}
|
||||
|
||||
pub struct EffectCompletionOffloadTimeline {
|
||||
pub tenant_timeline_id: TenantTimelineId,
|
||||
}
|
||||
|
||||
enum OffloadState {
|
||||
Unoffloading(completion::Waiter),
|
||||
Offloading(completion::Waiter),
|
||||
Offloaded,
|
||||
}
|
||||
|
||||
impl World {
|
||||
pub fn update_attachment(&mut self, upd: AttachmentUpdate) {
|
||||
use AttachmentUpdateAction::*;
|
||||
@@ -76,12 +101,15 @@ impl World {
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn handle_remote_consistent_lsn_advertisement(&mut self, adv: RemoteConsistentLsnAdv) {
|
||||
pub fn handle_remote_consistent_lsn_advertisement(
|
||||
&mut self,
|
||||
adv: RemoteConsistentLsnAdv,
|
||||
) -> Result<(), Effect> {
|
||||
let RemoteConsistentLsnAdv {
|
||||
attachment,
|
||||
remote_consistent_lsn,
|
||||
} = adv;
|
||||
self.page_in_ttid(attachment.tenant_timeline_id());
|
||||
self.unoffload_timeline(attachment.tenant_timeline_id())?;
|
||||
match self.remote_consistent_lsns.entry(attachment) {
|
||||
hash_map::Entry::Occupied(mut occupied_entry) => {
|
||||
let current = occupied_entry.get_mut();
|
||||
@@ -89,7 +117,7 @@ impl World {
|
||||
warn!(
|
||||
"ignoring advertisement because remote_consistent_lsn is moving backwards"
|
||||
);
|
||||
return;
|
||||
return Ok(());
|
||||
}
|
||||
*current = remote_consistent_lsn;
|
||||
}
|
||||
@@ -98,9 +126,9 @@ impl World {
|
||||
vacant_entry.insert(remote_consistent_lsn);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
pub fn handle_commit_lsn_advancement(&mut self, ttid: TenantTimelineId, commit_lsn: Lsn) {
|
||||
self.page_in_ttid(ttid);
|
||||
match self.commit_lsns.entry(ttid) {
|
||||
hash_map::Entry::Occupied(mut occupied_entry) => {
|
||||
assert!(*occupied_entry.get() <= commit_lsn);
|
||||
@@ -144,16 +172,27 @@ impl World {
|
||||
commit_lsn_advertisements_by_node
|
||||
}
|
||||
|
||||
fn page_in_ttid(&mut self, ttid: TenantTimelineId) {
|
||||
if !self.paged_out.remove(&ttid) {
|
||||
return;
|
||||
fn unoffload_timeline(&mut self, ttid: TenantTimelineId) -> Result<(), Effect> {
|
||||
let Some(state) = self.offloaded_timelines.get(&ttid) else {
|
||||
return Ok(());
|
||||
};
|
||||
// below is infallible; if we ever make it fallible, need to
|
||||
// rollback the removal from paged_out in case we bail with error
|
||||
let storage::Timeline {
|
||||
commit_lsns,
|
||||
remote_consistent_lsns,
|
||||
} = self.storage.get_timeline(ttid);
|
||||
match state {
|
||||
OffloadState::InProgress(receiver) => return Err(Effect::WaitForOffload),
|
||||
OffloadState::Offloaded => todo!(),
|
||||
}
|
||||
Err(Effect::UnoffloadTimeline {
|
||||
tenant_timeline_id: ttid,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn complete_unoffload_timeline(&mut self, completion: EffectCompletionUnoffloadTimeline) {
|
||||
let EffectCompletionUnoffloadTimeline {
|
||||
tenant_timeline_id,
|
||||
loaded: storage,
|
||||
} = completion;
|
||||
if !self.offloaded_timelines.remove(&tenant_timeline_id) {
|
||||
return;
|
||||
}
|
||||
for (tenant_timeline_id, commit_lsn) in commit_lsns {
|
||||
match self.commit_lsns.entry(tenant_timeline_id) {
|
||||
hash_map::Entry::Occupied(occupied_entry) => {
|
||||
@@ -183,6 +222,32 @@ impl World {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn offload_timeline(&mut self, ttid: TenantTimelineId) -> Result<(), Effect> {
|
||||
match self.offloaded_timelines.entry(ttid) {
|
||||
hash_map::Entry::Occupied(occupied_entry) => {
|
||||
match occupied_entry.get() {
|
||||
OffloadState::Unoffloading |
|
||||
OffloadState::Offloading =>
|
||||
OffloadState::Offloaded => todo!(),
|
||||
}
|
||||
},
|
||||
hash_map::Entry::Vacant(vacant_entry) => todo!(),
|
||||
}
|
||||
Err(Effect::OffloadTimeline {
|
||||
tenant_timeline_id: ttid,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn complete_offload_timeline(&mut self, completion: EffectCompletionOffloadTimeline) {
|
||||
let EffectCompletionOffloadTimeline { tenant_timeline_id } = completion;
|
||||
match self.offloaded_timelines.entry(tenant_timeline_id) {
|
||||
hash_map::Entry::Occupied(occupied_entry) => {
|
||||
occupied_entry.
|
||||
},
|
||||
hash_map::Entry::Vacant(vacant_entry) => todo!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TimelineAttachmentId {
|
||||
|
||||
@@ -10,6 +10,5 @@ pub trait Storage {
|
||||
}
|
||||
|
||||
pub struct Timeline {
|
||||
pub commit_lsns: HashMap<TenantTimelineId, Lsn>,
|
||||
pub remote_consistent_lsns: HashMap<TimelineAttachmentId, Lsn>,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user