From eaa91291ae75d74ac902c9340ae1cf493615a6e5 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 5 Jun 2025 01:09:33 +0200 Subject: [PATCH] add test case for initial advertisement and fix everything by switching to btrees and proper merge equi join --- libs/sk_ps_discovery/src/lib.rs | 176 ++++++++++++++++++++---------- libs/sk_ps_discovery/src/tests.rs | 118 ++++++++++++++++---- libs/utils/src/generation.rs | 4 +- libs/utils/src/lib.rs | 2 + libs/utils/src/merge_join.rs | 67 ++++++++++++ libs/utils/src/shard.rs | 22 +++- 6 files changed, 306 insertions(+), 83 deletions(-) create mode 100644 libs/utils/src/merge_join.rs diff --git a/libs/sk_ps_discovery/src/lib.rs b/libs/sk_ps_discovery/src/lib.rs index 6868b3ce96..80244ea246 100644 --- a/libs/sk_ps_discovery/src/lib.rs +++ b/libs/sk_ps_discovery/src/lib.rs @@ -1,37 +1,42 @@ #[cfg(test)] mod tests; -use std::collections::{BTreeMap, HashMap, HashSet, hash_map}; +use std::{ + collections::{BTreeMap, HashMap, HashSet, btree_map, hash_map}, + ops::RangeInclusive, +}; use tracing::{info, warn}; use utils::{ generation::Generation, id::{NodeId, TenantId, TenantTimelineId, TimelineId}, lsn::Lsn, + merge_join, shard::ShardIndex, }; #[derive(Debug, Default)] pub struct World { - attachments: HashMap, + attachments: BTreeMap, quiesced_timelines: BTreeMap, - commit_lsns: HashMap, - remote_consistent_lsns: HashMap, + commit_lsns: BTreeMap, + remote_consistent_lsns: BTreeMap, } -#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] +#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord)] pub struct TenantShardAttachmentId { pub tenant_id: TenantId, pub shard_id: ShardIndex, pub generation: Generation, } -#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] +#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord)] pub struct TimelineAttachmentId { - pub tenant_shard_attachment_id: TenantShardAttachmentId, - pub timeline_id: TimelineId, + pub tenant_timeline_id: TenantTimelineId, + pub shard_id: ShardIndex, + pub generation: Generation, } pub struct AttachmentUpdate { @@ -60,7 +65,7 @@ impl World { let remote_consistent_lsn_timelines: HashSet = self .remote_consistent_lsns .keys() - .map(|tlaid: &TimelineAttachmentId| tlaid.tenant_timeline_id()) + .map(|tlaid: &TimelineAttachmentId| tlaid.tenant_timeline_id) .collect(); // quiesced \cap (commit_lsn \cup remote_consistent_lsns) #[rustfmt::skip] @@ -72,7 +77,7 @@ impl World { pub fn update_attachment(&mut self, upd: AttachmentUpdate) { self.check_invariants(); use AttachmentUpdateAction::*; - use hash_map::Entry::*; + use btree_map::Entry::*; let AttachmentUpdate { tenant_shard_attachment_id, action, @@ -115,7 +120,7 @@ impl World { } = adv; match self.remote_consistent_lsns.entry(attachment) { - hash_map::Entry::Occupied(mut occupied_entry) => { + btree_map::Entry::Occupied(mut occupied_entry) => { let current = occupied_entry.get_mut(); use std::cmp::Ordering::*; match (*current).cmp(&remote_consistent_lsn) { @@ -132,8 +137,8 @@ impl World { } } } - hash_map::Entry::Vacant(entry) => { - let ttid = attachment.tenant_timeline_id(); + btree_map::Entry::Vacant(entry) => { + let ttid = attachment.tenant_timeline_id; match self.quiesced_timelines.get(&ttid).cloned() { Some(quiesced_lsn) if quiesced_lsn == remote_consistent_lsn => { info!("ignoring no-op update for quiesced timeline"); @@ -155,7 +160,7 @@ impl World { pub fn handle_commit_lsn_advancement(&mut self, ttid: TenantTimelineId, update: Lsn) { self.check_invariants(); match self.commit_lsns.entry(ttid) { - hash_map::Entry::Occupied(mut entry) => { + btree_map::Entry::Occupied(mut entry) => { let current = entry.get_mut(); use std::cmp::Ordering::*; match (*current).cmp(&update) { @@ -175,7 +180,7 @@ impl World { } } - hash_map::Entry::Vacant(entry) => { + btree_map::Entry::Vacant(entry) => { match self.quiesced_timelines.get(&ttid).cloned() { Some(quiesced_lsn) if quiesced_lsn == update => { info!("ignoring no-op update for quiesced timeline"); @@ -198,28 +203,47 @@ impl World { pub fn get_commit_lsn_advertisements(&self) -> HashMap> { let mut commit_lsn_advertisements_by_node: HashMap> = Default::default(); - for (timeline_attachment_id, remote_consistent_lsn) in - self.remote_consistent_lsns.iter().map(|(k, v)| (*k, *v)) - { - let tenant_timeline_id = timeline_attachment_id.tenant_timeline_id(); - if let Some(commit_lsn) = self.commit_lsns.get(&tenant_timeline_id).cloned() { - if commit_lsn > remote_consistent_lsn { - if let Some(node_id) = self - .attachments - .get(&timeline_attachment_id.tenant_shard_attachment_id) - { - let for_node = commit_lsn_advertisements_by_node - .entry(*node_id) - .or_default(); - match for_node.entry(tenant_timeline_id) { - hash_map::Entry::Vacant(vacant_entry) => { - vacant_entry.insert(commit_lsn); - } - hash_map::Entry::Occupied(occupied_entry) => { - assert_eq!(*occupied_entry.get(), commit_lsn); - } - } - } + let commit_lsns_iter = self.commit_lsns.iter().map(|(k, v)| (*k, *v)); + let attachments_iter = self.attachments.iter().map(|(k, v)| (*k, *v)); + + let join = merge_join::inner_equi_join_with_merge_strategy( + commit_lsns_iter, + attachments_iter, + |(tenant_timeline_id, _)| tenant_timeline_id.tenant_id, + |(shard_attachment_id, _)| shard_attachment_id.tenant_id, + ); + for (l, r) in join { + let (tenant_timeline_id, commit_lsn): (TenantTimelineId, Lsn) = l; + let (tenant_shard_attachment_id, node_id): (TenantShardAttachmentId, NodeId) = r; + + // TOOD three-way equi join + let timeline_attachment_id = + tenant_shard_attachment_id.timeline_attachment_id(tenant_timeline_id.timeline_id); + match self + .remote_consistent_lsns + .get(&timeline_attachment_id) + .cloned() + { + // TODO: can > ever happen? + Some(remote_consistent_lsn) if remote_consistent_lsn >= commit_lsn => { + // this timeline shard attachment is already caught up + continue; + } + Some(_) | None => { + // need to advertise + // -> fallthrough + } + }; + // DISTINCT node_id, array_agg(DISTINCT tenant_shard_id ) + let for_node = commit_lsn_advertisements_by_node + .entry(node_id) + .or_default(); + match for_node.entry(tenant_timeline_id) { + hash_map::Entry::Vacant(vacant_entry) => { + vacant_entry.insert(commit_lsn); + } + hash_map::Entry::Occupied(occupied_entry) => { + assert_eq!(*occupied_entry.get(), commit_lsn); } } } @@ -233,21 +257,19 @@ impl World { .expect("must call this function only on quiesced tenant_timeline_id"); let replaced = self.commit_lsns.insert(tenant_timeline_id, quiesced_lsn); assert_eq!(None, replaced); - let reconstruct_remote_consistent_lsn_entries = - self.attachments - .keys() - .cloned() - .map(|tenant_shard_attachment_id| { - ( - TimelineAttachmentId { - tenant_shard_attachment_id, - timeline_id: tenant_timeline_id.timeline_id, - }, - quiesced_lsn, - ) - }); - self.remote_consistent_lsns - .reserve(reconstruct_remote_consistent_lsn_entries.len()); + let reconstruct_remote_consistent_lsn_entries = self + .attachments + .range(TenantShardAttachmentId::tenant_range( + tenant_timeline_id.tenant_id, + )) + .map(|(k, _)| *k) + .map(|tenant_shard_attachment_id| { + ( + tenant_shard_attachment_id + .timeline_attachment_id(tenant_timeline_id.timeline_id), + quiesced_lsn, + ) + }); for (key, value) in reconstruct_remote_consistent_lsn_entries { let replaced = self.remote_consistent_lsns.insert(key, value); assert_eq!(None, replaced); @@ -256,20 +278,56 @@ impl World { } impl TimelineAttachmentId { - pub fn tenant_timeline_id(&self) -> TenantTimelineId { - TenantTimelineId { - tenant_id: self.tenant_shard_attachment_id.tenant_id, - timeline_id: self.timeline_id, + pub fn timeline_range(ttid: TenantTimelineId) -> RangeInclusive { + let shard_index_range: RangeInclusive<_> = ShardIndex::RANGE; + let generation_range: RangeInclusive<_> = Generation::RANGE; + RangeInclusive::new( + TimelineAttachmentId { + tenant_timeline_id: ttid, + shard_id: *shard_index_range.start(), + generation: *generation_range.start(), + }, + TimelineAttachmentId { + tenant_timeline_id: ttid, + shard_id: *shard_index_range.end(), + generation: *generation_range.end(), + }, + ) + } + pub fn tenant_shard_attachment_id(self) -> TenantShardAttachmentId { + TenantShardAttachmentId { + tenant_id: self.tenant_timeline_id.tenant_id, + shard_id: self.shard_id, + generation: self.generation, } } } impl TenantShardAttachmentId { - #[cfg(test)] pub fn timeline_attachment_id(self, timeline_id: TimelineId) -> TimelineAttachmentId { TimelineAttachmentId { - tenant_shard_attachment_id: self, - timeline_id, + tenant_timeline_id: TenantTimelineId { + tenant_id: self.tenant_id, + timeline_id, + }, + shard_id: self.shard_id, + generation: self.generation, } } + pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive { + let shard_index_range: RangeInclusive<_> = ShardIndex::RANGE; + let generation_range: RangeInclusive<_> = Generation::RANGE; + RangeInclusive::new( + Self { + tenant_id, + shard_id: *shard_index_range.start(), + generation: *generation_range.start(), + }, + Self { + tenant_id, + shard_id: *shard_index_range.end(), + generation: *generation_range.end(), + }, + ) + } } diff --git a/libs/sk_ps_discovery/src/tests.rs b/libs/sk_ps_discovery/src/tests.rs index 54a36cd4f8..d021e0133f 100644 --- a/libs/sk_ps_discovery/src/tests.rs +++ b/libs/sk_ps_discovery/src/tests.rs @@ -3,13 +3,25 @@ use utils::id::TenantId; use super::*; use crate::World; +#[track_caller] +fn validate_advertisements( + actual: HashMap>, + expect: Vec<(NodeId, Vec<(TenantTimelineId, Lsn)>)>, +) { + let expect: HashMap<_, _> = expect + .into_iter() + .map(|(node_id, innermap)| (node_id, innermap.into_iter().collect())) + .collect(); + assert_eq!(actual, expect); +} + #[test] fn basic() { let mut world = World::default(); - let tenant_id = TenantId::generate(); - let timeline_id = TimelineId::generate(); - let timeline2 = TimelineId::generate(); + let tenant_id = TenantId::from_array([0xff; 16]); + let timeline_id = TimelineId::from_array([1; 16]); + let timeline2 = TimelineId::from_array([2; 16]); let attachment1 = TenantShardAttachmentId { tenant_id, @@ -53,7 +65,19 @@ fn basic() { Lsn(0x66), ); // Advs should still be empty - assert_eq!(world.get_commit_lsn_advertisements(), HashMap::default()); + validate_advertisements( + world.get_commit_lsn_advertisements(), + vec![( + ps1, + vec![( + TenantTimelineId { + tenant_id, + timeline_id: timeline2, + }, + Lsn(0x66), + )], + )], + ); // Ok, out of order part tested. Now Safekeeper learns about the attachments. @@ -67,19 +91,75 @@ fn basic() { ); dbg!(&world); // Now advertisements to attachment1 will be sent out, but attachment2 is still not known, so, no advertisements to it. - { - let mut advs = world.get_commit_lsn_advertisements(); - assert_eq!(advs.len(), 1); - let advs = advs.remove(&ps1).unwrap(); - assert_eq!(advs.len(), 1); - let (tenant_timeline_id, lsn) = advs.into_iter().next().unwrap(); - assert_eq!( - TenantTimelineId { - tenant_id, - timeline_id - }, - tenant_timeline_id - ); - assert_eq!(lsn, Lsn(0x55)); - } + validate_advertisements( + world.get_commit_lsn_advertisements(), + vec![( + ps1, + vec![( + TenantTimelineId { + tenant_id, + timeline_id, + }, + Lsn(0x55), + )], + )], + ); +} + +#[test] +fn advertisement_for_new_timeline() { + let mut world = World::default(); + + let tenant_id = TenantId::generate(); + let timeline_id = TimelineId::generate(); + let ttid = TenantTimelineId { + tenant_id, + timeline_id, + }; + + let tenant_shard_attachment_id = TenantShardAttachmentId { + tenant_id, + shard_id: ShardIndex::unsharded(), + generation: Generation::Valid(2), + }; + + let ps_id = NodeId(0x100); + + world.update_attachment(AttachmentUpdate { + tenant_shard_attachment_id, + action: AttachmentUpdateAction::Attach { ps_id }, + }); + world.handle_commit_lsn_advancement(ttid, Lsn(23)); + + let advs = world.get_commit_lsn_advertisements(); + validate_advertisements(advs, vec![(ps_id, vec![(ttid, Lsn(23))])]); +} + +#[test] +fn quiescing_timeline_catchup() { + let mut world = World::default(); + + let tenant_id = TenantId::generate(); + let timeline_id = TimelineId::generate(); + let ttid = TenantTimelineId { + tenant_id, + timeline_id, + }; + + let tenant_shard_attachment_id = TenantShardAttachmentId { + tenant_id, + shard_id: ShardIndex::unsharded(), + generation: Generation::Valid(2), + }; + + let ps_id = NodeId(0x100); + + world.update_attachment(AttachmentUpdate { + tenant_shard_attachment_id, + action: AttachmentUpdateAction::Attach { ps_id }, + }); + world.handle_commit_lsn_advancement(ttid, Lsn(23)); + + let advs = world.get_commit_lsn_advertisements(); + validate_advertisements(advs, vec![(ps_id, vec![(ttid, Lsn(23))])]); } diff --git a/libs/utils/src/generation.rs b/libs/utils/src/generation.rs index b5e4a4644a..0f92d853a8 100644 --- a/libs/utils/src/generation.rs +++ b/libs/utils/src/generation.rs @@ -1,4 +1,4 @@ -use std::fmt::Debug; +use std::{fmt::Debug, ops::RangeInclusive}; use serde::{Deserialize, Serialize}; @@ -25,7 +25,9 @@ pub enum Generation { /// scenarios where pageservers might otherwise issue conflicting writes to /// remote storage impl Generation { + pub const MIN: Self = Self::None; pub const MAX: Self = Self::Valid(u32::MAX); + pub const RANGE: RangeInclusive = RangeInclusive::new(Self::MIN, Self::MAX); /// Create a new Generation that represents a legacy key format with /// no generation suffix diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 206b8bbd8f..9d03cb0d05 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -95,6 +95,8 @@ pub mod guard_arc_swap; pub mod elapsed_accum; +pub mod merge_join; + #[cfg(target_os = "linux")] pub mod linux_socket_ioctl; diff --git a/libs/utils/src/merge_join.rs b/libs/utils/src/merge_join.rs new file mode 100644 index 0000000000..1ccef533aa --- /dev/null +++ b/libs/utils/src/merge_join.rs @@ -0,0 +1,67 @@ +pub fn inner_equi_join_with_merge_strategy( + l: L, + r: R, + key_l: FL, + key_r: FR, +) -> impl Iterator +where + L: Iterator, // + Sorted + R: Iterator, // + Sorted + FL: 'static + Fn(&LI) -> K, + FR: 'static + Fn(&RI) -> K, + LI: Copy, + RI: Copy, + K: PartialEq + Eq + Ord, +{ + let mut l = l.map(move |i| (i, key_l(&i))).peekable(); + let mut r = r.map(move |i| (i, key_r(&i))).peekable(); + std::iter::from_fn(move || { + loop { + match (l.peek(), r.peek()) { + (Some((_, lk)), Some((_, rk))) if lk < rk => { + drop(l.next()); + continue; + } + (Some((_, lk)), Some((_, rk))) if lk > rk => { + drop(r.next()); + continue; + } + (Some((lv, lk)), Some((_, rk))) => { + assert!(lk == rk); + let (rv, _) = r.next().unwrap(); + return Some((lv.clone(), rv)); + } + (None, None) | (None, Some(_)) | (Some(_), None) => return None, + } + } + }) +} + +#[cfg(test)] +mod tests { + + #[test] + fn basic() { + let l = vec![b"a", b"c"]; + let r = vec![b"aa", b"ad", b"ba", b"bb", b"ca", b"cb", b"cd", b"dd"]; + + let res: Vec<_> = super::inner_equi_join_with_merge_strategy( + l.into_iter(), + r.into_iter(), + |l| &l[0..1], + |r| &r[0..1], + ) + .collect(); + + assert_eq!( + res, + vec![ + (b"a", b"aa"), + (b"a", b"ad"), + (b"c", b"ca"), + (b"c", b"cb"), + (b"c", b"cd"), + ] + ); + } +} diff --git a/libs/utils/src/shard.rs b/libs/utils/src/shard.rs index c8c410a725..269af8c2a3 100644 --- a/libs/utils/src/shard.rs +++ b/libs/utils/src/shard.rs @@ -52,6 +52,7 @@ pub struct TenantShardId { impl ShardCount { pub const MAX: Self = Self(u8::MAX); pub const MIN: Self = Self(0); + pub const RANGE: RangeInclusive = RangeInclusive::new(Self::MIN, Self::MAX); /// The internal value of a ShardCount may be zero, which means "1 shard, but use /// legacy format for TenantShardId that excludes the shard suffix", also known @@ -85,7 +86,9 @@ impl ShardCount { } impl ShardNumber { + pub const MIN: Self = Self(0); pub const MAX: Self = Self(u8::MAX); + pub const RANGE: RangeInclusive = RangeInclusive::new(Self::MIN, Self::MAX); } impl TenantShardId { @@ -100,16 +103,17 @@ impl TenantShardId { /// The range of all TenantShardId that belong to a particular TenantId. This is useful when /// you have a BTreeMap of TenantShardId, and are querying by TenantId. pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive { + let shard_index_range: RangeInclusive<_> = ShardIndex::RANGE; RangeInclusive::new( Self { tenant_id, - shard_number: ShardNumber(0), - shard_count: ShardCount(0), + shard_number: shard_index_range.start().shard_number, + shard_count: shard_index_range.start().shard_count, }, Self { tenant_id, - shard_number: ShardNumber::MAX, - shard_count: ShardCount::MAX, + shard_number: shard_index_range.end().shard_number, + shard_count: shard_index_range.end().shard_count, }, ) } @@ -241,6 +245,16 @@ impl From<[u8; 18]> for TenantShardId { } impl ShardIndex { + pub const MIN: Self = ShardIndex { + shard_number: ShardNumber::MIN, + shard_count: ShardCount::MIN, + }; + pub const MAX: Self = ShardIndex { + shard_number: ShardNumber::MAX, + shard_count: ShardCount::MAX, + }; + pub const RANGE: RangeInclusive = RangeInclusive::new(Self::MIN, Self::MAX); + pub fn new(number: ShardNumber, count: ShardCount) -> Self { Self { shard_number: number,