left equi join for remote_consistent_lsn retrieval

This commit is contained in:
Christian Schwarz
2025-06-06 14:28:20 -07:00
parent e40b1c79fa
commit 5f7bc3ce60
2 changed files with 110 additions and 14 deletions

View File

@@ -352,6 +352,7 @@ impl World {
HashMap::with_capacity(self.nodes_timelines.len());
let commit_lsns_iter = self.commit_lsns.iter().map(|(k, v)| (*k, *v));
let attachments_iter = self.attachments.iter().map(|(k, v)| (*k, *v));
let remote_consistent_lsns_iter = self.remote_consistent_lsns.iter().map(|(k, v)| (*k, *v));
let join = merge_join::inner_equi_join_with_merge_strategy(
commit_lsns_iter,
@@ -359,20 +360,18 @@ impl World {
|(tenant_timeline_id, _)| tenant_timeline_id.tenant_id,
|(shard_attachment_id, _)| shard_attachment_id.tenant_id,
);
for (l, r) in join {
let (tenant_timeline_id, commit_lsn): (TenantTimelineId, Lsn) = l;
let (tenant_shard_attachment_id, node_id): (TenantShardAttachmentId, NodeId) = r;
// TOOD three-way equi join
let timeline_attachment_id =
tenant_shard_attachment_id.timeline_attachment_id(tenant_timeline_id.timeline_id);
match self
.remote_consistent_lsns
.get(&timeline_attachment_id)
.cloned()
{
let join = merge_join::left_equi_join_with_merge_strategy(
join,
remote_consistent_lsns_iter,
|((ttid, _), _)| ttid.tenant_id,
|(tlaid, _)| tlaid.tenant_timeline_id.tenant_id,
);
for ((c, a), r) in join {
let (tenant_timeline_id, commit_lsn): (TenantTimelineId, Lsn) = c;
let (_, node_id): (TenantShardAttachmentId, NodeId) = a;
match r {
// TODO: can > ever happen?
Some(remote_consistent_lsn) if remote_consistent_lsn >= commit_lsn => {
Some((_, remote_consistent_lsn)) if remote_consistent_lsn >= commit_lsn => {
// this timeline shard attachment is already caught up
continue;
}

View File

@@ -37,11 +37,65 @@ where
})
}
/// Left equi join of two key-sorted iterators using a merge strategy.
///
/// Semantics follow SQL's `l LEFT JOIN r USING (key)`:
/// * every left row is paired with every right row that has an equal key,
///   yielding `(left, Some(right))` for each pair;
/// * a left row without any matching right row is yielded exactly once as
///   `(left, None)`;
/// * right rows without a matching left row are skipped.
///
/// Both inputs must be sorted ascending by their respective key
/// (`key_l` / `key_r`); this is not checked. Output is in left-major order:
/// all pairs for one left row are yielded before the next left row is
/// consumed.
///
/// Several left rows may share a key. To pair each of them with the full set
/// of matching right rows, the right-hand rows of the current key are
/// buffered in a `Vec` (cheap, since `RI: Copy`) and replayed for every left
/// row carrying that key.
pub fn left_equi_join_with_merge_strategy<L, LI, R, RI, K, FL, FR>(
    l: L,
    r: R,
    key_l: FL,
    key_r: FR,
) -> impl Iterator<Item = (LI, Option<RI>)>
where
    L: Iterator<Item = LI>, // + Sorted
    R: Iterator<Item = RI>, // + Sorted
    FL: 'static + Fn(&LI) -> K,
    FR: 'static + Fn(&RI) -> K,
    LI: Copy,
    RI: Copy,
    K: PartialEq + Eq + Ord,
{
    let mut l = l.map(move |i| (i, key_l(&i)));
    let mut r = r.map(move |i| (i, key_r(&i))).peekable();

    // Right rows whose key equals `group_key`; replayed for each left row
    // that carries the same key.
    let mut group: Vec<RI> = Vec::new();
    let mut group_key: Option<K> = None;
    // Left row currently being paired with `group`, and the replay position.
    let mut current: Option<LI> = None;
    let mut idx = 0usize;

    std::iter::from_fn(move || {
        loop {
            // Keep emitting pairs for the left row we are in the middle of.
            if let Some(lv) = current {
                if let Some(rv) = group.get(idx).copied() {
                    idx += 1;
                    return Some((lv, Some(rv)));
                }
                current = None;
            }

            // Left side exhausted => join is finished; leftover right rows
            // have no partner and are ignored.
            let (lv, lk) = l.next()?;

            // Only rebuild the buffered group when the left key changes;
            // a repeated left key reuses the rows collected for it.
            if group_key.as_ref() != Some(&lk) {
                group.clear();
                // Right rows with a smaller key can never match again.
                while r.next_if(|(_, rk)| *rk < lk).is_some() {}
                // Collect the run of right rows with an equal key.
                while let Some((rv, _)) = r.next_if(|(_, rk)| *rk == lk) {
                    group.push(rv);
                }
                group_key = Some(lk);
            }

            if group.is_empty() {
                return Some((lv, None));
            }
            current = Some(lv);
            idx = 0;
        }
    })
}
#[cfg(test)]
mod tests {
#[test]
fn basic() {
fn inner_equi_basic() {
let l = vec![b"a", b"c"];
let r = vec![b"aa", b"ad", b"ba", b"bb", b"ca", b"cb", b"cd", b"dd"];
@@ -64,4 +118,47 @@ mod tests {
]
);
}
#[test]
fn left_equi_basic() {
    // Mirrors the following SQL session; the first byte of each value plays
    // the role of the `id` join column:
    //
    //   create table aleft (id text, aleft text);
    //   create table aright (id text, aright text);
    //   insert into aleft values ('a', 'a'), ('b', 'b');
    //   insert into aright values ('a', 'aa'), ('a', 'ab'), ('c', 'cd');
    //   select * from aleft left join aright using ("id");
    let left = vec![b"a", b"b"];
    let right = vec![b"aa", b"ab", b"cd"];

    // 'a' matches two right rows, 'b' matches none, and the right-only
    // row 'cd' does not appear in the output.
    let expected = vec![(b"a", Some(b"aa")), (b"a", Some(b"ab")), (b"b", None)];

    let joined = super::left_equi_join_with_merge_strategy(
        left.into_iter(),
        right.into_iter(),
        |l| &l[0..1],
        |r| &r[0..1],
    );
    assert_eq!(joined.collect::<Vec<_>>(), expected);
}
#[test]
fn left_equi_basic_2() {
    // Right rows that sort before the first left key ("aa", "ab") must be
    // skipped so that the later match ("bb") is still found.
    let left = vec![b"b"];
    let right = vec![b"aa", b"ab", b"bb"];

    let joined = super::left_equi_join_with_merge_strategy(
        left.into_iter(),
        right.into_iter(),
        |l| &l[0..1],
        |r| &r[0..1],
    );
    let res: Vec<_> = joined.collect();

    assert_eq!(res, vec![(b"b", Some(b"bb"))])
}
}