pageserver: shard-aware keyspace partitioning
@@ -1,7 +1,10 @@
 use postgres_ffi::BLCKSZ;
 use std::ops::Range;

-use crate::key::Key;
+use crate::{
+    key::Key,
+    shard::{ShardCount, ShardIdentity},
+};
 use itertools::Itertools;

 ///
@@ -14,12 +17,144 @@ pub struct KeySpace {
     pub ranges: Vec<Range<Key>>,
 }

+/// Represents a contiguous half-open range of the keyspace, masked according to a particular
+/// ShardNumber's stripes: within this range of keys, only some "belong" to the current
+/// shard.
+///
+/// When we iterate over keys within this object, we will skip any keys that don't belong
+/// to this shard.
+///
+/// The start + end keys may not belong to the shard: these specify where layer files should
+/// start + end, but we will never actually read/write those keys.
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub(crate) struct ShardedRange<'a> {
+    pub(crate) shard_identity: &'a ShardIdentity,
+    pub(crate) range: Range<Key>,
+}
+
+impl<'a> ShardedRange<'a> {
+    pub fn new(range: Range<Key>, shard_identity: &'a ShardIdentity) -> Self {
+        Self {
+            shard_identity,
+            range,
+        }
+    }
+
+    /// Break up this range into chunks, each of which has at least one local key in it.
+    pub fn fragment(self, target_nblocks: usize) -> Vec<(u32, Range<Key>)> {
+        let shard_identity = self.shard_identity;
+        let mut range = self;
+        let mut result = Vec::new();
+        loop {
+            // Split off the first target_nblocks if the remainder of the range would still contain
+            // some local blocks, otherwise yield the remainder of the range and we're done.
+            let range_size = range.page_count();
+
+            if range_size == u32::MAX || range_size == 0 {
+                return vec![(range_size, range.range)];
+            }
+
+            if range_size > target_nblocks as u32 {
+                // FIXME: this add is not advancing far enough to capture target_nblocks *local*
+                // blocks. So we will end up chunking our range more finely than we needed to.
+                let remainder = Self::new(
+                    range.range.start.add(target_nblocks as u32)..range.range.end,
+                    shard_identity,
+                );
+
+                let remainder_blocks = remainder.page_count();
+                if remainder_blocks > 0 {
+                    // We may split the range here
+                    let mut split_off = range;
+                    split_off.range.end = remainder.range.start;
+
+                    result.push((split_off.page_count(), split_off.range));
+
+                    range = remainder;
+                } else {
+                    // We may not split because the remainder would contain no local blocks
+                    result.push((range_size, range.range));
+                    break;
+                }
+            } else {
+                result.push((range_size, range.range));
+                break;
+            }
+        }
+        result
+    }
+
+    /// Estimate the physical pages that are within this range, on this shard. This returns
+    /// u32::MAX if the range spans relations: this return value should be interpreted as "large".
+    pub fn page_count(&self) -> u32 {
+        let start = self.range.start;
+        let end = self.range.end;
+        // Although only field4 & field6 are included in the hash, if other fields differ then
+        // it would be inaccurate for us to return a block count that assumed they were the same.
+        if end.field1 != start.field1
+            || end.field2 != start.field2
+            || end.field3 != start.field3
+            || end.field4 != start.field4
+        {
+            return u32::MAX;
+        }
+
+        // Fast path: avoid hashing if we can immediately identify the owner of the whole range
+        if self.shard_identity.count < ShardCount::new(2) {
+            let start = (start.field5 as u64) << 32 | start.field6 as u64;
+            let end = (end.field5 as u64) << 32 | end.field6 as u64;
+
+            let diff = end - start;
+            if diff > u32::MAX as u64 {
+                return u32::MAX;
+            } else {
+                return diff as u32;
+            }
+        }
+
+        // Special case for single keys, such as logical sizes
+        if end == start.add(1) && self.shard_identity.is_key_local(&start) {
+            return 1;
+        }
+
+        // Normal path: step through stripes and part-stripes in the range, evaluate whether each
+        // one belongs to Self, and add the stripe's block count to our total if so.
+        let mut result: u64 = 0;
+        let mut stripe_start = start;
+        while stripe_start < end {
+            let is_key_disposable = self.shard_identity.is_key_disposable(&stripe_start);
+
+            // Count up to the next stripe_size boundary
+            let stripe_index = stripe_start.field6 / self.shard_identity.stripe_size.0;
+            let stripe_remainder = self.shard_identity.stripe_size.0
+                - (stripe_start.field6 - stripe_index * self.shard_identity.stripe_size.0);
+
+            let next_stripe_start = stripe_start.add(stripe_remainder);
+            let stripe_end = std::cmp::min(end, next_stripe_start);
+
+            // If the blocks in this stripe belong to us, add them to our count
+            if !is_key_disposable {
+                let start = (stripe_start.field5 as u64) << 32 | stripe_start.field6 as u64;
+                let end = (stripe_end.field5 as u64) << 32 | stripe_end.field6 as u64;
+                result += end - start;
+            }
+
+            stripe_start = next_stripe_start;
+        }
+        if result > u32::MAX as u64 {
+            u32::MAX
+        } else {
+            result as u32
+        }
+    }
+}
+
 impl KeySpace {
     ///
     /// Partition a key space into chunks of roughly 'target_size' bytes
     /// in each partition.
     ///
-    pub fn partition(&self, target_size: u64) -> KeyPartitioning {
+    pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning {
         // Assume that each value is 8k in size.
         let target_nblocks = (target_size / BLCKSZ as u64) as usize;

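To make the stripe arithmetic in page_count concrete, here is a worked example written as comments. The 32768-block stripe size is an illustrative assumption; this diff does not show the value of ShardParameters::DEFAULT_STRIPE_SIZE.

    // Suppose stripe_size.0 = 32768 and the walk is at block 40000:
    //   stripe_index      = 40000 / 32768                = 1
    //   stripe_remainder  = 32768 - (40000 - 1 * 32768)  = 25536
    //   next_stripe_start = 40000 + 25536                = 65536
    // The walk always lands on the next multiple of stripe_size, so each
    // whole or partial stripe is attributed to exactly one shard.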
@@ -27,31 +162,27 @@ impl KeySpace {
         let mut current_part = Vec::new();
         let mut current_part_size: usize = 0;
         for range in &self.ranges {
-            // If appending the next contiguous range in the keyspace to the current
-            // partition would cause it to be too large, start a new partition.
-            let this_size = key_range_size(range) as usize;
-            if current_part_size + this_size > target_nblocks && !current_part.is_empty() {
-                parts.push(KeySpace {
-                    ranges: current_part,
-                });
-                current_part = Vec::new();
-                current_part_size = 0;
-            }
+            // While doing partitioning, wrap the range in ShardedRange so that our size calculations
+            // will respect shard striping rather than assuming all keys within a range are present.
+            let range = ShardedRange::new(range.clone(), shard_identity);

-            // If the next range is larger than 'target_size', split it into
-            // 'target_size' chunks.
-            let mut remain_size = this_size;
-            let mut start = range.start;
-            while remain_size > target_nblocks {
-                let next = start.add(target_nblocks as u32);
-                parts.push(KeySpace {
-                    ranges: vec![start..next],
-                });
-                start = next;
-                remain_size -= target_nblocks
-            }
-            current_part.push(start..range.end);
-            current_part_size += remain_size;
+            // Chunk up the range into parts that each contain up to target_size local blocks
+            for (range_size, range) in range.fragment(target_nblocks) {
+                // If appending the next contiguous range in the keyspace to the current
+                // partition would cause it to be too large, and our current partition
+                // covers at least one block that is physically present in this shard,
+                // then start a new partition
+                if current_part_size + range_size as usize > target_nblocks && current_part_size > 0
+                {
+                    parts.push(KeySpace {
+                        ranges: current_part,
+                    });
+                    current_part = Vec::new();
+                    current_part_size = 0;
+                }
+                current_part.push(range.start..range.end);
+                current_part_size += range_size as usize;
+            }
         }

         // add last partition that wasn't full yet.
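As an aside between the hunks: the sketch below (not part of the commit) shows how fragment() drives the loop above. It reuses only constructors that appear in the tests later in this diff (ShardIdentity::new, ShardParameters::DEFAULT_STRIPE_SIZE, Key::from_hex); the key values are arbitrary, and since ShardedRange is pub(crate) the function is assumed to live in the same crate.

    // Fragment blocks 0..0x100000 of a single relation fork on shard 0 of 4.
    fn fragment_demo() {
        let shard_identity = ShardIdentity::new(
            ShardNumber(0),
            ShardCount::new(4),
            ShardParameters::DEFAULT_STRIPE_SIZE,
        )
        .unwrap();

        let range = ShardedRange::new(
            Key::from_hex("000000067f00000001000000ae0000000000").unwrap()
                ..Key::from_hex("000000067f00000001000000ae0000100000").unwrap(),
            &shard_identity,
        );

        // Each chunk carries the number of its blocks that are local to this
        // shard (at most 8192 here; some chunks may report zero, per the FIXME).
        for (local_blocks, key_range) in range.fragment(8192) {
            println!("{local_blocks} local blocks in {key_range:?}");
        }
    }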
@@ -354,6 +485,11 @@ pub fn singleton_range(key: Key) -> Range<Key> {

 #[cfg(test)]
 mod tests {
+    use crate::{
+        models::ShardParameters,
+        shard::{ShardCount, ShardNumber},
+    };
+
     use super::*;
     use std::fmt::Write;
@@ -700,4 +836,95 @@ mod tests {
             ]
         );
     }
+
+    #[test]
+    fn sharded_range_relation_gap() {
+        let shard_identity = ShardIdentity::new(
+            ShardNumber(0),
+            ShardCount::new(4),
+            ShardParameters::DEFAULT_STRIPE_SIZE,
+        )
+        .unwrap();
+
+        let range = ShardedRange::new(
+            Range {
+                start: Key::from_hex("000000067F00000005000040100300000000").unwrap(),
+                end: Key::from_hex("000000067F00000005000040130000004000").unwrap(),
+            },
+            &shard_identity,
+        );
+
+        // Key range spans relations, expect MAX
+        assert_eq!(range.page_count(), u32::MAX);
+    }
+
+    #[test]
+    fn shard_identity_keyspaces_single_key() {
+        let shard_identity = ShardIdentity::new(
+            ShardNumber(1),
+            ShardCount::new(4),
+            ShardParameters::DEFAULT_STRIPE_SIZE,
+        )
+        .unwrap();
+
+        let range = ShardedRange::new(
+            Range {
+                start: Key::from_hex("000000067f000000010000007000ffffffff").unwrap(),
+                end: Key::from_hex("000000067f00000001000000700100000000").unwrap(),
+            },
+            &shard_identity,
+        );
+        // Single-key range on logical size key
+        assert_eq!(range.page_count(), 1);
+    }
+
+    #[test]
+    fn shard_identity_keyspaces_forkno_gap() {
+        let shard_identity = ShardIdentity::new(
+            ShardNumber(1),
+            ShardCount::new(4),
+            ShardParameters::DEFAULT_STRIPE_SIZE,
+        )
+        .unwrap();
+
+        let range = ShardedRange::new(
+            Range {
+                start: Key::from_hex("000000067f00000001000004df00fffffffe").unwrap(),
+                end: Key::from_hex("000000067f00000001000004df0100000003").unwrap(),
+            },
+            &shard_identity,
+        );
+
+        // Range spanning the end of one forkno and the start of the next, but not intersecting
+        // this shard's stripes. This is technically an under-count, as the logical size key would
+        // be stored on this shard, but that's okay because page_count is allowed to under-count:
+        // it just mustn't over-count.
+        assert_eq!(range.page_count(), 0);
+    }
+
+    #[test]
+    fn shard_identity_keyspaces_one_relation() {
+        for shard_number in 0..4 {
+            let shard_identity = ShardIdentity::new(
+                ShardNumber(shard_number),
+                ShardCount::new(4),
+                ShardParameters::DEFAULT_STRIPE_SIZE,
+            )
+            .unwrap();
+
+            let range = ShardedRange::new(
+                Range {
+                    start: Key::from_hex("000000067f00000001000000ae0000000000").unwrap(),
+                    end: Key::from_hex("000000067f00000001000000ae0000000001").unwrap(),
+                },
+                &shard_identity,
+            );
+
+            // Very simple case: range covering block zero of one relation, where that block maps to shard zero
+            if shard_number == 0 {
+                assert_eq!(range.page_count(), 1);
+            } else {
+                // Other shards should perceive the range's size as zero
+                assert_eq!(range.page_count(), 0);
+            }
+        }
+    }
 }
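The two call-site hunks below adopt the new shard-aware signature. As a usage note, here is a minimal sketch of a caller (not part of the commit; KeyPartitioning's layout is not shown in this diff, so the result is bound but not inspected):

    // Hypothetical caller: partition a keyspace for one shard.
    fn partition_for_shard(keyspace: &KeySpace, shard_identity: &ShardIdentity) {
        // Target roughly 128 MiB per partition, in bytes; partition() turns
        // this into a block budget by dividing by BLCKSZ.
        let target_size: u64 = 128 * 1024 * 1024;
        let _partitioning = keyspace.partition(shard_identity, target_size);
    }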
@@ -263,7 +263,10 @@ where
             .timeline
             .get_slru_keyspace(Version::Lsn(self.lsn), self.ctx)
             .await?
-            .partition(Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64);
+            .partition(
+                self.timeline.get_shard_identity(),
+                Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
+            );

         let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);
@@ -3897,7 +3897,7 @@ impl Timeline {
         }

         let keyspace = self.collect_keyspace(lsn, ctx).await?;
-        let partitioning = keyspace.partition(partition_size);
+        let partitioning = keyspace.partition(&self.shard_identity, partition_size);

         *partitioning_guard = (partitioning, lsn);