carginality estimation for collected hashmap; maintain nodes and nodes_timelines; allocations not showing in flamegraph anymore, but replaced with btreemap, but replaced with btreemap, but replaced with btreemap, but replaced with btreemap::get

This commit is contained in:
Christian Schwarz
2025-06-06 12:31:56 -07:00
parent 2f416267dc
commit e40b1c79fa
3 changed files with 110 additions and 8 deletions

View File

@@ -3,7 +3,6 @@
use std::time::Instant;
use criterion::{Criterion, criterion_group, criterion_main};
use hex::FromHex;
use pprof::criterion::{Output, PProfProfiler};
use sk_ps_discovery::{
AttachmentUpdate, RemoteConsistentLsnAdv, TenantShardAttachmentId, TimelineAttachmentId,
@@ -51,8 +50,12 @@ fn bench_simple(c: &mut Criterion) {
generation: Generation::Valid(generation),
};
let timeline_attachment = TimelineAttachmentId {
tenant_shard_attachment_id,
timeline_id,
tenant_timeline_id: TenantTimelineId {
tenant_id,
timeline_id,
},
shard_id: ShardIndex::unsharded(),
generation: Generation::Valid(generation),
};
world.update_attachment(AttachmentUpdate {
tenant_shard_attachment_id,

View File

@@ -2,7 +2,7 @@
mod tests;
use std::{
collections::{BTreeMap, HashMap, HashSet, btree_map, hash_map},
collections::{BTreeMap, BTreeSet, HashMap, HashSet, btree_map, hash_map},
ops::RangeInclusive,
};
@@ -19,6 +19,7 @@ use utils::{
pub struct World {
attachments: BTreeMap<TenantShardAttachmentId, NodeId>,
attachment_count: HashMap<TenantId, u16>,
nodes_timelines: HashMap<NodeId, HashMap<TenantTimelineId, u16>>, // u16 is a refcount from each timeline attachment id
// continously maintained aggregate for efficient decisionmaking on quiescing;
// quiesced timelines are always caught up
// can quiesce one == attachment_count (TODO: this requires enforcing foreign key relationship between attachments and remote_consistent_lsn)
@@ -126,6 +127,34 @@ impl World {
#[rustfmt::skip]
assert_eq!(0, quiesced_timelines.intersection(&remote_consistent_lsn_timelines).count());
}
// nodes_timelines maintenance
{
let mut expect: HashMap<NodeId, HashMap<TenantTimelineId, u16>> = HashMap::new();
let all_ttids: BTreeSet<TenantTimelineId> = self
.quiesced_timelines
.keys()
.cloned()
.chain(
self.remote_consistent_lsns
.keys()
.cloned()
.map(|tlaid| tlaid.tenant_timeline_id),
)
.collect();
for ttid in all_ttids {
for (_, node_id) in self
.attachments
.range(TenantShardAttachmentId::tenant_range(ttid.tenant_id))
.map(|(k, v)| (*k, *v))
{
let expect = expect.entry(node_id).or_default();
let refcount = expect.entry(ttid).or_default();
*refcount += 1;
}
}
assert_eq!(expect, self.nodes_timelines);
}
}
pub fn update_attachment(&mut self, upd: AttachmentUpdate) {
self.check_invariants();
@@ -150,6 +179,17 @@ impl World {
.entry(tenant_shard_attachment_id.tenant_id)
.or_default();
*attachment_count += attachment_count.checked_add(1).unwrap();
// Keep nodes_timelines up to date
let nodes_timelines = self.nodes_timelines.entry(ps_id).or_default();
for (ttid, _) in self.commit_lsns.range(TenantTimelineId::tenant_range(
tenant_shard_attachment_id.tenant_id,
)) {
let refcount = nodes_timelines.entry(*ttid).or_default();
*refcount = refcount.checked_add(1).unwrap();
}
if nodes_timelines.is_empty() {
self.nodes_timelines.remove(&ps_id);
}
// New shards may start at an older LSN than where we quiesced => activate all quiesced timelines.
let activate_range =
TenantTimelineId::tenant_range(tenant_shard_attachment_id.tenant_id);
@@ -163,13 +203,24 @@ impl World {
}
}
(Detach, Occupied(e)) => {
e.remove();
let ps_id = e.remove();
// Keep attachment count up to date
let attachment_count = self
.attachment_count
.get_mut(&tenant_shard_attachment_id.tenant_id)
.expect("attachment action initializes the hasmap entry");
*attachment_count = attachment_count.checked_sub(1).unwrap();
// Keep nodes_timelines up to date
let nodes_timelines = self
.nodes_timelines
.get_mut(&ps_id)
.expect("attachment action initializes hashmap entry");
for (ttid, _) in self.commit_lsns.range(TenantTimelineId::tenant_range(
tenant_shard_attachment_id.tenant_id,
)) {
let refcount = nodes_timelines.entry(*ttid).or_default();
*refcount = refcount.checked_sub(1).unwrap();
}
}
(Detach, Vacant(_)) => {
info!("detachment is already known");
@@ -274,7 +325,7 @@ impl World {
let replaced = self.caught_up_count.insert(ttid, 0);
// only commit_lsn advancement makes timelines known to world
assert_eq!(None, replaced);
for (attachment, _) in self
for (attachment, node_id) in self
.attachments
.range(TenantShardAttachmentId::tenant_range(ttid.tenant_id))
{
@@ -284,6 +335,10 @@ impl World {
);
// only commit_lsn advancement makes timelines known to World
assert_eq!(None, replaced);
let nodes_timelines = self.nodes_timelines.entry(*node_id).or_default();
let refcount = nodes_timelines.entry(ttid).or_default();
*refcount = refcount.checked_add(1).unwrap();
}
}
}
@@ -294,7 +349,7 @@ impl World {
pub fn get_commit_lsn_advertisements(&self) -> HashMap<NodeId, HashMap<TenantTimelineId, Lsn>> {
let mut commit_lsn_advertisements_by_node: HashMap<NodeId, HashMap<TenantTimelineId, Lsn>> =
Default::default();
HashMap::with_capacity(self.nodes_timelines.len());
let commit_lsns_iter = self.commit_lsns.iter().map(|(k, v)| (*k, *v));
let attachments_iter = self.attachments.iter().map(|(k, v)| (*k, *v));
@@ -329,7 +384,7 @@ impl World {
// DISTINCT node_id, array_agg(DISTINCT tenant_shard_id )
let for_node = commit_lsn_advertisements_by_node
.entry(node_id)
.or_default();
.or_insert_with(|| HashMap::with_capacity(self.nodes_timelines[&node_id].len()));
match for_node.entry(tenant_timeline_id) {
hash_map::Entry::Vacant(vacant_entry) => {
vacant_entry.insert(commit_lsn);

View File

@@ -176,3 +176,47 @@ fn quiescing_timeline_catchup() {
assert!(world.quiesced_timelines.contains_key(&ttid));
}
#[test]
fn nodes_timelines() {
let mut world = World::default();
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::from_array([0x1; 16]);
let ttid = TenantTimelineId {
tenant_id,
timeline_id,
};
let tenant_shard_attachment_id = TenantShardAttachmentId {
tenant_id,
shard_id: ShardIndex::unsharded(),
generation: Generation::Valid(2),
};
let ps_id = NodeId(0x100);
world.update_attachment(AttachmentUpdate {
tenant_shard_attachment_id,
action: AttachmentUpdateAction::Attach { ps_id },
});
assert!(world.nodes_timelines.get(&ps_id).is_none());
world.handle_commit_lsn_advancement(ttid, Lsn(0x23));
assert_eq!(world.nodes_timelines[&ps_id].len(), 1);
let timeline2 = TimelineId::from_array([0x2; 16]);
world.handle_remote_consistent_lsn_advertisement(RemoteConsistentLsnAdv {
attachment: TimelineAttachmentId {
tenant_timeline_id: TenantTimelineId {
tenant_id,
timeline_id: timeline2,
},
shard_id: ShardIndex::unsharded(),
generation: Generation::Valid(2),
},
remote_consistent_lsn: Lsn(0x42),
});
}