mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 22:12:56 +00:00
WIP auto-quiesce
This commit is contained in:
@@ -18,14 +18,23 @@ use utils::{
|
||||
#[derive(Debug, Default)]
|
||||
pub struct World {
|
||||
attachments: BTreeMap<TenantShardAttachmentId, NodeId>,
|
||||
attachment_count: HashMap<TenantId, u16>,
|
||||
// continously maintained aggregate for efficient decisionmaking on quiescing;
|
||||
// quiesced timelines are always caught up
|
||||
// can quiesce one == attachment_count (TODO: this requires enforcing foreign key relationship between attachments and remote_consistent_lsn)
|
||||
caught_up_count: HashMap<TenantTimelineId, u16>,
|
||||
|
||||
// BEGIN quiescing/active split
|
||||
quiesced_timelines: BTreeMap<TenantTimelineId, Lsn>,
|
||||
// ^
|
||||
// either a timeline is in quiesced_timelines
|
||||
// or it is in commit_lsns + remote_consistent_lsns
|
||||
// or it is below
|
||||
// v
|
||||
commit_lsns: BTreeMap<TenantTimelineId, Lsn>,
|
||||
remote_consistent_lsns: BTreeMap<TimelineAttachmentId, Lsn>,
|
||||
// END quiescing/active split
|
||||
|
||||
// other fields
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord)]
|
||||
@@ -97,6 +106,12 @@ impl World {
|
||||
}
|
||||
(Attach { ps_id }, Vacant(e)) => {
|
||||
e.insert(ps_id);
|
||||
// Keep attachmount_count up to date
|
||||
let attachment_count = self
|
||||
.attachment_count
|
||||
.entry(tenant_shard_attachment_id.tenant_id)
|
||||
.or_default();
|
||||
*attachment_count += attachment_count.checked_add(1).unwrap();
|
||||
// New shards may start at an older LSN than where we quiesced => activate all quiesced timelines.
|
||||
let activate_range =
|
||||
TenantTimelineId::tenant_range(tenant_shard_attachment_id.tenant_id);
|
||||
@@ -111,6 +126,12 @@ impl World {
|
||||
}
|
||||
(Detach, Occupied(e)) => {
|
||||
e.remove();
|
||||
// Keep attachment count up to date
|
||||
let attachment_count = self
|
||||
.attachment_count
|
||||
.get_mut(&tenant_shard_attachment_id.tenant_id)
|
||||
.expect("attachment action initializes the hasmap entry");
|
||||
*attachment_count = attachment_count.checked_sub(1).unwrap();
|
||||
}
|
||||
(Detach, Vacant(_)) => {
|
||||
info!("detachment is already known");
|
||||
@@ -132,6 +153,16 @@ impl World {
|
||||
match (*current).cmp(&remote_consistent_lsn) {
|
||||
Less => {
|
||||
*current = remote_consistent_lsn;
|
||||
let caught_up_count = self
|
||||
.caught_up_count
|
||||
.get_mut(&attachment.tenant_timeline_id)
|
||||
.unwrap();
|
||||
*caught_up_count = caught_up_count.checked_add(1).unwrap();
|
||||
if *caught_up_count
|
||||
== self.attachment_count[&attachment.tenant_timeline_id.tenant_id]
|
||||
{
|
||||
self.quiesce_timeline(attachment.tenant_timeline_id);
|
||||
}
|
||||
}
|
||||
Equal => {
|
||||
info!("ignoring no-op update, likely duplicate delivery");
|
||||
@@ -172,6 +203,10 @@ impl World {
|
||||
match (*current).cmp(&update) {
|
||||
Less => {
|
||||
*current = update;
|
||||
// We never allow remote_consistent_lsn to be ahead of commit_lsn.
|
||||
// Therefore, it is safe to say nothing is caught up anymore.
|
||||
let caught_up_count = self.caught_up_count.get_mut(&ttid).unwrap();
|
||||
*caught_up_count = 0;
|
||||
}
|
||||
Equal => {
|
||||
// This code runs in safekeeper impl, no reason why there would be duplicate delivery.
|
||||
@@ -281,6 +316,13 @@ impl World {
|
||||
assert_eq!(None, replaced);
|
||||
}
|
||||
}
|
||||
|
||||
fn quiesce_timeline(&mut self, tenant_timeline_id: TenantTimelineId) {
|
||||
if self.quiesced_timelines.contains_key(&tenant_timeline_id) {
|
||||
panic!("only call this function on active timelines");
|
||||
}
|
||||
todo!();
|
||||
}
|
||||
}
|
||||
|
||||
impl TimelineAttachmentId {
|
||||
|
||||
Reference in New Issue
Block a user