From e40b1c79fa5d66c86651f345bb24e54231127e6b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 6 Jun 2025 12:31:56 -0700 Subject: [PATCH] carginality estimation for collected hashmap; maintain `nodes` and `nodes_timelines`; allocations not showing in flamegraph anymore, but replaced with btreemap, but replaced with btreemap, but replaced with btreemap, but replaced with btreemap::get --- libs/sk_ps_discovery/benches/bench.rs | 9 ++-- libs/sk_ps_discovery/src/lib.rs | 65 ++++++++++++++++++++++++--- libs/sk_ps_discovery/src/tests.rs | 44 ++++++++++++++++++ 3 files changed, 110 insertions(+), 8 deletions(-) diff --git a/libs/sk_ps_discovery/benches/bench.rs b/libs/sk_ps_discovery/benches/bench.rs index 35e5a3ec63..ecddbd451e 100644 --- a/libs/sk_ps_discovery/benches/bench.rs +++ b/libs/sk_ps_discovery/benches/bench.rs @@ -3,7 +3,6 @@ use std::time::Instant; use criterion::{Criterion, criterion_group, criterion_main}; -use hex::FromHex; use pprof::criterion::{Output, PProfProfiler}; use sk_ps_discovery::{ AttachmentUpdate, RemoteConsistentLsnAdv, TenantShardAttachmentId, TimelineAttachmentId, @@ -51,8 +50,12 @@ fn bench_simple(c: &mut Criterion) { generation: Generation::Valid(generation), }; let timeline_attachment = TimelineAttachmentId { - tenant_shard_attachment_id, - timeline_id, + tenant_timeline_id: TenantTimelineId { + tenant_id, + timeline_id, + }, + shard_id: ShardIndex::unsharded(), + generation: Generation::Valid(generation), }; world.update_attachment(AttachmentUpdate { tenant_shard_attachment_id, diff --git a/libs/sk_ps_discovery/src/lib.rs b/libs/sk_ps_discovery/src/lib.rs index 2ac87f9f42..12bc6c738d 100644 --- a/libs/sk_ps_discovery/src/lib.rs +++ b/libs/sk_ps_discovery/src/lib.rs @@ -2,7 +2,7 @@ mod tests; use std::{ - collections::{BTreeMap, HashMap, HashSet, btree_map, hash_map}, + collections::{BTreeMap, BTreeSet, HashMap, HashSet, btree_map, hash_map}, ops::RangeInclusive, }; @@ -19,6 +19,7 @@ use utils::{ pub struct World { attachments: BTreeMap, attachment_count: HashMap, + nodes_timelines: HashMap>, // u16 is a refcount from each timeline attachment id // continously maintained aggregate for efficient decisionmaking on quiescing; // quiesced timelines are always caught up // can quiesce one == attachment_count (TODO: this requires enforcing foreign key relationship between attachments and remote_consistent_lsn) @@ -126,6 +127,34 @@ impl World { #[rustfmt::skip] assert_eq!(0, quiesced_timelines.intersection(&remote_consistent_lsn_timelines).count()); } + + // nodes_timelines maintenance + { + let mut expect: HashMap> = HashMap::new(); + let all_ttids: BTreeSet = self + .quiesced_timelines + .keys() + .cloned() + .chain( + self.remote_consistent_lsns + .keys() + .cloned() + .map(|tlaid| tlaid.tenant_timeline_id), + ) + .collect(); + for ttid in all_ttids { + for (_, node_id) in self + .attachments + .range(TenantShardAttachmentId::tenant_range(ttid.tenant_id)) + .map(|(k, v)| (*k, *v)) + { + let expect = expect.entry(node_id).or_default(); + let refcount = expect.entry(ttid).or_default(); + *refcount += 1; + } + } + assert_eq!(expect, self.nodes_timelines); + } } pub fn update_attachment(&mut self, upd: AttachmentUpdate) { self.check_invariants(); @@ -150,6 +179,17 @@ impl World { .entry(tenant_shard_attachment_id.tenant_id) .or_default(); *attachment_count += attachment_count.checked_add(1).unwrap(); + // Keep nodes_timelines up to date + let nodes_timelines = self.nodes_timelines.entry(ps_id).or_default(); + for (ttid, _) in self.commit_lsns.range(TenantTimelineId::tenant_range( + tenant_shard_attachment_id.tenant_id, + )) { + let refcount = nodes_timelines.entry(*ttid).or_default(); + *refcount = refcount.checked_add(1).unwrap(); + } + if nodes_timelines.is_empty() { + self.nodes_timelines.remove(&ps_id); + } // New shards may start at an older LSN than where we quiesced => activate all quiesced timelines. let activate_range = TenantTimelineId::tenant_range(tenant_shard_attachment_id.tenant_id); @@ -163,13 +203,24 @@ impl World { } } (Detach, Occupied(e)) => { - e.remove(); + let ps_id = e.remove(); // Keep attachment count up to date let attachment_count = self .attachment_count .get_mut(&tenant_shard_attachment_id.tenant_id) .expect("attachment action initializes the hasmap entry"); *attachment_count = attachment_count.checked_sub(1).unwrap(); + // Keep nodes_timelines up to date + let nodes_timelines = self + .nodes_timelines + .get_mut(&ps_id) + .expect("attachment action initializes hashmap entry"); + for (ttid, _) in self.commit_lsns.range(TenantTimelineId::tenant_range( + tenant_shard_attachment_id.tenant_id, + )) { + let refcount = nodes_timelines.entry(*ttid).or_default(); + *refcount = refcount.checked_sub(1).unwrap(); + } } (Detach, Vacant(_)) => { info!("detachment is already known"); @@ -274,7 +325,7 @@ impl World { let replaced = self.caught_up_count.insert(ttid, 0); // only commit_lsn advancement makes timelines known to world assert_eq!(None, replaced); - for (attachment, _) in self + for (attachment, node_id) in self .attachments .range(TenantShardAttachmentId::tenant_range(ttid.tenant_id)) { @@ -284,6 +335,10 @@ impl World { ); // only commit_lsn advancement makes timelines known to World assert_eq!(None, replaced); + + let nodes_timelines = self.nodes_timelines.entry(*node_id).or_default(); + let refcount = nodes_timelines.entry(ttid).or_default(); + *refcount = refcount.checked_add(1).unwrap(); } } } @@ -294,7 +349,7 @@ impl World { pub fn get_commit_lsn_advertisements(&self) -> HashMap> { let mut commit_lsn_advertisements_by_node: HashMap> = - Default::default(); + HashMap::with_capacity(self.nodes_timelines.len()); let commit_lsns_iter = self.commit_lsns.iter().map(|(k, v)| (*k, *v)); let attachments_iter = self.attachments.iter().map(|(k, v)| (*k, *v)); @@ -329,7 +384,7 @@ impl World { // DISTINCT node_id, array_agg(DISTINCT tenant_shard_id ) let for_node = commit_lsn_advertisements_by_node .entry(node_id) - .or_default(); + .or_insert_with(|| HashMap::with_capacity(self.nodes_timelines[&node_id].len())); match for_node.entry(tenant_timeline_id) { hash_map::Entry::Vacant(vacant_entry) => { vacant_entry.insert(commit_lsn); diff --git a/libs/sk_ps_discovery/src/tests.rs b/libs/sk_ps_discovery/src/tests.rs index 6413526f64..e1c3e42a7d 100644 --- a/libs/sk_ps_discovery/src/tests.rs +++ b/libs/sk_ps_discovery/src/tests.rs @@ -176,3 +176,47 @@ fn quiescing_timeline_catchup() { assert!(world.quiesced_timelines.contains_key(&ttid)); } + +#[test] +fn nodes_timelines() { + let mut world = World::default(); + + let tenant_id = TenantId::generate(); + let timeline_id = TimelineId::from_array([0x1; 16]); + let ttid = TenantTimelineId { + tenant_id, + timeline_id, + }; + + let tenant_shard_attachment_id = TenantShardAttachmentId { + tenant_id, + shard_id: ShardIndex::unsharded(), + generation: Generation::Valid(2), + }; + + let ps_id = NodeId(0x100); + + world.update_attachment(AttachmentUpdate { + tenant_shard_attachment_id, + action: AttachmentUpdateAction::Attach { ps_id }, + }); + + assert!(world.nodes_timelines.get(&ps_id).is_none()); + + world.handle_commit_lsn_advancement(ttid, Lsn(0x23)); + + assert_eq!(world.nodes_timelines[&ps_id].len(), 1); + + let timeline2 = TimelineId::from_array([0x2; 16]); + world.handle_remote_consistent_lsn_advertisement(RemoteConsistentLsnAdv { + attachment: TimelineAttachmentId { + tenant_timeline_id: TenantTimelineId { + tenant_id, + timeline_id: timeline2, + }, + shard_id: ShardIndex::unsharded(), + generation: Generation::Valid(2), + }, + remote_consistent_lsn: Lsn(0x42), + }); +}