diff --git a/libs/sk_ps_discovery/src/lib.rs b/libs/sk_ps_discovery/src/lib.rs index 12bc6c738d..c4c376705e 100644 --- a/libs/sk_ps_discovery/src/lib.rs +++ b/libs/sk_ps_discovery/src/lib.rs @@ -352,6 +352,7 @@ impl World { HashMap::with_capacity(self.nodes_timelines.len()); let commit_lsns_iter = self.commit_lsns.iter().map(|(k, v)| (*k, *v)); let attachments_iter = self.attachments.iter().map(|(k, v)| (*k, *v)); + let remote_consistent_lsns_iter = self.remote_consistent_lsns.iter().map(|(k, v)| (*k, *v)); let join = merge_join::inner_equi_join_with_merge_strategy( commit_lsns_iter, @@ -359,20 +360,18 @@ impl World { |(tenant_timeline_id, _)| tenant_timeline_id.tenant_id, |(shard_attachment_id, _)| shard_attachment_id.tenant_id, ); - for (l, r) in join { - let (tenant_timeline_id, commit_lsn): (TenantTimelineId, Lsn) = l; - let (tenant_shard_attachment_id, node_id): (TenantShardAttachmentId, NodeId) = r; - - // TOOD three-way equi join - let timeline_attachment_id = - tenant_shard_attachment_id.timeline_attachment_id(tenant_timeline_id.timeline_id); - match self - .remote_consistent_lsns - .get(&timeline_attachment_id) - .cloned() - { + let join = merge_join::left_equi_join_with_merge_strategy( + join, + remote_consistent_lsns_iter, + |((ttid, _), _)| ttid.tenant_id, + |(tlaid, _)| tlaid.tenant_timeline_id.tenant_id, + ); + for ((c, a), r) in join { + let (tenant_timeline_id, commit_lsn): (TenantTimelineId, Lsn) = c; + let (_, node_id): (TenantShardAttachmentId, NodeId) = a; + match r { // TODO: can > ever happen? 
/// Left outer equi-join of two key-sorted iterators, using a merge strategy.
///
/// Yields `(left, Some(right))` for every pair of rows whose keys compare equal,
/// and `(left, None)` for every left row that has no right row with an equal key.
/// Right rows whose key matches no left row are dropped. Output follows the order
/// of `l`; the matches of a single left row follow the order of `r`.
///
/// This mirrors SQL `SELECT * FROM l LEFT JOIN r USING (key)`, including the
/// many-to-many case: if several left rows share a key, each of them is paired
/// with every right row of that key.
///
/// # Requirements
///
/// Both `l` and `r` must already be sorted ascending by `key_l` / `key_r`.
/// This is not checked; unsorted input silently produces an incomplete join.
///
/// # Memory
///
/// The right rows of the key currently being joined are buffered so they can be
/// replayed for subsequent left rows with the same key. The buffer is reused
/// across keys and never holds more than one right-side key group.
pub fn left_equi_join_with_merge_strategy<L, R, LI, RI, FL, FR, K>(
    l: L,
    r: R,
    key_l: FL,
    key_r: FR,
) -> impl Iterator<Item = (LI, Option<RI>)>
where
    L: Iterator<Item = LI>, // + Sorted
    R: Iterator<Item = RI>, // + Sorted
    FL: 'static + Fn(&LI) -> K,
    FR: 'static + Fn(&RI) -> K,
    LI: Copy,
    RI: Copy,
    K: PartialEq + Eq + Ord,
{
    let mut l = l.map(move |i| (i, key_l(&i)));
    let mut r = r.map(move |i| (i, key_r(&i))).peekable();

    // The left row currently being emitted, together with its key.
    let mut current: Option<(LI, K)> = None;
    // All right rows whose key equals the key of `current`.
    let mut group: Vec<RI> = Vec::new();
    // Index into `group` of the next right row to pair with `current`.
    let mut next_match: usize = 0;

    std::iter::from_fn(move || {
        // Keep emitting the remaining matches of the current left row.
        if let Some((lv, _)) = &current {
            if let Some(rv) = group.get(next_match) {
                next_match += 1;
                return Some((*lv, Some(*rv)));
            }
        }

        // Current left row is done (or there was none yet): advance the left side.
        let (lv, lk) = l.next()?;

        // A left row repeating the previous key reuses the buffered right group;
        // the right iterator has already moved past those rows.
        let same_key = matches!(&current, Some((_, prev)) if *prev == lk);
        if !same_key {
            group.clear();
            // Right rows with a smaller key can no longer match any left row.
            while r.next_if(|(_, rk)| *rk < lk).is_some() {}
            // Collect the complete right-side group for this key.
            while let Some((rv, _)) = r.next_if(|(_, rk)| *rk == lk) {
                group.push(rv);
            }
        }

        // Emit the first match, or `None` if this left row has no match at all.
        // Either way the next call resumes at index 1, which falls through to
        // the next left row when the group holds fewer than two rows.
        let first = group.first().copied();
        next_match = 1;
        current = Some((lv, lk));
        Some((lv, first))
    })
}
/// Right rows whose key sorts before the only left key are skipped, and the
/// single left row is paired with its one matching right row.
#[test]
fn left_equi_basic_2() {
    let lhs = vec![b"b"];
    let rhs = vec![b"aa", b"ab", b"bb"];

    // Join on the first byte of each row.
    let joined = super::left_equi_join_with_merge_strategy(
        lhs.into_iter(),
        rhs.into_iter(),
        |row| &row[0..1],
        |row| &row[0..1],
    )
    .collect::<Vec<_>>();

    let expected = vec![(b"b", Some(b"bb"))];
    assert_eq!(joined, expected)
}