diff --git a/libs/sk_ps_discovery/src/lib.rs b/libs/sk_ps_discovery/src/lib.rs index 069d411af9..0fbf21ab24 100644 --- a/libs/sk_ps_discovery/src/lib.rs +++ b/libs/sk_ps_discovery/src/lib.rs @@ -18,14 +18,23 @@ use utils::{ #[derive(Debug, Default)] pub struct World { attachments: BTreeMap, + attachment_count: HashMap, + // continously maintained aggregate for efficient decisionmaking on quiescing; + // quiesced timelines are always caught up + // can quiesce one == attachment_count (TODO: this requires enforcing foreign key relationship between attachments and remote_consistent_lsn) + caught_up_count: HashMap, + // BEGIN quiescing/active split quiesced_timelines: BTreeMap, // ^ // either a timeline is in quiesced_timelines - // or it is in commit_lsns + remote_consistent_lsns + // or it is below // v commit_lsns: BTreeMap, remote_consistent_lsns: BTreeMap, + // END quiescing/active split + + // other fields } #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord)] @@ -97,6 +106,12 @@ impl World { } (Attach { ps_id }, Vacant(e)) => { e.insert(ps_id); + // Keep attachmount_count up to date + let attachment_count = self + .attachment_count + .entry(tenant_shard_attachment_id.tenant_id) + .or_default(); + *attachment_count += attachment_count.checked_add(1).unwrap(); // New shards may start at an older LSN than where we quiesced => activate all quiesced timelines. let activate_range = TenantTimelineId::tenant_range(tenant_shard_attachment_id.tenant_id); @@ -111,6 +126,12 @@ impl World { } (Detach, Occupied(e)) => { e.remove(); + // Keep attachment count up to date + let attachment_count = self + .attachment_count + .get_mut(&tenant_shard_attachment_id.tenant_id) + .expect("attachment action initializes the hasmap entry"); + *attachment_count = attachment_count.checked_sub(1).unwrap(); } (Detach, Vacant(_)) => { info!("detachment is already known"); @@ -132,6 +153,16 @@ impl World { match (*current).cmp(&remote_consistent_lsn) { Less => { *current = remote_consistent_lsn; + let caught_up_count = self + .caught_up_count + .get_mut(&attachment.tenant_timeline_id) + .unwrap(); + *caught_up_count = caught_up_count.checked_add(1).unwrap(); + if *caught_up_count + == self.attachment_count[&attachment.tenant_timeline_id.tenant_id] + { + self.quiesce_timeline(attachment.tenant_timeline_id); + } } Equal => { info!("ignoring no-op update, likely duplicate delivery"); @@ -172,6 +203,10 @@ impl World { match (*current).cmp(&update) { Less => { *current = update; + // We never allow remote_consistent_lsn to be ahead of commit_lsn. + // Therefore, it is safe to say nothing is caught up anymore. + let caught_up_count = self.caught_up_count.get_mut(&ttid).unwrap(); + *caught_up_count = 0; } Equal => { // This code runs in safekeeper impl, no reason why there would be duplicate delivery. @@ -281,6 +316,13 @@ impl World { assert_eq!(None, replaced); } } + + fn quiesce_timeline(&mut self, tenant_timeline_id: TenantTimelineId) { + if self.quiesced_timelines.contains_key(&tenant_timeline_id) { + panic!("only call this function on active timelines"); + } + todo!(); + } } impl TimelineAttachmentId {