diff --git a/libs/pageserver_api/src/keyspace.rs b/libs/pageserver_api/src/keyspace.rs index 2316acb616..396c801606 100644 --- a/libs/pageserver_api/src/keyspace.rs +++ b/libs/pageserver_api/src/keyspace.rs @@ -63,16 +63,84 @@ impl KeySpace { KeyPartitioning { parts } } + /// Update the keyspace such that it doesn't contain any range + /// that is overlapping with `other`. This can involve splitting or + /// removing of existing ranges. + pub fn remove_overlapping_with(&mut self, other: &KeySpace) { + let (self_start, self_end) = match (self.start(), self.end()) { + (Some(start), Some(end)) => (start, end), + _ => { + // self is empty + return; + } + }; + + // Key spaces are sorted by definition, so skip ahead to the first + // potentially intersecting range. Similarly, ignore ranges that start + // after the current keyspace ends. + let other_ranges = other + .ranges + .iter() + .skip_while(|range| self_start >= range.end) + .take_while(|range| self_end > range.start); + + for range in other_ranges { + while let Some(overlap_at) = self.overlaps_at(range) { + let overlapped = self.ranges[overlap_at].clone(); + + if overlapped.start < range.start && overlapped.end <= range.end { + // Higher part of the range is completely overlapped. + self.ranges[overlap_at].end = range.start; + } + if overlapped.start >= range.start && overlapped.end > range.end { + // Lower part of the range is completely overlapped. + self.ranges[overlap_at].start = range.end; + } + if overlapped.start < range.start && overlapped.end > range.end { + // Middle part of the range is overlapped. + self.ranges[overlap_at].end = range.start; + self.ranges + .insert(overlap_at + 1, range.end..overlapped.end); + } + if overlapped.start >= range.start && overlapped.end <= range.end { + // Whole range is overlapped + self.ranges.remove(overlap_at); + } + } + } + } + + pub fn start(&self) -> Option { + self.ranges.first().map(|range| range.start) + } + + pub fn end(&self) -> Option { + self.ranges.last().map(|range| range.end) + } + + #[allow(unused)] + pub fn total_size(&self) -> usize { + self.ranges + .iter() + .map(|range| key_range_size(range) as usize) + .sum() + } + + fn overlaps_at(&self, range: &Range) -> Option { + match self.ranges.binary_search_by_key(&range.end, |r| r.start) { + Ok(0) => None, + Err(0) => None, + Ok(index) if self.ranges[index - 1].end > range.start => Some(index - 1), + Err(index) if self.ranges[index - 1].end > range.start => Some(index - 1), + _ => None, + } + } + /// /// Check if key space contains overlapping range /// pub fn overlaps(&self, range: &Range) -> bool { - match self.ranges.binary_search_by_key(&range.end, |r| r.start) { - Ok(0) => false, - Err(0) => false, - Ok(index) => self.ranges[index - 1].end > range.start, - Err(index) => self.ranges[index - 1].end > range.start, - } + self.overlaps_at(range).is_some() } } @@ -441,4 +509,118 @@ mod tests { // xxxxxxxxxxx assert!(ks.overlaps(&kr(0..30))); // XXXXX This fails currently! } + + #[test] + fn test_remove_full_overlapps() { + let mut key_space1 = KeySpace { + ranges: vec![ + Key::from_i128(1)..Key::from_i128(4), + Key::from_i128(5)..Key::from_i128(8), + Key::from_i128(10)..Key::from_i128(12), + ], + }; + let key_space2 = KeySpace { + ranges: vec![ + Key::from_i128(2)..Key::from_i128(3), + Key::from_i128(6)..Key::from_i128(7), + Key::from_i128(11)..Key::from_i128(13), + ], + }; + key_space1.remove_overlapping_with(&key_space2); + assert_eq!( + key_space1.ranges, + vec![ + Key::from_i128(1)..Key::from_i128(2), + Key::from_i128(3)..Key::from_i128(4), + Key::from_i128(5)..Key::from_i128(6), + Key::from_i128(7)..Key::from_i128(8), + Key::from_i128(10)..Key::from_i128(11) + ] + ); + } + + #[test] + fn test_remove_partial_overlaps() { + // Test partial ovelaps + let mut key_space1 = KeySpace { + ranges: vec![ + Key::from_i128(1)..Key::from_i128(5), + Key::from_i128(7)..Key::from_i128(10), + Key::from_i128(12)..Key::from_i128(15), + ], + }; + let key_space2 = KeySpace { + ranges: vec![ + Key::from_i128(3)..Key::from_i128(6), + Key::from_i128(8)..Key::from_i128(11), + Key::from_i128(14)..Key::from_i128(17), + ], + }; + key_space1.remove_overlapping_with(&key_space2); + assert_eq!( + key_space1.ranges, + vec![ + Key::from_i128(1)..Key::from_i128(3), + Key::from_i128(7)..Key::from_i128(8), + Key::from_i128(12)..Key::from_i128(14), + ] + ); + } + + #[test] + fn test_remove_no_overlaps() { + let mut key_space1 = KeySpace { + ranges: vec![ + Key::from_i128(1)..Key::from_i128(5), + Key::from_i128(7)..Key::from_i128(10), + Key::from_i128(12)..Key::from_i128(15), + ], + }; + let key_space2 = KeySpace { + ranges: vec![ + Key::from_i128(6)..Key::from_i128(7), + Key::from_i128(11)..Key::from_i128(12), + Key::from_i128(15)..Key::from_i128(17), + ], + }; + key_space1.remove_overlapping_with(&key_space2); + assert_eq!( + key_space1.ranges, + vec![ + Key::from_i128(1)..Key::from_i128(5), + Key::from_i128(7)..Key::from_i128(10), + Key::from_i128(12)..Key::from_i128(15), + ] + ); + } + + #[test] + fn test_remove_one_range_overlaps_multiple() { + let mut key_space1 = KeySpace { + ranges: vec![ + Key::from_i128(1)..Key::from_i128(3), + Key::from_i128(3)..Key::from_i128(6), + Key::from_i128(6)..Key::from_i128(10), + Key::from_i128(12)..Key::from_i128(15), + Key::from_i128(17)..Key::from_i128(20), + Key::from_i128(20)..Key::from_i128(30), + Key::from_i128(30)..Key::from_i128(40), + ], + }; + let key_space2 = KeySpace { + ranges: vec![Key::from_i128(9)..Key::from_i128(19)], + }; + key_space1.remove_overlapping_with(&key_space2); + assert_eq!( + key_space1.ranges, + vec![ + Key::from_i128(1)..Key::from_i128(3), + Key::from_i128(3)..Key::from_i128(6), + Key::from_i128(6)..Key::from_i128(9), + Key::from_i128(19)..Key::from_i128(20), + Key::from_i128(20)..Key::from_i128(30), + Key::from_i128(30)..Key::from_i128(40), + ] + ); + } }