add test case for initial advertisement and fix everything by switching to btrees and proper merge equi join

This commit is contained in:
Christian Schwarz
2025-06-05 01:09:33 +02:00
parent fc9f38dd2d
commit eaa91291ae
6 changed files with 306 additions and 83 deletions

View File

@@ -1,37 +1,42 @@
#[cfg(test)]
mod tests;
use std::collections::{BTreeMap, HashMap, HashSet, hash_map};
use std::{
collections::{BTreeMap, HashMap, HashSet, btree_map, hash_map},
ops::RangeInclusive,
};
use tracing::{info, warn};
use utils::{
generation::Generation,
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
merge_join,
shard::ShardIndex,
};
#[derive(Debug, Default)]
pub struct World {
attachments: HashMap<TenantShardAttachmentId, NodeId>,
attachments: BTreeMap<TenantShardAttachmentId, NodeId>,
quiesced_timelines: BTreeMap<TenantTimelineId, Lsn>,
commit_lsns: HashMap<TenantTimelineId, Lsn>,
remote_consistent_lsns: HashMap<TimelineAttachmentId, Lsn>,
commit_lsns: BTreeMap<TenantTimelineId, Lsn>,
remote_consistent_lsns: BTreeMap<TimelineAttachmentId, Lsn>,
}
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord)]
pub struct TenantShardAttachmentId {
pub tenant_id: TenantId,
pub shard_id: ShardIndex,
pub generation: Generation,
}
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord)]
pub struct TimelineAttachmentId {
pub tenant_shard_attachment_id: TenantShardAttachmentId,
pub timeline_id: TimelineId,
pub tenant_timeline_id: TenantTimelineId,
pub shard_id: ShardIndex,
pub generation: Generation,
}
pub struct AttachmentUpdate {
@@ -60,7 +65,7 @@ impl World {
let remote_consistent_lsn_timelines: HashSet<TenantTimelineId> = self
.remote_consistent_lsns
.keys()
.map(|tlaid: &TimelineAttachmentId| tlaid.tenant_timeline_id())
.map(|tlaid: &TimelineAttachmentId| tlaid.tenant_timeline_id)
.collect();
// quiesced \cap (commit_lsn \cup remote_consistent_lsns)
#[rustfmt::skip]
@@ -72,7 +77,7 @@ impl World {
pub fn update_attachment(&mut self, upd: AttachmentUpdate) {
self.check_invariants();
use AttachmentUpdateAction::*;
use hash_map::Entry::*;
use btree_map::Entry::*;
let AttachmentUpdate {
tenant_shard_attachment_id,
action,
@@ -115,7 +120,7 @@ impl World {
} = adv;
match self.remote_consistent_lsns.entry(attachment) {
hash_map::Entry::Occupied(mut occupied_entry) => {
btree_map::Entry::Occupied(mut occupied_entry) => {
let current = occupied_entry.get_mut();
use std::cmp::Ordering::*;
match (*current).cmp(&remote_consistent_lsn) {
@@ -132,8 +137,8 @@ impl World {
}
}
}
hash_map::Entry::Vacant(entry) => {
let ttid = attachment.tenant_timeline_id();
btree_map::Entry::Vacant(entry) => {
let ttid = attachment.tenant_timeline_id;
match self.quiesced_timelines.get(&ttid).cloned() {
Some(quiesced_lsn) if quiesced_lsn == remote_consistent_lsn => {
info!("ignoring no-op update for quiesced timeline");
@@ -155,7 +160,7 @@ impl World {
pub fn handle_commit_lsn_advancement(&mut self, ttid: TenantTimelineId, update: Lsn) {
self.check_invariants();
match self.commit_lsns.entry(ttid) {
hash_map::Entry::Occupied(mut entry) => {
btree_map::Entry::Occupied(mut entry) => {
let current = entry.get_mut();
use std::cmp::Ordering::*;
match (*current).cmp(&update) {
@@ -175,7 +180,7 @@ impl World {
}
}
hash_map::Entry::Vacant(entry) => {
btree_map::Entry::Vacant(entry) => {
match self.quiesced_timelines.get(&ttid).cloned() {
Some(quiesced_lsn) if quiesced_lsn == update => {
info!("ignoring no-op update for quiesced timeline");
@@ -198,28 +203,47 @@ impl World {
pub fn get_commit_lsn_advertisements(&self) -> HashMap<NodeId, HashMap<TenantTimelineId, Lsn>> {
let mut commit_lsn_advertisements_by_node: HashMap<NodeId, HashMap<TenantTimelineId, Lsn>> =
Default::default();
for (timeline_attachment_id, remote_consistent_lsn) in
self.remote_consistent_lsns.iter().map(|(k, v)| (*k, *v))
{
let tenant_timeline_id = timeline_attachment_id.tenant_timeline_id();
if let Some(commit_lsn) = self.commit_lsns.get(&tenant_timeline_id).cloned() {
if commit_lsn > remote_consistent_lsn {
if let Some(node_id) = self
.attachments
.get(&timeline_attachment_id.tenant_shard_attachment_id)
{
let for_node = commit_lsn_advertisements_by_node
.entry(*node_id)
.or_default();
match for_node.entry(tenant_timeline_id) {
hash_map::Entry::Vacant(vacant_entry) => {
vacant_entry.insert(commit_lsn);
}
hash_map::Entry::Occupied(occupied_entry) => {
assert_eq!(*occupied_entry.get(), commit_lsn);
}
}
}
let commit_lsns_iter = self.commit_lsns.iter().map(|(k, v)| (*k, *v));
let attachments_iter = self.attachments.iter().map(|(k, v)| (*k, *v));
let join = merge_join::inner_equi_join_with_merge_strategy(
commit_lsns_iter,
attachments_iter,
|(tenant_timeline_id, _)| tenant_timeline_id.tenant_id,
|(shard_attachment_id, _)| shard_attachment_id.tenant_id,
);
for (l, r) in join {
let (tenant_timeline_id, commit_lsn): (TenantTimelineId, Lsn) = l;
let (tenant_shard_attachment_id, node_id): (TenantShardAttachmentId, NodeId) = r;
// TOOD three-way equi join
let timeline_attachment_id =
tenant_shard_attachment_id.timeline_attachment_id(tenant_timeline_id.timeline_id);
match self
.remote_consistent_lsns
.get(&timeline_attachment_id)
.cloned()
{
// TODO: can > ever happen?
Some(remote_consistent_lsn) if remote_consistent_lsn >= commit_lsn => {
// this timeline shard attachment is already caught up
continue;
}
Some(_) | None => {
// need to advertise
// -> fallthrough
}
};
// DISTINCT node_id, array_agg(DISTINCT tenant_shard_id )
let for_node = commit_lsn_advertisements_by_node
.entry(node_id)
.or_default();
match for_node.entry(tenant_timeline_id) {
hash_map::Entry::Vacant(vacant_entry) => {
vacant_entry.insert(commit_lsn);
}
hash_map::Entry::Occupied(occupied_entry) => {
assert_eq!(*occupied_entry.get(), commit_lsn);
}
}
}
@@ -233,21 +257,19 @@ impl World {
.expect("must call this function only on quiesced tenant_timeline_id");
let replaced = self.commit_lsns.insert(tenant_timeline_id, quiesced_lsn);
assert_eq!(None, replaced);
let reconstruct_remote_consistent_lsn_entries =
self.attachments
.keys()
.cloned()
.map(|tenant_shard_attachment_id| {
(
TimelineAttachmentId {
tenant_shard_attachment_id,
timeline_id: tenant_timeline_id.timeline_id,
},
quiesced_lsn,
)
});
self.remote_consistent_lsns
.reserve(reconstruct_remote_consistent_lsn_entries.len());
let reconstruct_remote_consistent_lsn_entries = self
.attachments
.range(TenantShardAttachmentId::tenant_range(
tenant_timeline_id.tenant_id,
))
.map(|(k, _)| *k)
.map(|tenant_shard_attachment_id| {
(
tenant_shard_attachment_id
.timeline_attachment_id(tenant_timeline_id.timeline_id),
quiesced_lsn,
)
});
for (key, value) in reconstruct_remote_consistent_lsn_entries {
let replaced = self.remote_consistent_lsns.insert(key, value);
assert_eq!(None, replaced);
@@ -256,20 +278,56 @@ impl World {
}
impl TimelineAttachmentId {
pub fn tenant_timeline_id(&self) -> TenantTimelineId {
TenantTimelineId {
tenant_id: self.tenant_shard_attachment_id.tenant_id,
timeline_id: self.timeline_id,
pub fn timeline_range(ttid: TenantTimelineId) -> RangeInclusive<Self> {
let shard_index_range: RangeInclusive<_> = ShardIndex::RANGE;
let generation_range: RangeInclusive<_> = Generation::RANGE;
RangeInclusive::new(
TimelineAttachmentId {
tenant_timeline_id: ttid,
shard_id: *shard_index_range.start(),
generation: *generation_range.start(),
},
TimelineAttachmentId {
tenant_timeline_id: ttid,
shard_id: *shard_index_range.end(),
generation: *generation_range.end(),
},
)
}
pub fn tenant_shard_attachment_id(self) -> TenantShardAttachmentId {
TenantShardAttachmentId {
tenant_id: self.tenant_timeline_id.tenant_id,
shard_id: self.shard_id,
generation: self.generation,
}
}
}
impl TenantShardAttachmentId {
#[cfg(test)]
pub fn timeline_attachment_id(self, timeline_id: TimelineId) -> TimelineAttachmentId {
TimelineAttachmentId {
tenant_shard_attachment_id: self,
timeline_id,
tenant_timeline_id: TenantTimelineId {
tenant_id: self.tenant_id,
timeline_id,
},
shard_id: self.shard_id,
generation: self.generation,
}
}
pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
let shard_index_range: RangeInclusive<_> = ShardIndex::RANGE;
let generation_range: RangeInclusive<_> = Generation::RANGE;
RangeInclusive::new(
Self {
tenant_id,
shard_id: *shard_index_range.start(),
generation: *generation_range.start(),
},
Self {
tenant_id,
shard_id: *shard_index_range.end(),
generation: *generation_range.end(),
},
)
}
}

View File

@@ -3,13 +3,25 @@ use utils::id::TenantId;
use super::*;
use crate::World;
#[track_caller]
fn validate_advertisements(
actual: HashMap<NodeId, HashMap<TenantTimelineId, Lsn>>,
expect: Vec<(NodeId, Vec<(TenantTimelineId, Lsn)>)>,
) {
let expect: HashMap<_, _> = expect
.into_iter()
.map(|(node_id, innermap)| (node_id, innermap.into_iter().collect()))
.collect();
assert_eq!(actual, expect);
}
#[test]
fn basic() {
let mut world = World::default();
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let timeline2 = TimelineId::generate();
let tenant_id = TenantId::from_array([0xff; 16]);
let timeline_id = TimelineId::from_array([1; 16]);
let timeline2 = TimelineId::from_array([2; 16]);
let attachment1 = TenantShardAttachmentId {
tenant_id,
@@ -53,7 +65,19 @@ fn basic() {
Lsn(0x66),
);
// Advs should still be empty
assert_eq!(world.get_commit_lsn_advertisements(), HashMap::default());
validate_advertisements(
world.get_commit_lsn_advertisements(),
vec![(
ps1,
vec![(
TenantTimelineId {
tenant_id,
timeline_id: timeline2,
},
Lsn(0x66),
)],
)],
);
// Ok, out of order part tested. Now Safekeeper learns about the attachments.
@@ -67,19 +91,75 @@ fn basic() {
);
dbg!(&world);
// Now advertisements to attachment1 will be sent out, but attachment2 is still not known, so, no advertisements to it.
{
let mut advs = world.get_commit_lsn_advertisements();
assert_eq!(advs.len(), 1);
let advs = advs.remove(&ps1).unwrap();
assert_eq!(advs.len(), 1);
let (tenant_timeline_id, lsn) = advs.into_iter().next().unwrap();
assert_eq!(
TenantTimelineId {
tenant_id,
timeline_id
},
tenant_timeline_id
);
assert_eq!(lsn, Lsn(0x55));
}
validate_advertisements(
world.get_commit_lsn_advertisements(),
vec![(
ps1,
vec![(
TenantTimelineId {
tenant_id,
timeline_id,
},
Lsn(0x55),
)],
)],
);
}
#[test]
fn advertisement_for_new_timeline() {
let mut world = World::default();
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let ttid = TenantTimelineId {
tenant_id,
timeline_id,
};
let tenant_shard_attachment_id = TenantShardAttachmentId {
tenant_id,
shard_id: ShardIndex::unsharded(),
generation: Generation::Valid(2),
};
let ps_id = NodeId(0x100);
world.update_attachment(AttachmentUpdate {
tenant_shard_attachment_id,
action: AttachmentUpdateAction::Attach { ps_id },
});
world.handle_commit_lsn_advancement(ttid, Lsn(23));
let advs = world.get_commit_lsn_advertisements();
validate_advertisements(advs, vec![(ps_id, vec![(ttid, Lsn(23))])]);
}
#[test]
fn quiescing_timeline_catchup() {
let mut world = World::default();
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let ttid = TenantTimelineId {
tenant_id,
timeline_id,
};
let tenant_shard_attachment_id = TenantShardAttachmentId {
tenant_id,
shard_id: ShardIndex::unsharded(),
generation: Generation::Valid(2),
};
let ps_id = NodeId(0x100);
world.update_attachment(AttachmentUpdate {
tenant_shard_attachment_id,
action: AttachmentUpdateAction::Attach { ps_id },
});
world.handle_commit_lsn_advancement(ttid, Lsn(23));
let advs = world.get_commit_lsn_advertisements();
validate_advertisements(advs, vec![(ps_id, vec![(ttid, Lsn(23))])]);
}

View File

@@ -1,4 +1,4 @@
use std::fmt::Debug;
use std::{fmt::Debug, ops::RangeInclusive};
use serde::{Deserialize, Serialize};
@@ -25,7 +25,9 @@ pub enum Generation {
/// scenarios where pageservers might otherwise issue conflicting writes to
/// remote storage
impl Generation {
pub const MIN: Self = Self::None;
pub const MAX: Self = Self::Valid(u32::MAX);
pub const RANGE: RangeInclusive<Self> = RangeInclusive::new(Self::MIN, Self::MAX);
/// Create a new Generation that represents a legacy key format with
/// no generation suffix

View File

@@ -95,6 +95,8 @@ pub mod guard_arc_swap;
pub mod elapsed_accum;
pub mod merge_join;
#[cfg(target_os = "linux")]
pub mod linux_socket_ioctl;

View File

@@ -0,0 +1,67 @@
pub fn inner_equi_join_with_merge_strategy<L, LI, R, RI, K, FL, FR>(
l: L,
r: R,
key_l: FL,
key_r: FR,
) -> impl Iterator<Item = (LI, RI)>
where
L: Iterator<Item = LI>, // + Sorted
R: Iterator<Item = RI>, // + Sorted
FL: 'static + Fn(&LI) -> K,
FR: 'static + Fn(&RI) -> K,
LI: Copy,
RI: Copy,
K: PartialEq + Eq + Ord,
{
let mut l = l.map(move |i| (i, key_l(&i))).peekable();
let mut r = r.map(move |i| (i, key_r(&i))).peekable();
std::iter::from_fn(move || {
loop {
match (l.peek(), r.peek()) {
(Some((_, lk)), Some((_, rk))) if lk < rk => {
drop(l.next());
continue;
}
(Some((_, lk)), Some((_, rk))) if lk > rk => {
drop(r.next());
continue;
}
(Some((lv, lk)), Some((_, rk))) => {
assert!(lk == rk);
let (rv, _) = r.next().unwrap();
return Some((lv.clone(), rv));
}
(None, None) | (None, Some(_)) | (Some(_), None) => return None,
}
}
})
}
#[cfg(test)]
mod tests {
#[test]
fn basic() {
let l = vec![b"a", b"c"];
let r = vec![b"aa", b"ad", b"ba", b"bb", b"ca", b"cb", b"cd", b"dd"];
let res: Vec<_> = super::inner_equi_join_with_merge_strategy(
l.into_iter(),
r.into_iter(),
|l| &l[0..1],
|r| &r[0..1],
)
.collect();
assert_eq!(
res,
vec![
(b"a", b"aa"),
(b"a", b"ad"),
(b"c", b"ca"),
(b"c", b"cb"),
(b"c", b"cd"),
]
);
}
}

View File

@@ -52,6 +52,7 @@ pub struct TenantShardId {
impl ShardCount {
pub const MAX: Self = Self(u8::MAX);
pub const MIN: Self = Self(0);
pub const RANGE: RangeInclusive<Self> = RangeInclusive::new(Self::MIN, Self::MAX);
/// The internal value of a ShardCount may be zero, which means "1 shard, but use
/// legacy format for TenantShardId that excludes the shard suffix", also known
@@ -85,7 +86,9 @@ impl ShardCount {
}
impl ShardNumber {
pub const MIN: Self = Self(0);
pub const MAX: Self = Self(u8::MAX);
pub const RANGE: RangeInclusive<Self> = RangeInclusive::new(Self::MIN, Self::MAX);
}
impl TenantShardId {
@@ -100,16 +103,17 @@ impl TenantShardId {
/// The range of all TenantShardId that belong to a particular TenantId. This is useful when
/// you have a BTreeMap of TenantShardId, and are querying by TenantId.
pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
let shard_index_range: RangeInclusive<_> = ShardIndex::RANGE;
RangeInclusive::new(
Self {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
shard_number: shard_index_range.start().shard_number,
shard_count: shard_index_range.start().shard_count,
},
Self {
tenant_id,
shard_number: ShardNumber::MAX,
shard_count: ShardCount::MAX,
shard_number: shard_index_range.end().shard_number,
shard_count: shard_index_range.end().shard_count,
},
)
}
@@ -241,6 +245,16 @@ impl From<[u8; 18]> for TenantShardId {
}
impl ShardIndex {
pub const MIN: Self = ShardIndex {
shard_number: ShardNumber::MIN,
shard_count: ShardCount::MIN,
};
pub const MAX: Self = ShardIndex {
shard_number: ShardNumber::MAX,
shard_count: ShardCount::MAX,
};
pub const RANGE: RangeInclusive<Self> = RangeInclusive::new(Self::MIN, Self::MAX);
pub fn new(number: ShardNumber, count: ShardCount) -> Self {
Self {
shard_number: number,