From 2cea7a7838c4ffbd1e28cbcab5de3ebffde531f9 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Wed, 4 Jun 2025 19:19:20 +0200
Subject: [PATCH] sketch offloading

---
Reviewer notes (text between the `---` marker and the diffstat is not part of
the commit message):

* benches/bench.rs: the setup no longer creates 400_000 unsharded tenants on a
  single node; it creates 20 pageservers with 2000 tenant shards each, and
  attaches every tenant shard in generations 10 and 11 before advancing the
  commit LSN once per timeline.
* src/lib.rs: `World` gains a `paged_out` set and a `storage` backend. Both
  `handle_remote_consistent_lsn_advertisement` and
  `handle_commit_lsn_advancement` now call `page_in_ttid` first. That helper
  returns early unless the timeline was marked paged out; otherwise it loads
  the timeline from `storage` and re-inserts its entries into the in-memory
  maps, panicking if an entry is unexpectedly already present.
* src/storage.rs: new `Storage` trait (`get_timeline` / `store_timeline`) and
  the `Timeline` record exchanged through it.
* NOTE(review): this patch had been flattened onto four lines and everything
  inside angle brackets had been stripped. The line structure below was
  restored so that it matches every hunk header and the diffstat. The generic
  arguments (`HashMap<..>`, `HashSet<..>`, `Box<dyn ..>`) were reconstructed
  from how the fields are used in the visible code; confirm them against the
  target tree, in particular on the unchanged context lines of `World`.
* NOTE(review): `storage.rs` does not import `Lsn`, so the value type is
  spelled `utils::lsn::Lsn` here to keep the 15-line count; confirm.
* NOTE(review): the author e-mail address is missing from the `From:` header
  and could not be recovered; `git am` may reject the mail until it is added.

 libs/sk_ps_discovery/benches/bench.rs | 71 +++++++++++++++------------
 libs/sk_ps_discovery/src/lib.rs       | 48 +++++++++++++++++-
 libs/sk_ps_discovery/src/storage.rs   | 15 ++++++
 3 files changed, 102 insertions(+), 32 deletions(-)
 create mode 100644 libs/sk_ps_discovery/src/storage.rs

diff --git a/libs/sk_ps_discovery/benches/bench.rs b/libs/sk_ps_discovery/benches/bench.rs
index 898026d257..35e5a3ec63 100644
--- a/libs/sk_ps_discovery/benches/bench.rs
+++ b/libs/sk_ps_discovery/benches/bench.rs
@@ -2,14 +2,17 @@
 
 use std::time::Instant;
 
-use criterion::{criterion_group, criterion_main, Criterion};
+use criterion::{Criterion, criterion_group, criterion_main};
 use hex::FromHex;
 use pprof::criterion::{Output, PProfProfiler};
 use sk_ps_discovery::{
     AttachmentUpdate, RemoteConsistentLsnAdv, TenantShardAttachmentId, TimelineAttachmentId,
 };
 use utils::{
-    generation::Generation, id::{NodeId, TenantId, TenantTimelineId, TimelineId}, lsn::Lsn, shard::ShardIndex
+    generation::Generation,
+    id::{NodeId, TenantId, TenantTimelineId, TimelineId},
+    lsn::Lsn,
+    shard::ShardIndex,
 };
 
 /// Use jemalloc and enable profiling, to mirror bin/safekeeper.rs.
@@ -35,35 +38,41 @@ fn bench_simple(c: &mut Criterion) {
     let mut world = sk_ps_discovery::World::default();
 
     // Simplified view: lots of tenants with one timeline each
-    let n_tenants = 400_000;
-    for t in 1..=n_tenants {
-        let ps_id = NodeId(23);
-        let tenant_id = TenantId::generate();
-        let timeline_id = TimelineId::generate();
-        let tenant_shard_attachment_id = TenantShardAttachmentId {
-            tenant_id,
-            shard_id: ShardIndex::unsharded(),
-            generation: Generation::Valid(0),
-        };
-        let timeline_attachment = TimelineAttachmentId {
-            tenant_shard_attachment_id,
-            timeline_id,
-        };
-        world.update_attachment(AttachmentUpdate {
-            tenant_shard_attachment_id,
-            action: sk_ps_discovery::AttachmentUpdateAction::Attach { ps_id },
-        });
-        world.handle_remote_consistent_lsn_advertisement(RemoteConsistentLsnAdv {
-            remote_consistent_lsn: Lsn(23),
-            attachment: timeline_attachment,
-        });
-        world.handle_commit_lsn_advancement(
-            TenantTimelineId {
-                tenant_id,
-                timeline_id,
-            },
-            Lsn(42),
-        );
+    let n_pageservers = 20;
+    let n_tenant_shards_per_pageserver = 2000;
+    for ps_id in 1..=n_pageservers {
+        for t in 1..=n_tenant_shards_per_pageserver {
+            let tenant_id = TenantId::generate();
+            let timeline_id = TimelineId::generate();
+            for generation in 10..=11 {
+                let tenant_shard_attachment_id = TenantShardAttachmentId {
+                    tenant_id,
+                    shard_id: ShardIndex::unsharded(),
+                    generation: Generation::Valid(generation),
+                };
+                let timeline_attachment = TimelineAttachmentId {
+                    tenant_shard_attachment_id,
+                    timeline_id,
+                };
+                world.update_attachment(AttachmentUpdate {
+                    tenant_shard_attachment_id,
+                    action: sk_ps_discovery::AttachmentUpdateAction::Attach {
+                        ps_id: NodeId(ps_id),
+                    },
+                });
+                world.handle_remote_consistent_lsn_advertisement(RemoteConsistentLsnAdv {
+                    remote_consistent_lsn: Lsn(23),
+                    attachment: timeline_attachment,
+                });
+            }
+            world.handle_commit_lsn_advancement(
+                TenantTimelineId {
+                    tenant_id,
+                    timeline_id,
+                },
+                Lsn(42),
+            );
+        }
     }
 
     // setup done
diff --git a/libs/sk_ps_discovery/src/lib.rs b/libs/sk_ps_discovery/src/lib.rs
index 4e7359d37f..c0debfae09 100644
--- a/libs/sk_ps_discovery/src/lib.rs
+++ b/libs/sk_ps_discovery/src/lib.rs
@@ -1,7 +1,9 @@
 #[cfg(test)]
 mod tests;
 
-use std::collections::{HashMap, hash_map};
+mod storage;
+
+use std::collections::{HashMap, HashSet, hash_map};
 
 use tracing::{info, warn};
 use utils::{
@@ -16,6 +18,8 @@ pub struct World {
     attachments: HashMap<TenantShardAttachmentId, NodeId>,
     commit_lsns: HashMap<TenantTimelineId, Lsn>,
     remote_consistent_lsns: HashMap<TimelineAttachmentId, Lsn>,
+    paged_out: HashSet<TenantTimelineId>,
+    storage: Box<dyn storage::Storage>,
 }
 
 #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
@@ -77,6 +81,7 @@ impl World {
             attachment,
             remote_consistent_lsn,
         } = adv;
+        self.page_in_ttid(attachment.tenant_timeline_id());
         match self.remote_consistent_lsns.entry(attachment) {
             hash_map::Entry::Occupied(mut occupied_entry) => {
                 let current = occupied_entry.get_mut();
@@ -95,6 +100,7 @@ impl World {
         }
     }
     pub fn handle_commit_lsn_advancement(&mut self, ttid: TenantTimelineId, commit_lsn: Lsn) {
+        self.page_in_ttid(ttid);
         match self.commit_lsns.entry(ttid) {
             hash_map::Entry::Occupied(mut occupied_entry) => {
                 assert!(*occupied_entry.get() <= commit_lsn);
@@ -137,6 +143,46 @@ impl World {
         }
         commit_lsn_advertisements_by_node
     }
+
+    fn page_in_ttid(&mut self, ttid: TenantTimelineId) {
+        if !self.paged_out.remove(&ttid) {
+            return;
+        };
+        // below is infallible; if we ever make it fallible, need to
+        // rollback the removal from paged_out in case we bail with error
+        let storage::Timeline {
+            commit_lsns,
+            remote_consistent_lsns,
+        } = self.storage.get_timeline(ttid);
+        for (tenant_timeline_id, commit_lsn) in commit_lsns {
+            match self.commit_lsns.entry(tenant_timeline_id) {
+                hash_map::Entry::Occupied(occupied_entry) => {
+                    panic!(
+                        "inconsistent: entry is supposed to be paged_out:\ninmem={:?}\nondisk={:?}",
+                        occupied_entry.get(),
+                        (tenant_timeline_id, commit_lsn)
+                    )
+                }
+                hash_map::Entry::Vacant(vacant_entry) => {
+                    vacant_entry.insert(commit_lsn);
+                }
+            }
+        }
+        for (timeline_attachment_id, remote_consistent_lsn) in remote_consistent_lsns {
+            match self.remote_consistent_lsns.entry(timeline_attachment_id) {
+                hash_map::Entry::Occupied(occupied_entry) => {
+                    panic!(
+                        "inconsistent: entry is supposed to be paged_out:\ninmem={:?}\nondisk={:?}",
+                        occupied_entry.get(),
+                        (timeline_attachment_id, remote_consistent_lsn)
+                    )
+                }
+                hash_map::Entry::Vacant(vacant_entry) => {
+                    vacant_entry.insert(remote_consistent_lsn);
+                }
+            }
+        }
+    }
 }
 
 impl TimelineAttachmentId {
diff --git a/libs/sk_ps_discovery/src/storage.rs b/libs/sk_ps_discovery/src/storage.rs
new file mode 100644
index 0000000000..f19ef9f471
--- /dev/null
+++ b/libs/sk_ps_discovery/src/storage.rs
@@ -0,0 +1,15 @@
+use std::collections::HashMap;
+
+use utils::id::TenantTimelineId;
+
+use crate::TimelineAttachmentId;
+
+pub trait Storage {
+    fn get_timeline(&self, ttid: TenantTimelineId) -> Timeline;
+    fn store_timeline(&self, ttid: TenantTimelineId, timeline: Timeline);
+}
+
+pub struct Timeline {
+    pub commit_lsns: HashMap<TenantTimelineId, utils::lsn::Lsn>,
+    pub remote_consistent_lsns: HashMap<TimelineAttachmentId, utils::lsn::Lsn>,
+}