diff --git a/libs/pageserver_api/src/keyspace.rs b/libs/pageserver_api/src/keyspace.rs index eed4835f25..4283da18ab 100644 --- a/libs/pageserver_api/src/keyspace.rs +++ b/libs/pageserver_api/src/keyspace.rs @@ -1,7 +1,10 @@ use postgres_ffi::BLCKSZ; use std::ops::Range; -use crate::key::Key; +use crate::{ + key::Key, + shard::{ShardCount, ShardIdentity}, +}; use itertools::Itertools; /// @@ -14,6 +17,234 @@ pub struct KeySpace { pub ranges: Vec>, } +/// Represents a contiguous half-open range of the keyspace, masked according to a particular +/// ShardNumber's stripes: within this range of keys, only some "belong" to the current +/// shard. +/// +/// When we iterate over keys within this object, we will skip any keys that don't belong +/// to this shard. +/// +/// The start + end keys may not belong to the shard: these specify where layer files should +/// start + end, but we will never actually read/write those keys. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ShardedRange<'a> { + pub shard_identity: &'a ShardIdentity, + pub range: Range, +} + +// Calculate the size of a range within the blocks of the same relation, or spanning only the +// top page in the previous relation's space. +fn contiguous_range_len(range: &Range) -> u32 { + debug_assert!(is_contiguous_range(range)); + if range.start.field6 == 0xffffffff { + range.end.field6 + 1 + } else { + range.end.field6 - range.start.field6 + } +} + +/// Return true if this key range includes only keys in the same relation's data blocks, or +/// just spanning one relation and the logical size (0xffffffff) block of the relation before it. +/// +/// Contiguous in this context means we know the keys are in use _somewhere_, but it might not +/// be on our shard. Later in ShardedRange we do the extra work to figure out how much +/// of a given contiguous range is present on one shard. +/// +/// This matters, because: +/// - Within such ranges, keys are used contiguously. Outside such ranges it is sparse. +/// - Within such ranges, we may calculate distances using simple subtraction of field6. +fn is_contiguous_range(range: &Range) -> bool { + range.start.field1 == range.end.field1 + && range.start.field2 == range.end.field2 + && range.start.field3 == range.end.field3 + && range.start.field4 == range.end.field4 + && (range.start.field5 == range.end.field5 + || (range.start.field6 == 0xffffffff && range.start.field5 + 1 == range.end.field5)) +} + +impl<'a> ShardedRange<'a> { + pub fn new(range: Range, shard_identity: &'a ShardIdentity) -> Self { + Self { + shard_identity, + range, + } + } + + /// Break up this range into chunks, each of which has at least one local key in it if the + /// total range has at least one local key. + pub fn fragment(self, target_nblocks: u32) -> Vec<(u32, Range)> { + // Optimization for single-key case (e.g. logical size keys) + if self.range.end == self.range.start.add(1) { + return vec![( + if self.shard_identity.is_key_disposable(&self.range.start) { + 0 + } else { + 1 + }, + self.range, + )]; + } + + if !is_contiguous_range(&self.range) { + // Ranges that span relations are not fragmented. We only get these ranges as a result + // of operations that act on existing layers, so we trust that the existing range is + // reasonably small. 
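+ // The u32::MAX size used here is the same "very large" sentinel that page_count()
+ // returns for relation-spanning ranges; callers must treat it as an estimate rather
+ // than an exact block count.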
+ return vec![(u32::MAX, self.range)]; + } + + let mut fragments: Vec<(u32, Range)> = Vec::new(); + + let mut cursor = self.range.start; + while cursor < self.range.end { + let advance_by = self.distance_to_next_boundary(cursor); + let is_fragment_disposable = self.shard_identity.is_key_disposable(&cursor); + + // If the previous fragment is undersized, then we seek to consume enough + // blocks to complete it. + let (want_blocks, merge_last_fragment) = match fragments.last_mut() { + Some(frag) if frag.0 < target_nblocks => (target_nblocks - frag.0, Some(frag)), + Some(frag) => { + // Prev block is complete, want the full number. + ( + target_nblocks, + if is_fragment_disposable { + // If this current range will be empty (not shard-local data), we will merge into previous + Some(frag) + } else { + None + }, + ) + } + None => { + // First iteration, want the full number + (target_nblocks, None) + } + }; + + let advance_by = if is_fragment_disposable { + advance_by + } else { + std::cmp::min(advance_by, want_blocks) + }; + + let next_cursor = cursor.add(advance_by); + + let this_frag = ( + if is_fragment_disposable { + 0 + } else { + advance_by + }, + cursor..next_cursor, + ); + cursor = next_cursor; + + if let Some(last_fragment) = merge_last_fragment { + // Previous fragment was short or this one is empty, merge into it + last_fragment.0 += this_frag.0; + last_fragment.1.end = this_frag.1.end; + } else { + fragments.push(this_frag); + } + } + + fragments + } + + /// Estimate the physical pages that are within this range, on this shard. This returns + /// u32::MAX if the range spans relations: this return value should be interpreted as "large". + pub fn page_count(&self) -> u32 { + // Special cases for single keys like logical sizes + if self.range.end == self.range.start.add(1) { + return if self.shard_identity.is_key_disposable(&self.range.start) { + 0 + } else { + 1 + }; + } + + // We can only do an authentic calculation of contiguous key ranges + if !is_contiguous_range(&self.range) { + return u32::MAX; + } + + // Special case for single sharded tenants: our logical and physical sizes are the same + if self.shard_identity.count < ShardCount::new(2) { + return contiguous_range_len(&self.range); + } + + // Normal path: step through stripes and part-stripes in the range, evaluate whether each one belongs + // to Self, and add the stripe's block count to our total if so. + let mut result: u64 = 0; + let mut cursor = self.range.start; + while cursor < self.range.end { + // Count up to the next stripe_size boundary or end of range + let advance_by = self.distance_to_next_boundary(cursor); + + // If this blocks in this stripe belong to us, add them to our count + if !self.shard_identity.is_key_disposable(&cursor) { + result += advance_by as u64; + } + + cursor = cursor.add(advance_by); + } + + if result > u32::MAX as u64 { + u32::MAX + } else { + result as u32 + } + } + + /// Advance the cursor to the next potential fragment boundary: this is either + /// a stripe boundary, or the end of the range. + fn distance_to_next_boundary(&self, cursor: Key) -> u32 { + let distance_to_range_end = contiguous_range_len(&(cursor..self.range.end)); + + if self.shard_identity.count < ShardCount::new(2) { + // Optimization: don't bother stepping through stripes if the tenant isn't sharded. 
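+ // With a single shard every key in the range is local, so the only boundary that
+ // matters is the end of the contiguous range itself.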
+ return distance_to_range_end; + } + + if cursor.field6 == 0xffffffff { + // We are wrapping from one relation's logical size to the next relation's first data block + return 1; + } + + let stripe_index = cursor.field6 / self.shard_identity.stripe_size.0; + let stripe_remainder = self.shard_identity.stripe_size.0 + - (cursor.field6 - stripe_index * self.shard_identity.stripe_size.0); + + if cfg!(debug_assertions) { + // We should never overflow field5 and field6 -- our callers check this earlier + // and would have returned their u32::MAX cases if the input range violated this. + let next_cursor = cursor.add(stripe_remainder); + debug_assert!( + next_cursor.field1 == cursor.field1 + && next_cursor.field2 == cursor.field2 + && next_cursor.field3 == cursor.field3 + && next_cursor.field4 == cursor.field4 + && next_cursor.field5 == cursor.field5 + ) + } + + std::cmp::min(stripe_remainder, distance_to_range_end) + } + + /// Whereas `page_count` estimates the number of pages physically in this range on this shard, + /// this function simply calculates the number of pages in the space, without accounting for those + /// pages that would not actually be stored on this node. + /// + /// Don't use this function in code that works with physical entities like layer files. + fn raw_size(range: &Range) -> u32 { + if is_contiguous_range(range) { + contiguous_range_len(range) + } else { + u32::MAX + } + } +} + impl KeySpace { /// Create a key space with a single range. pub fn single(key_range: Range) -> Self { @@ -25,39 +256,36 @@ impl KeySpace { /// Partition a key space into roughly chunks of roughly 'target_size' bytes /// in each partition. /// - pub fn partition(&self, target_size: u64) -> KeyPartitioning { + pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning { // Assume that each value is 8k in size. - let target_nblocks = (target_size / BLCKSZ as u64) as usize; + let target_nblocks = (target_size / BLCKSZ as u64) as u32; let mut parts = Vec::new(); let mut current_part = Vec::new(); let mut current_part_size: usize = 0; for range in &self.ranges { - // If appending the next contiguous range in the keyspace to the current - // partition would cause it to be too large, start a new partition. - let this_size = key_range_size(range) as usize; - if current_part_size + this_size > target_nblocks && !current_part.is_empty() { - parts.push(KeySpace { - ranges: current_part, - }); - current_part = Vec::new(); - current_part_size = 0; - } + // While doing partitioning, wrap the range in ShardedRange so that our size calculations + // will respect shard striping rather than assuming all keys within a range are present. + let range = ShardedRange::new(range.clone(), shard_identity); - // If the next range is larger than 'target_size', split it into - // 'target_size' chunks. 
- let mut remain_size = this_size; - let mut start = range.start; - while remain_size > target_nblocks { - let next = start.add(target_nblocks as u32); - parts.push(KeySpace { - ranges: vec![start..next], - }); - start = next; - remain_size -= target_nblocks + // Chunk up the range into parts that each contain up to target_size local blocks + for (frag_on_shard_size, frag_range) in range.fragment(target_nblocks) { + // If appending the next contiguous range in the keyspace to the current + // partition would cause it to be too large, and our current partition + // covers at least one block that is physically present in this shard, + // then start a new partition + if current_part_size + frag_on_shard_size as usize > target_nblocks as usize + && current_part_size > 0 + { + parts.push(KeySpace { + ranges: current_part, + }); + current_part = Vec::new(); + current_part_size = 0; + } + current_part.push(frag_range.start..frag_range.end); + current_part_size += frag_on_shard_size as usize; } - current_part.push(start..range.end); - current_part_size += remain_size; } // add last partition that wasn't full yet. @@ -71,7 +299,7 @@ impl KeySpace { } pub fn is_empty(&self) -> bool { - self.total_size() == 0 + self.total_raw_size() == 0 } /// Merge another keyspace into the current one. @@ -164,11 +392,11 @@ impl KeySpace { self.ranges.last().map(|range| range.end) } - #[allow(unused)] - pub fn total_size(&self) -> usize { + /// The size of the keyspace in pages, before accounting for sharding + pub fn total_raw_size(&self) -> usize { self.ranges .iter() - .map(|range| key_range_size(range) as usize) + .map(|range| ShardedRange::raw_size(range) as usize) .sum() } @@ -242,7 +470,7 @@ impl KeySpaceAccum { #[inline(always)] pub fn add_range(&mut self, range: Range) { - self.size += key_range_size(&range) as u64; + self.size += ShardedRange::raw_size(&range) as u64; match self.accum.as_mut() { Some(accum) => { @@ -274,7 +502,9 @@ impl KeySpaceAccum { std::mem::take(self).to_keyspace() } - pub fn size(&self) -> u64 { + // The total number of keys in this object, ignoring any sharding effects that might cause some of + // the keys to be omitted in storage on this shard. 
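+ // Contrast with ShardedRange::page_count(), which does account for striping and reports
+ // only the pages resident on this shard.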
+ pub fn raw_size(&self) -> u64 { self.size } } @@ -330,36 +560,19 @@ impl KeySpaceRandomAccum { } } -#[inline(always)] -pub fn key_range_size(key_range: &Range) -> u32 { - let start = key_range.start; - let end = key_range.end; - - if end.field1 != start.field1 - || end.field2 != start.field2 - || end.field3 != start.field3 - || end.field4 != start.field4 - { - return u32::MAX; - } - - let start = (start.field5 as u64) << 32 | start.field6 as u64; - let end = (end.field5 as u64) << 32 | end.field6 as u64; - - let diff = end - start; - if diff > u32::MAX as u64 { - u32::MAX - } else { - diff as u32 - } -} - pub fn singleton_range(key: Key) -> Range { key..key.next() } #[cfg(test)] mod tests { + use rand::{RngCore, SeedableRng}; + + use crate::{ + models::ShardParameters, + shard::{ShardCount, ShardNumber}, + }; + use super::*; use std::fmt::Write; @@ -402,14 +615,17 @@ mod tests { accum.add_range(range.clone()); } - let expected_size: u64 = ranges.iter().map(|r| key_range_size(r) as u64).sum(); - assert_eq!(accum.size(), expected_size); + let expected_size: u64 = ranges + .iter() + .map(|r| ShardedRange::raw_size(r) as u64) + .sum(); + assert_eq!(accum.raw_size(), expected_size); assert_ks_eq(&accum.consume_keyspace(), ranges.clone()); - assert_eq!(accum.size(), 0); + assert_eq!(accum.raw_size(), 0); assert_ks_eq(&accum.consume_keyspace(), vec![]); - assert_eq!(accum.size(), 0); + assert_eq!(accum.raw_size(), 0); for range in &ranges { accum.add_range(range.clone()); @@ -706,4 +922,412 @@ mod tests { ] ); } + #[test] + fn sharded_range_relation_gap() { + let shard_identity = ShardIdentity::new( + ShardNumber(0), + ShardCount::new(4), + ShardParameters::DEFAULT_STRIPE_SIZE, + ) + .unwrap(); + + let range = ShardedRange::new( + Range { + start: Key::from_hex("000000067F00000005000040100300000000").unwrap(), + end: Key::from_hex("000000067F00000005000040130000004000").unwrap(), + }, + &shard_identity, + ); + + // Key range spans relations, expect MAX + assert_eq!(range.page_count(), u32::MAX); + } + + #[test] + fn shard_identity_keyspaces_single_key() { + let shard_identity = ShardIdentity::new( + ShardNumber(1), + ShardCount::new(4), + ShardParameters::DEFAULT_STRIPE_SIZE, + ) + .unwrap(); + + let range = ShardedRange::new( + Range { + start: Key::from_hex("000000067f000000010000007000ffffffff").unwrap(), + end: Key::from_hex("000000067f00000001000000700100000000").unwrap(), + }, + &shard_identity, + ); + // Single-key range on logical size key + assert_eq!(range.page_count(), 1); + } + + /// Test the helper that we use to identify ranges which go outside the data blocks of a single relation + #[test] + fn contiguous_range_check() { + assert!(!is_contiguous_range( + &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap() + ..Key::from_hex("000000067f00000001000004df0100000003").unwrap()) + ),); + + // The ranges goes all the way up to the 0xffffffff, including it: this is + // not considered a rel block range because 0xffffffff stores logical sizes, + // not blocks. 
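+ // (A range may only cross a forkno boundary if it starts exactly at the previous
+ // forkno's logical-size key 0xffffffff; starting one block below it, as here, does
+ // not qualify.)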
+ assert!(!is_contiguous_range( + &(Key::from_hex("000000067f00000001000004df00fffffffe").unwrap() + ..Key::from_hex("000000067f00000001000004df0100000000").unwrap()) + ),); + + // Keys within the normal data region of a relation + assert!(is_contiguous_range( + &(Key::from_hex("000000067f00000001000004df0000000000").unwrap() + ..Key::from_hex("000000067f00000001000004df0000000080").unwrap()) + ),); + + // The logical size key of one forkno, then some blocks in the next + assert!(is_contiguous_range( + &(Key::from_hex("000000067f00000001000004df00ffffffff").unwrap() + ..Key::from_hex("000000067f00000001000004df0100000080").unwrap()) + ),); + } + + #[test] + fn shard_identity_keyspaces_forkno_gap() { + let shard_identity = ShardIdentity::new( + ShardNumber(1), + ShardCount::new(4), + ShardParameters::DEFAULT_STRIPE_SIZE, + ) + .unwrap(); + + let range = ShardedRange::new( + Range { + start: Key::from_hex("000000067f00000001000004df00fffffffe").unwrap(), + end: Key::from_hex("000000067f00000001000004df0100000003").unwrap(), + }, + &shard_identity, + ); + + // Range spanning the end of one forkno and the start of the next: we do not attempt to + // calculate a valid size, because we have no way to know if they keys between start + // and end are actually in use. + assert_eq!(range.page_count(), u32::MAX); + } + + #[test] + fn shard_identity_keyspaces_one_relation() { + for shard_number in 0..4 { + let shard_identity = ShardIdentity::new( + ShardNumber(shard_number), + ShardCount::new(4), + ShardParameters::DEFAULT_STRIPE_SIZE, + ) + .unwrap(); + + let range = ShardedRange::new( + Range { + start: Key::from_hex("000000067f00000001000000ae0000000000").unwrap(), + end: Key::from_hex("000000067f00000001000000ae0000000001").unwrap(), + }, + &shard_identity, + ); + + // Very simple case: range covering block zero of one relation, where that block maps to shard zero + if shard_number == 0 { + assert_eq!(range.page_count(), 1); + } else { + // Other shards should perceive the range's size as zero + assert_eq!(range.page_count(), 0); + } + } + } + + /// Test helper: construct a ShardedRange and call fragment() on it, returning + /// the total page count in the range and the fragments. 
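+ /// Along the way it asserts the invariants any fragmentation must uphold: the fragments
+ /// cover the whole input range, are ordered and non-overlapping, each contain at least
+ /// one shard-local page when the range has any, and respect target_nblocks.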
+ fn do_fragment( + range_start: Key, + range_end: Key, + shard_identity: &ShardIdentity, + target_nblocks: u32, + ) -> (u32, Vec<(u32, Range)>) { + let range = ShardedRange::new( + Range { + start: range_start, + end: range_end, + }, + shard_identity, + ); + + let page_count = range.page_count(); + let fragments = range.fragment(target_nblocks); + + // Invariant: we always get at least one fragment + assert!(!fragments.is_empty()); + + // Invariant: the first/last fragment start/end should equal the input start/end + assert_eq!(fragments.first().unwrap().1.start, range_start); + assert_eq!(fragments.last().unwrap().1.end, range_end); + + if page_count > 0 { + // Invariant: every fragment must contain at least one shard-local page, if the + // total range contains at least one shard-local page + let all_nonzero = fragments.iter().all(|f| f.0 > 0); + if !all_nonzero { + eprintln!("Found a zero-length fragment: {:?}", fragments); + } + assert!(all_nonzero); + } else { + // A range with no shard-local pages should always be returned as a single fragment + assert_eq!(fragments, vec![(0, range_start..range_end)]); + } + + // Invariant: fragments must be ordered and non-overlapping + let mut last: Option> = None; + for frag in &fragments { + if let Some(last) = last { + assert!(frag.1.start >= last.end); + assert!(frag.1.start > last.start); + } + last = Some(frag.1.clone()) + } + + // Invariant: fragments respect target_nblocks + for frag in &fragments { + assert!(frag.0 == u32::MAX || frag.0 <= target_nblocks); + } + + (page_count, fragments) + } + + /// Really simple tests for fragment(), on a range that just contains a single stripe + /// for a single tenant. + #[test] + fn sharded_range_fragment_simple() { + let shard_identity = ShardIdentity::new( + ShardNumber(0), + ShardCount::new(4), + ShardParameters::DEFAULT_STRIPE_SIZE, + ) + .unwrap(); + + // A range which we happen to know covers exactly one stripe which belongs to this shard + let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap(); + let input_end = Key::from_hex("000000067f00000001000000ae0000008000").unwrap(); + + // Ask for stripe_size blocks, we get the whole stripe + assert_eq!( + do_fragment(input_start, input_end, &shard_identity, 32768), + (32768, vec![(32768, input_start..input_end)]) + ); + + // Ask for more, we still get the whole stripe + assert_eq!( + do_fragment(input_start, input_end, &shard_identity, 10000000), + (32768, vec![(32768, input_start..input_end)]) + ); + + // Ask for target_nblocks of half the stripe size, we get two halves + assert_eq!( + do_fragment(input_start, input_end, &shard_identity, 16384), + ( + 32768, + vec![ + (16384, input_start..input_start.add(16384)), + (16384, input_start.add(16384)..input_end) + ] + ) + ); + } + + #[test] + fn sharded_range_fragment_multi_stripe() { + let shard_identity = ShardIdentity::new( + ShardNumber(0), + ShardCount::new(4), + ShardParameters::DEFAULT_STRIPE_SIZE, + ) + .unwrap(); + + // A range which covers multiple stripes, exactly one of which belongs to the current shard. + let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap(); + let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap(); + // Ask for all the blocks, get a fragment that covers the whole range but reports + // its size to be just the blocks belonging to our shard. 
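+ // (The 0x20000-block range spans four 32768-block stripes, of which only the first
+ // belongs to shard 0, so the reported size is 32768 rather than 131072.)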
+ assert_eq!( + do_fragment(input_start, input_end, &shard_identity, 131072), + (32768, vec![(32768, input_start..input_end)]) + ); + + // Ask for a sub-stripe quantity + assert_eq!( + do_fragment(input_start, input_end, &shard_identity, 16000), + ( + 32768, + vec![ + (16000, input_start..input_start.add(16000)), + (16000, input_start.add(16000)..input_start.add(32000)), + (768, input_start.add(32000)..input_end), + ] + ) + ); + + // Try on a range that starts slightly after our owned stripe + assert_eq!( + do_fragment(input_start.add(1), input_end, &shard_identity, 131072), + (32767, vec![(32767, input_start.add(1)..input_end)]) + ); + } + + /// Test our calculations work correctly when we start a range from the logical size key of + /// a previous relation. + #[test] + fn sharded_range_fragment_starting_from_logical_size() { + let input_start = Key::from_hex("000000067f00000001000000ae00ffffffff").unwrap(); + let input_end = Key::from_hex("000000067f00000001000000ae0100008000").unwrap(); + + // Shard 0 owns the first stripe in the relation, and the preceding logical size is shard local too + let shard_identity = ShardIdentity::new( + ShardNumber(0), + ShardCount::new(4), + ShardParameters::DEFAULT_STRIPE_SIZE, + ) + .unwrap(); + assert_eq!( + do_fragment(input_start, input_end, &shard_identity, 0x10000), + (0x8001, vec![(0x8001, input_start..input_end)]) + ); + + // Shard 1 does not own the first stripe in the relation, but it does own the logical size (all shards + // store all logical sizes) + let shard_identity = ShardIdentity::new( + ShardNumber(1), + ShardCount::new(4), + ShardParameters::DEFAULT_STRIPE_SIZE, + ) + .unwrap(); + assert_eq!( + do_fragment(input_start, input_end, &shard_identity, 0x10000), + (0x1, vec![(0x1, input_start..input_end)]) + ); + } + + /// Test that ShardedRange behaves properly when used on un-sharded data + #[test] + fn sharded_range_fragment_unsharded() { + let shard_identity = ShardIdentity::unsharded(); + + let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap(); + let input_end = Key::from_hex("000000067f00000001000000ae0000010000").unwrap(); + assert_eq!( + do_fragment(input_start, input_end, &shard_identity, 0x8000), + ( + 0x10000, + vec![ + (0x8000, input_start..input_start.add(0x8000)), + (0x8000, input_start.add(0x8000)..input_start.add(0x10000)) + ] + ) + ); + } + + #[test] + fn sharded_range_fragment_cross_relation() { + let shard_identity = ShardIdentity::unsharded(); + + // A range that spans relations: expect fragmentation to give up and return a u32::MAX size + let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap(); + let input_end = Key::from_hex("000000068f00000001000000ae0000010000").unwrap(); + assert_eq!( + do_fragment(input_start, input_end, &shard_identity, 0x8000), + (u32::MAX, vec![(u32::MAX, input_start..input_end),]) + ); + + // Same, but using a sharded identity + let shard_identity = ShardIdentity::new( + ShardNumber(0), + ShardCount::new(4), + ShardParameters::DEFAULT_STRIPE_SIZE, + ) + .unwrap(); + assert_eq!( + do_fragment(input_start, input_end, &shard_identity, 0x8000), + (u32::MAX, vec![(u32::MAX, input_start..input_end),]) + ); + } + + #[test] + fn sharded_range_fragment_tiny_nblocks() { + let shard_identity = ShardIdentity::unsharded(); + + // A range that spans relations: expect fragmentation to give up and return a u32::MAX size + let input_start = Key::from_hex("000000067F00000001000004E10000000000").unwrap(); + let input_end = 
Key::from_hex("000000067F00000001000004E10000000038").unwrap(); + assert_eq!( + do_fragment(input_start, input_end, &shard_identity, 16), + ( + 0x38, + vec![ + (16, input_start..input_start.add(16)), + (16, input_start.add(16)..input_start.add(32)), + (16, input_start.add(32)..input_start.add(48)), + (8, input_start.add(48)..input_end), + ] + ) + ); + } + + #[test] + fn sharded_range_fragment_fuzz() { + // Use a fixed seed: we don't want to explicitly pick values, but we do want + // the test to be reproducible. + let mut prng = rand::rngs::StdRng::seed_from_u64(0xdeadbeef); + + for _i in 0..1000 { + let shard_identity = if prng.next_u32() % 2 == 0 { + ShardIdentity::unsharded() + } else { + let shard_count = prng.next_u32() % 127 + 1; + ShardIdentity::new( + ShardNumber((prng.next_u32() % shard_count) as u8), + ShardCount::new(shard_count as u8), + ShardParameters::DEFAULT_STRIPE_SIZE, + ) + .unwrap() + }; + + let target_nblocks = prng.next_u32() % 65536 + 1; + + let start_offset = prng.next_u32() % 16384; + + // Try ranges up to 4GiB in size, that are always at least 1 + let range_size = prng.next_u32() % 8192 + 1; + + // A range that spans relations: expect fragmentation to give up and return a u32::MAX size + let input_start = Key::from_hex("000000067F00000001000004E10000000000") + .unwrap() + .add(start_offset); + let input_end = input_start.add(range_size); + + // This test's main success conditions are the invariants baked into do_fragment + let (_total_size, fragments) = + do_fragment(input_start, input_end, &shard_identity, target_nblocks); + + // Pick a random key within the range and check it appears in the output + let example_key = input_start.add(prng.next_u32() % range_size); + + // Panic on unwrap if it isn't found + let example_key_frag = fragments + .iter() + .find(|f| f.1.contains(&example_key)) + .unwrap(); + + // Check that the fragment containing our random key has a nonzero size if + // that key is shard-local + let example_key_local = !shard_identity.is_key_disposable(&example_key); + if example_key_local { + assert!(example_key_frag.0 > 0); + } + } + } } diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs index 6a8a5cc8f3..2d7f6772b2 100644 --- a/libs/pageserver_api/src/shard.rs +++ b/libs/pageserver_api/src/shard.rs @@ -451,7 +451,7 @@ impl ShardIdentity { /// An identity with number=0 count=0 is a "none" identity, which represents legacy /// tenants. Modern single-shard tenants should not use this: they should /// have number=0 count=1. - pub fn unsharded() -> Self { + pub const fn unsharded() -> Self { Self { number: ShardNumber(0), count: ShardCount(0), diff --git a/pageserver/compaction/src/compact_tiered.rs b/pageserver/compaction/src/compact_tiered.rs index 5261746b22..137b93055a 100644 --- a/pageserver/compaction/src/compact_tiered.rs +++ b/pageserver/compaction/src/compact_tiered.rs @@ -18,6 +18,7 @@ //! database size. For example, if the logical database size is 10 GB, we would //! generate new image layers every 10 GB of WAL. 
use futures::StreamExt; +use pageserver_api::shard::ShardIdentity; use tracing::{debug, info}; use std::collections::{HashSet, VecDeque}; @@ -125,6 +126,7 @@ async fn compact_level( } let mut state = LevelCompactionState { + shard_identity: *executor.get_shard_identity(), target_file_size, _lsn_range: lsn_range.clone(), layers: layer_fragments, @@ -164,6 +166,8 @@ struct LevelCompactionState<'a, E> where E: CompactionJobExecutor, { + shard_identity: ShardIdentity, + // parameters target_file_size: u64, @@ -366,6 +370,7 @@ where .executor .get_keyspace(&job.key_range, job.lsn_range.end, ctx) .await?, + &self.shard_identity, ) * 8192; let wal_size = job @@ -430,7 +435,7 @@ where keyspace, self.target_file_size / 8192, ); - while let Some(key_range) = window.choose_next_image() { + while let Some(key_range) = window.choose_next_image(&self.shard_identity) { new_jobs.push(CompactionJob:: { key_range, lsn_range: job.lsn_range.clone(), @@ -623,7 +628,12 @@ impl KeyspaceWindowPos { } // Advance the cursor until it reaches 'target_keysize'. - fn advance_until_size(&mut self, w: &KeyspaceWindowHead, max_size: u64) { + fn advance_until_size( + &mut self, + w: &KeyspaceWindowHead, + max_size: u64, + shard_identity: &ShardIdentity, + ) { while self.accum_keysize < max_size && !self.reached_end(w) { let curr_range = &w.keyspace[self.keyspace_idx]; if self.end_key < curr_range.start { @@ -632,7 +642,7 @@ impl KeyspaceWindowPos { } // We're now within 'curr_range'. Can we advance past it completely? - let distance = K::key_range_size(&(self.end_key..curr_range.end)); + let distance = K::key_range_size(&(self.end_key..curr_range.end), shard_identity); if (self.accum_keysize + distance as u64) < max_size { // oh yeah, it fits self.end_key = curr_range.end; @@ -641,7 +651,7 @@ impl KeyspaceWindowPos { } else { // advance within the range let skip_key = self.end_key.skip_some(); - let distance = K::key_range_size(&(self.end_key..skip_key)); + let distance = K::key_range_size(&(self.end_key..skip_key), shard_identity); if (self.accum_keysize + distance as u64) < max_size { self.end_key = skip_key; self.accum_keysize += distance as u64; @@ -677,7 +687,7 @@ where } } - fn choose_next_image(&mut self) -> Option> { + fn choose_next_image(&mut self, shard_identity: &ShardIdentity) -> Option> { if self.start_pos.keyspace_idx == self.head.keyspace.len() { // we've reached the end return None; @@ -687,6 +697,7 @@ where next_pos.advance_until_size( &self.head, self.start_pos.accum_keysize + self.head.target_keysize, + shard_identity, ); // See if we can gobble up the rest of the keyspace if we stretch out the layer, up to @@ -695,6 +706,7 @@ where end_pos.advance_until_size( &self.head, self.start_pos.accum_keysize + (self.head.target_keysize * 5 / 4), + shard_identity, ); if end_pos.reached_end(&self.head) { // gobble up any unused keyspace between the last used key and end of the range diff --git a/pageserver/compaction/src/helpers.rs b/pageserver/compaction/src/helpers.rs index 9de6363d6e..1b80373ba7 100644 --- a/pageserver/compaction/src/helpers.rs +++ b/pageserver/compaction/src/helpers.rs @@ -5,6 +5,7 @@ use crate::interface::*; use futures::future::BoxFuture; use futures::{Stream, StreamExt}; use itertools::Itertools; +use pageserver_api::shard::ShardIdentity; use pin_project_lite::pin_project; use std::collections::BinaryHeap; use std::collections::VecDeque; @@ -13,11 +14,17 @@ use std::ops::{DerefMut, Range}; use std::pin::Pin; use std::task::{ready, Poll}; -pub fn keyspace_total_size(keyspace: 
&CompactionKeySpace) -> u64 +pub fn keyspace_total_size( + keyspace: &CompactionKeySpace, + shard_identity: &ShardIdentity, +) -> u64 where K: CompactionKey, { - keyspace.iter().map(|r| K::key_range_size(r) as u64).sum() + keyspace + .iter() + .map(|r| K::key_range_size(r, shard_identity) as u64) + .sum() } pub fn overlaps_with(a: &Range, b: &Range) -> bool { diff --git a/pageserver/compaction/src/interface.rs b/pageserver/compaction/src/interface.rs index 5dc62e506f..35519b5d0a 100644 --- a/pageserver/compaction/src/interface.rs +++ b/pageserver/compaction/src/interface.rs @@ -4,7 +4,7 @@ //! All the heavy lifting is done by the create_image and create_delta //! functions that the implementor provides. use futures::Future; -use pageserver_api::{key::Key, keyspace::key_range_size}; +use pageserver_api::{key::Key, keyspace::ShardedRange, shard::ShardIdentity}; use std::ops::Range; use utils::lsn::Lsn; @@ -32,6 +32,8 @@ pub trait CompactionJobExecutor { // Functions that the planner uses to support its decisions // ---- + fn get_shard_identity(&self) -> &ShardIdentity; + /// Return all layers that overlap the given bounding box. fn get_layers( &mut self, @@ -98,7 +100,7 @@ pub trait CompactionKey: std::cmp::Ord + Clone + Copy + std::fmt::Display { /// /// This returns u32, for compatibility with Repository::key. If the /// distance is larger, return u32::MAX. - fn key_range_size(key_range: &Range) -> u32; + fn key_range_size(key_range: &Range, shard_identity: &ShardIdentity) -> u32; // return "self + 1" fn next(&self) -> Self; @@ -113,8 +115,8 @@ impl CompactionKey for Key { const MIN: Self = Self::MIN; const MAX: Self = Self::MAX; - fn key_range_size(r: &std::ops::Range) -> u32 { - key_range_size(r) + fn key_range_size(r: &std::ops::Range, shard_identity: &ShardIdentity) -> u32 { + ShardedRange::new(r.clone(), shard_identity).page_count() } fn next(&self) -> Key { (self as &Key).next() diff --git a/pageserver/compaction/src/simulator.rs b/pageserver/compaction/src/simulator.rs index 6c00df3a65..3543df64fa 100644 --- a/pageserver/compaction/src/simulator.rs +++ b/pageserver/compaction/src/simulator.rs @@ -3,6 +3,7 @@ mod draw; use draw::{LayerTraceEvent, LayerTraceFile, LayerTraceOp}; use futures::StreamExt; +use pageserver_api::shard::ShardIdentity; use rand::Rng; use tracing::info; @@ -71,7 +72,7 @@ impl interface::CompactionKey for Key { const MIN: Self = u64::MIN; const MAX: Self = u64::MAX; - fn key_range_size(key_range: &Range) -> u32 { + fn key_range_size(key_range: &Range, _shard_identity: &ShardIdentity) -> u32 { std::cmp::min(key_range.end - key_range.start, u32::MAX as u64) as u32 } @@ -434,6 +435,11 @@ impl interface::CompactionJobExecutor for MockTimeline { type ImageLayer = Arc; type RequestContext = MockRequestContext; + fn get_shard_identity(&self) -> &ShardIdentity { + static IDENTITY: ShardIdentity = ShardIdentity::unsharded(); + &IDENTITY + } + async fn get_layers( &mut self, key_range: &Range, diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index ba047745f1..8c51e93643 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -263,7 +263,10 @@ where .timeline .get_slru_keyspace(Version::Lsn(self.lsn), self.ctx) .await? 
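+ // Pass the shard identity so that partition sizes are based on the keys actually
+ // stored on this shard rather than on the raw key-range length.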
- .partition(Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64); + .partition( + self.timeline.get_shard_identity(), + Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64, + ); let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar); diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 5fb5d231c7..1a85481e97 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -401,7 +401,7 @@ impl InMemoryLayer { } } - let keyspace_size = keyspace.total_size(); + let keyspace_size = keyspace.total_raw_size(); let mut completed_keys = HashSet::new(); while completed_keys.len() < keyspace_size && !planned_block_reads.is_empty() { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 108acd3925..c5068386d6 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -936,7 +936,7 @@ impl Timeline { return Err(GetVectoredError::InvalidLsn(lsn)); } - let key_count = keyspace.total_size().try_into().unwrap(); + let key_count = keyspace.total_raw_size().try_into().unwrap(); if key_count > Timeline::MAX_GET_VECTORED_KEYS { return Err(GetVectoredError::Oversized(key_count)); } @@ -1076,7 +1076,7 @@ impl Timeline { mut reconstruct_state: ValuesReconstructState, ctx: &RequestContext, ) -> Result>, GetVectoredError> { - let get_kind = if keyspace.total_size() == 1 { + let get_kind = if keyspace.total_raw_size() == 1 { GetKind::Singular } else { GetKind::Vectored @@ -3207,7 +3207,7 @@ impl Timeline { } } - if keyspace.total_size() == 0 || timeline.ancestor_timeline.is_none() { + if keyspace.total_raw_size() == 0 || timeline.ancestor_timeline.is_none() { break; } @@ -3220,7 +3220,7 @@ impl Timeline { timeline = &*timeline_owned; } - if keyspace.total_size() != 0 { + if keyspace.total_raw_size() != 0 { return Err(GetVectoredError::MissingKey(keyspace.start().unwrap())); } @@ -3911,7 +3911,7 @@ impl Timeline { } let keyspace = self.collect_keyspace(lsn, ctx).await?; - let partitioning = keyspace.partition(partition_size); + let partitioning = keyspace.partition(&self.shard_identity, partition_size); *partitioning_guard = (partitioning, lsn); @@ -4064,7 +4064,7 @@ impl Timeline { key = key.next(); // Maybe flush `key_rest_accum` - if key_request_accum.size() >= Timeline::MAX_GET_VECTORED_KEYS + if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS || last_key_in_range { let results = self diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 8075775bbc..b92832a3de 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -15,7 +15,7 @@ use anyhow::{anyhow, Context}; use enumset::EnumSet; use fail::fail_point; use itertools::Itertools; -use pageserver_api::shard::TenantShardId; +use pageserver_api::shard::{ShardIdentity, TenantShardId}; use tokio_util::sync::CancellationToken; use tracing::{debug, info, info_span, trace, warn, Instrument}; use utils::id::TimelineId; @@ -831,6 +831,10 @@ impl CompactionJobExecutor for TimelineAdaptor { type RequestContext = crate::context::RequestContext; + fn get_shard_identity(&self) -> &ShardIdentity { + self.timeline.get_shard_identity() + } + async fn get_layers( &mut self, key_range: &Range, diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index 3902819d3d..43a3323462 100644 --- 
a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -1,4 +1,6 @@ +import json import os +from typing import Optional import pytest from fixtures.log_helper import log @@ -89,3 +91,102 @@ page_cache_size=10 # was chosen empirically for this workload. assert non_vectored_average < 8 assert vectored_average < 8 + + +# Stripe sizes in number of pages. +TINY_STRIPES = 16 +LARGE_STRIPES = 32768 + + +@pytest.mark.parametrize( + "shard_count,stripe_size", [(None, None), (4, TINY_STRIPES), (4, LARGE_STRIPES)] +) +def test_sharding_compaction( + neon_env_builder: NeonEnvBuilder, stripe_size: int, shard_count: Optional[int] +): + """ + Use small stripes, small layers, and small compaction thresholds to exercise how compaction + and image layer generation interacts with sharding. + + We are looking for bugs that might emerge from the way sharding uses sparse layer files that + only contain some of the keys in the key range covered by the layer, such as errors estimating + the size of layers that might result in too-small layer files. + """ + + compaction_target_size = 128 * 1024 + + TENANT_CONF = { + # small checkpointing and compaction targets to ensure we generate many upload operations + "checkpoint_distance": f"{128 * 1024}", + "compaction_threshold": "1", + "compaction_target_size": f"{compaction_target_size}", + # no PITR horizon, we specify the horizon when we request on-demand GC + "pitr_interval": "0s", + # disable background compaction and GC. We invoke it manually when we want it to happen. + "gc_period": "0s", + "compaction_period": "0s", + # create image layers eagerly: we want to exercise image layer creation in this test. + "image_creation_threshold": "1", + "image_layer_creation_check_threshold": 0, + } + + neon_env_builder.num_pageservers = 1 if shard_count is None else shard_count + env = neon_env_builder.init_start( + initial_tenant_conf=TENANT_CONF, + initial_tenant_shard_count=shard_count, + initial_tenant_shard_stripe_size=stripe_size, + ) + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + workload = Workload(env, tenant_id, timeline_id) + workload.init() + workload.write_rows(64) + for _i in range(0, 10): + # Each of these does some writes then a checkpoint: because we set image_creation_threshold to 1, + # these should result in image layers each time we write some data into a shard, and also shards + # recieving less data hitting their "empty image layer" path (wherre they should skip writing the layer, + # rather than asserting) + workload.churn_rows(64) + + # Assert that we got some image layers: this is important because this test's purpose is to exercise the sharding changes + # to Timeline::create_image_layers, so if we weren't creating any image layers we wouldn't be doing our job. 
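+ # With tiny stripes we additionally expect every shard to have produced image layers;
+ # with large stripes most writes may land on a single shard, so the assertions below
+ # only require that at least one shard did.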
+ shard_has_image_layers = [] + for shard in env.storage_controller.locate(tenant_id): + pageserver = env.get_pageserver(shard["node_id"]) + shard_id = shard["shard_id"] + layer_map = pageserver.http_client().layer_map_info(shard_id, timeline_id) + image_layer_sizes = {} + for layer in layer_map.historic_layers: + if layer.kind == "Image": + image_layer_sizes[layer.layer_file_name] = layer.layer_file_size + + # Pageserver should assert rather than emit an empty layer file, but double check here + assert layer.layer_file_size is not None + assert layer.layer_file_size > 0 + + shard_has_image_layers.append(len(image_layer_sizes) > 1) + log.info(f"Shard {shard_id} image layer sizes: {json.dumps(image_layer_sizes, indent=2)}") + + if stripe_size == TINY_STRIPES: + # Checking the average size validates that our keyspace partitioning is properly respecting sharding: if + # it was not, we would tend to get undersized layers because the partitioning would overestimate the physical + # data in a keyrange. + # + # We only do this check with tiny stripes, because large stripes may not give all shards enough + # data to have statistically significant image layers + avg_size = sum(v for v in image_layer_sizes.values()) / len(image_layer_sizes) # type: ignore + log.info(f"Shard {shard_id} average image layer size: {avg_size}") + assert avg_size > compaction_target_size / 2 + + if stripe_size == TINY_STRIPES: + # Expect writes were scattered across all pageservers: they should all have compacted some image layers + assert all(shard_has_image_layers) + else: + # With large stripes, it is expected that most of our writes went to one pageserver, so we just require + # that at least one of them has some image layers. + assert any(shard_has_image_layers) + + # Assert that everything is still readable + workload.validate()
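For reviewers, here is a minimal usage sketch (not part of the patch) showing how the new ShardedRange API composes. It assumes the pageserver_api paths used in the diff above and that ShardParameters::DEFAULT_STRIPE_SIZE is the 32768-block stripe size the unit tests rely on; the expected values mirror sharded_range_fragment_multi_stripe.

// Usage sketch only: compares the raw size of a key range with the number of pages
// resident on one shard, using the APIs added or referenced in this PR.
use pageserver_api::key::Key;
use pageserver_api::keyspace::ShardedRange;
use pageserver_api::models::ShardParameters;
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber};

fn main() {
    // Four stripes of one relation's data blocks, mirroring sharded_range_fragment_multi_stripe.
    let start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    let end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();

    // Unsharded: every page is local, so page_count equals the raw block distance.
    let unsharded = ShardIdentity::unsharded();
    assert_eq!(
        ShardedRange::new(start..end, &unsharded).page_count(),
        0x20000
    );

    // Shard 0 of 4: only the stripes owned by this shard contribute to the count.
    let shard0 = ShardIdentity::new(
        ShardNumber(0),
        ShardCount::new(4),
        ShardParameters::DEFAULT_STRIPE_SIZE,
    )
    .unwrap();
    let sharded = ShardedRange::new(start..end, &shard0);
    assert_eq!(sharded.page_count(), 0x8000);

    // fragment() breaks the range into chunks that each hold at least one shard-local page.
    for (local_pages, range) in sharded.fragment(16000) {
        println!("{}..{} -> {} local pages", range.start, range.end, local_pages);
    }
}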