diff --git a/libs/pageserver_api/src/keyspace.rs b/libs/pageserver_api/src/keyspace.rs index e6162e45d0..8f5055a073 100644 --- a/libs/pageserver_api/src/keyspace.rs +++ b/libs/pageserver_api/src/keyspace.rs @@ -48,48 +48,89 @@ impl<'a> ShardedRange<'a> { } } - /// Break up this range into chunks, each of which has at least one local key in it. - pub fn fragment(self, target_nblocks: usize) -> Vec<(u32, Range)> { - let shard_identity = self.shard_identity; - let mut range = self; - let mut result = Vec::new(); - loop { - // Split off the first target_nblocks if the remainder of the range would still contain - // some local blocks, otherwise yield the remainder of the range and we're done. - let range_size = range.page_count(); - - if range_size == u32::MAX || range_size == 0 { - return vec![(range_size, range.range)]; - } - - if range_size > target_nblocks as u32 { - // FIXME: this add is not advancing far enough to capture target_nblocks *local* - // blocks. So we will end up chunking our range more finely than we needed to. - let remainder = Self::new( - range.range.start.add(target_nblocks as u32)..range.range.end, - shard_identity, - ); - - let remainder_blocks = remainder.page_count(); - if remainder_blocks > 0 { - // We may split the range here - let mut split_off = range; - split_off.range.end = remainder.range.start; - - result.push((split_off.page_count(), split_off.range)); - - range = remainder; + /// Break up this range into chunks, each of which has at least one local key in it if the + /// total range has at least one local key. + pub fn fragment(self, target_nblocks: u32) -> Vec<(u32, Range)> { + // Optimization for single-key case (e.g. logical size keys) + if self.range.end == self.range.start.add(1) { + return vec![( + if self.shard_identity.is_key_disposable(&self.range.start) { + 0 } else { - // We may not split because the remainder would contain no local blocks - result.push((range_size, range.range)); - break; + 1 + }, + self.range, + )]; + } + + if self.range.end.field1 != self.range.start.field1 + || self.range.end.field2 != self.range.start.field2 + || self.range.end.field3 != self.range.start.field3 + || self.range.end.field4 != self.range.start.field4 + { + // Ranges that span relations are not fragmented. We only get these ranges as a result + // of operations that act on existing layers, so we trust that the existing range is + // reasonably small. + return vec![(u32::MAX, self.range)]; + } + + let mut fragments: Vec<(u32, Range)> = Vec::new(); + + let mut cursor = self.range.start; + while cursor < self.range.end { + let advance_by = self.advance_to_next_boundary(cursor); + let is_fragment_disposable = self.shard_identity.is_key_disposable(&cursor); + + // If the previous fragment is undersized, then we seek to consume enough + // blocks to complete it. + let (want_blocks, merge_last_fragment) = match fragments.last_mut() { + Some(frag) if frag.0 < target_nblocks => (target_nblocks - frag.0, Some(frag)), + Some(frag) => { + // Prev block is complete, want the full number. + ( + target_nblocks, + if is_fragment_disposable { + // If this current range will be empty (not shard-local data), we will merge into previous + Some(frag) + } else { + None + }, + ) } + None => { + // First iteration, want the full number + (target_nblocks, None) + } + }; + + let advance_by = if is_fragment_disposable { + advance_by } else { - result.push((range_size, range.range)); - break; + std::cmp::min(advance_by, want_blocks) + }; + + let next_cursor = cursor.add(advance_by); + + let this_frag = ( + if is_fragment_disposable { + 0 + } else { + advance_by + }, + cursor..next_cursor, + ); + cursor = next_cursor; + + if let Some(last_fragment) = merge_last_fragment { + // Previous fragment was short or this one is empty, merge into it + last_fragment.0 += this_frag.0; + last_fragment.1.end = this_frag.1.end; + } else { + fragments.push(this_frag); } } - result + + fragments } /// Estimate the physical pages that are within this range, on this shard. This returns @@ -115,27 +156,16 @@ impl<'a> ShardedRange<'a> { // Normal path: step through stripes and part-stripes in the range, evaluate whether each one belongs // to Self, and add the stripe's block count to our total if so. let mut result: u64 = 0; - let mut stripe_start = self.range.start; - while stripe_start < self.range.end { - let is_key_disposable = self.shard_identity.is_key_disposable(&stripe_start); - - // Count up to the next stripe_size boundary - let stripe_index = stripe_start.field6 / self.shard_identity.stripe_size.0; - let stripe_remainder = self.shard_identity.stripe_size.0 - - (stripe_start.field6 - stripe_index * self.shard_identity.stripe_size.0); - - let next_stripe_start = stripe_start.add(stripe_remainder); - let stripe_end = std::cmp::min(self.range.end, next_stripe_start); + let mut cursor = self.range.start; + while cursor < self.range.end { + // Count up to the next stripe_size boundary or end of range + let advance_by = self.advance_to_next_boundary(cursor); + cursor = cursor.add(advance_by); // If this blocks in this stripe belong to us, add them to our count - if !is_key_disposable { - // Keys must be nearby because we earlier used raw_size and would have - // droped out with u32::MAX if they were distant. - let diff = nearby_key_delta(&stripe_start, &stripe_end); - result += diff; + if !self.shard_identity.is_key_disposable(&cursor) { + result += advance_by as u64; } - - stripe_start = next_stripe_start; } // Sharding should always decrease the number of pages we estimate, never increase it @@ -148,6 +178,23 @@ impl<'a> ShardedRange<'a> { } } + /// Advance the cursor to the next potential fragment boundary: this is either + /// a stripe boundary, or the end of the range. + fn advance_to_next_boundary(&self, cursor: Key) -> u32 { + let distance_to_range_end = nearby_key_delta(&cursor, &self.range.end); + + if self.shard_identity.count < ShardCount::new(2) { + // Optimization: don't bother stepping through stripes if the tenant isn't sharded. + return Self::raw_size(&self.range); + } + + let stripe_index = cursor.field6 / self.shard_identity.stripe_size.0; + let stripe_remainder = self.shard_identity.stripe_size.0 + - (cursor.field6 - stripe_index * self.shard_identity.stripe_size.0); + + std::cmp::min(stripe_remainder as u64, distance_to_range_end) as u32 + } + /// Whereas `page_count` estimates the number of pages physically in this range on this shard, /// this function simply calculates the number of pages in the space, without accounting for those /// pages that would not actually be stored on this node. @@ -182,7 +229,7 @@ impl KeySpace { /// pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning { // Assume that each value is 8k in size. - let target_nblocks = (target_size / BLCKSZ as u64) as usize; + let target_nblocks = (target_size / BLCKSZ as u64) as u32; let mut parts = Vec::new(); let mut current_part = Vec::new(); @@ -198,7 +245,8 @@ impl KeySpace { // partition would cause it to be too large, and our current partition // covers at least one block that is physically present in this shard, // then start a new partition - if current_part_size + range_size as usize > target_nblocks && current_part_size > 0 + if current_part_size + range_size as usize > target_nblocks as usize + && current_part_size > 0 { parts.push(KeySpace { ranges: current_part, @@ -934,4 +982,165 @@ mod tests { } } } + + /// Test helper: construct a ShardedRange and call fragment() on it, returning + /// the total page count in the range and the fragments. + fn do_fragment( + range_start: Key, + range_end: Key, + shard_identity: &ShardIdentity, + target_nblocks: u32, + ) -> (u32, Vec<(u32, Range)>) { + let range = ShardedRange::new( + Range { + start: range_start, + end: range_end, + }, + shard_identity, + ); + + let page_count = range.page_count(); + let fragments = range.fragment(target_nblocks); + + // Invariant: we always get at least one fragment + assert!(!fragments.is_empty()); + + if page_count > 0 { + // Invariant: every fragment must contain at least one shard-local page, if the + // total range contains at least one shard-local page + let all_nonzero = fragments.iter().all(|f| f.0 > 0); + if !all_nonzero { + eprintln!("Found a zero-length fragment: {:?}", fragments); + } + assert!(all_nonzero); + } else { + // A range with no shard-local pages should always be returned as a single fragment + assert_eq!(fragments, vec![(0, range_start..range_end)]); + } + + (page_count, fragments) + } + + /// Really simple tests for fragment(), on a range that just contains a single stripe + /// for a single tenant. + #[test] + fn sharded_range_fragment_simple() { + let shard_identity = ShardIdentity::new( + ShardNumber(0), + ShardCount::new(4), + ShardParameters::DEFAULT_STRIPE_SIZE, + ) + .unwrap(); + + // A range which we happen to know covers exactly one stripe which belongs to this shard + let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap(); + let input_end = Key::from_hex("000000067f00000001000000ae0000008000").unwrap(); + + // Ask for stripe_size blocks, we get the whole stripe + assert_eq!( + do_fragment(input_start, input_end, &shard_identity, 32768), + (32768, vec![(32768, input_start..input_end)]) + ); + + // Ask for more, we still get the whole stripe + assert_eq!( + do_fragment(input_start, input_end, &shard_identity, 10000000), + (32768, vec![(32768, input_start..input_end)]) + ); + + // Ask for target_nblocks of half the stripe size, we get two halves + assert_eq!( + do_fragment(input_start, input_end, &shard_identity, 16384), + ( + 32768, + vec![ + (16384, input_start..input_start.add(16384)), + (16384, input_start.add(16384)..input_end) + ] + ) + ); + } + + #[test] + fn sharded_range_fragment_multi_stripe() { + let shard_identity = ShardIdentity::new( + ShardNumber(0), + ShardCount::new(4), + ShardParameters::DEFAULT_STRIPE_SIZE, + ) + .unwrap(); + + // A range which covers multiple stripes, exactly one of which belongs to the current shard. + let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap(); + let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap(); + // Ask for all the blocks, get a fragment that covers the whole range but reports + // its size to be just the blocks belonging to our shard. + assert_eq!( + do_fragment(input_start, input_end, &shard_identity, 131072), + (32768, vec![(32768, input_start..input_end)]) + ); + + // Ask for a sub-stripe quantity + assert_eq!( + do_fragment(input_start, input_end, &shard_identity, 16000), + ( + 32768, + vec![ + (16000, input_start..input_start.add(16000)), + (16000, input_start.add(16000)..input_start.add(32000)), + (768, input_start.add(32000)..input_end), + ] + ) + ); + + // Try on a range that starts slightly after our owned stripe + assert_eq!( + do_fragment(input_start.add(1), input_end, &shard_identity, 131072), + (32767, vec![(32767, input_start.add(1)..input_end)]) + ); + } + + /// Test that ShardedRange behaves properly when used on un-sharded data + #[test] + fn sharded_range_fragment_unsharded() { + let shard_identity = ShardIdentity::unsharded(); + + let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap(); + let input_end = Key::from_hex("000000067f00000001000000ae0000010000").unwrap(); + assert_eq!( + do_fragment(input_start, input_end, &shard_identity, 0x8000), + ( + 0x10000, + vec![ + (0x8000, input_start..input_start.add(0x8000)), + (0x8000, input_start.add(0x8000)..input_start.add(0x10000)) + ] + ) + ); + } + + #[test] + fn sharded_range_fragment_cross_relation() { + let shard_identity = ShardIdentity::unsharded(); + + // A range that spans relations: expect fragmentation to give up and return a u32::MAX size + let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap(); + let input_end = Key::from_hex("000000068f00000001000000ae0000010000").unwrap(); + assert_eq!( + do_fragment(input_start, input_end, &shard_identity, 0x8000), + (u32::MAX, vec![(u32::MAX, input_start..input_end),]) + ); + + // Same, but using a sharded identity + let shard_identity = ShardIdentity::new( + ShardNumber(0), + ShardCount::new(4), + ShardParameters::DEFAULT_STRIPE_SIZE, + ) + .unwrap(); + assert_eq!( + do_fragment(input_start, input_end, &shard_identity, 0x8000), + (u32::MAX, vec![(u32::MAX, input_start..input_end),]) + ); + } }