Revised fragmentation logic

This commit is contained in:
John Spray
2024-04-23 17:05:00 +01:00
parent 1330a60d27
commit 9c097aa75f

View File

@@ -48,48 +48,89 @@ impl<'a> ShardedRange<'a> {
}
}
/// Break up this range into chunks, each of which has at least one local key in it.
pub fn fragment(self, target_nblocks: usize) -> Vec<(u32, Range<Key>)> {
let shard_identity = self.shard_identity;
let mut range = self;
let mut result = Vec::new();
loop {
// Split off the first target_nblocks if the remainder of the range would still contain
// some local blocks, otherwise yield the remainder of the range and we're done.
let range_size = range.page_count();
if range_size == u32::MAX || range_size == 0 {
return vec![(range_size, range.range)];
}
if range_size > target_nblocks as u32 {
// FIXME: this add is not advancing far enough to capture target_nblocks *local*
// blocks. So we will end up chunking our range more finely than we needed to.
let remainder = Self::new(
range.range.start.add(target_nblocks as u32)..range.range.end,
shard_identity,
);
let remainder_blocks = remainder.page_count();
if remainder_blocks > 0 {
// We may split the range here
let mut split_off = range;
split_off.range.end = remainder.range.start;
result.push((split_off.page_count(), split_off.range));
range = remainder;
/// Break up this range into chunks, each of which has at least one local key in it if the
/// total range has at least one local key.
pub fn fragment(self, target_nblocks: u32) -> Vec<(u32, Range<Key>)> {
// Optimization for single-key case (e.g. logical size keys)
if self.range.end == self.range.start.add(1) {
return vec![(
if self.shard_identity.is_key_disposable(&self.range.start) {
0
} else {
// We may not split because the remainder would contain no local blocks
result.push((range_size, range.range));
break;
1
},
self.range,
)];
}
if self.range.end.field1 != self.range.start.field1
|| self.range.end.field2 != self.range.start.field2
|| self.range.end.field3 != self.range.start.field3
|| self.range.end.field4 != self.range.start.field4
{
// Ranges that span relations are not fragmented. We only get these ranges as a result
// of operations that act on existing layers, so we trust that the existing range is
// reasonably small.
return vec![(u32::MAX, self.range)];
}
let mut fragments: Vec<(u32, Range<Key>)> = Vec::new();
let mut cursor = self.range.start;
while cursor < self.range.end {
let advance_by = self.advance_to_next_boundary(cursor);
let is_fragment_disposable = self.shard_identity.is_key_disposable(&cursor);
// If the previous fragment is undersized, then we seek to consume enough
// blocks to complete it.
let (want_blocks, merge_last_fragment) = match fragments.last_mut() {
Some(frag) if frag.0 < target_nblocks => (target_nblocks - frag.0, Some(frag)),
Some(frag) => {
// Prev block is complete, want the full number.
(
target_nblocks,
if is_fragment_disposable {
// If this current range will be empty (not shard-local data), we will merge into previous
Some(frag)
} else {
None
},
)
}
None => {
// First iteration, want the full number
(target_nblocks, None)
}
};
let advance_by = if is_fragment_disposable {
advance_by
} else {
result.push((range_size, range.range));
break;
std::cmp::min(advance_by, want_blocks)
};
let next_cursor = cursor.add(advance_by);
let this_frag = (
if is_fragment_disposable {
0
} else {
advance_by
},
cursor..next_cursor,
);
cursor = next_cursor;
if let Some(last_fragment) = merge_last_fragment {
// Previous fragment was short or this one is empty, merge into it
last_fragment.0 += this_frag.0;
last_fragment.1.end = this_frag.1.end;
} else {
fragments.push(this_frag);
}
}
result
fragments
}
/// Estimate the physical pages that are within this range, on this shard. This returns
@@ -115,27 +156,16 @@ impl<'a> ShardedRange<'a> {
// Normal path: step through stripes and part-stripes in the range, evaluate whether each one belongs
// to Self, and add the stripe's block count to our total if so.
let mut result: u64 = 0;
let mut stripe_start = self.range.start;
while stripe_start < self.range.end {
let is_key_disposable = self.shard_identity.is_key_disposable(&stripe_start);
// Count up to the next stripe_size boundary
let stripe_index = stripe_start.field6 / self.shard_identity.stripe_size.0;
let stripe_remainder = self.shard_identity.stripe_size.0
- (stripe_start.field6 - stripe_index * self.shard_identity.stripe_size.0);
let next_stripe_start = stripe_start.add(stripe_remainder);
let stripe_end = std::cmp::min(self.range.end, next_stripe_start);
let mut cursor = self.range.start;
while cursor < self.range.end {
// Count up to the next stripe_size boundary or end of range
let advance_by = self.advance_to_next_boundary(cursor);
cursor = cursor.add(advance_by);
// If this blocks in this stripe belong to us, add them to our count
if !is_key_disposable {
// Keys must be nearby because we earlier used raw_size and would have
// droped out with u32::MAX if they were distant.
let diff = nearby_key_delta(&stripe_start, &stripe_end);
result += diff;
if !self.shard_identity.is_key_disposable(&cursor) {
result += advance_by as u64;
}
stripe_start = next_stripe_start;
}
// Sharding should always decrease the number of pages we estimate, never increase it
@@ -148,6 +178,23 @@ impl<'a> ShardedRange<'a> {
}
}
/// Advance the cursor to the next potential fragment boundary: this is either
/// a stripe boundary, or the end of the range.
fn advance_to_next_boundary(&self, cursor: Key) -> u32 {
let distance_to_range_end = nearby_key_delta(&cursor, &self.range.end);
if self.shard_identity.count < ShardCount::new(2) {
// Optimization: don't bother stepping through stripes if the tenant isn't sharded.
return Self::raw_size(&self.range);
}
let stripe_index = cursor.field6 / self.shard_identity.stripe_size.0;
let stripe_remainder = self.shard_identity.stripe_size.0
- (cursor.field6 - stripe_index * self.shard_identity.stripe_size.0);
std::cmp::min(stripe_remainder as u64, distance_to_range_end) as u32
}
/// Whereas `page_count` estimates the number of pages physically in this range on this shard,
/// this function simply calculates the number of pages in the space, without accounting for those
/// pages that would not actually be stored on this node.
@@ -182,7 +229,7 @@ impl KeySpace {
///
pub fn partition(&self, shard_identity: &ShardIdentity, target_size: u64) -> KeyPartitioning {
// Assume that each value is 8k in size.
let target_nblocks = (target_size / BLCKSZ as u64) as usize;
let target_nblocks = (target_size / BLCKSZ as u64) as u32;
let mut parts = Vec::new();
let mut current_part = Vec::new();
@@ -198,7 +245,8 @@ impl KeySpace {
// partition would cause it to be too large, and our current partition
// covers at least one block that is physically present in this shard,
// then start a new partition
if current_part_size + range_size as usize > target_nblocks && current_part_size > 0
if current_part_size + range_size as usize > target_nblocks as usize
&& current_part_size > 0
{
parts.push(KeySpace {
ranges: current_part,
@@ -934,4 +982,165 @@ mod tests {
}
}
}
/// Test helper: construct a ShardedRange and call fragment() on it, returning
/// the total page count in the range and the fragments.
fn do_fragment(
range_start: Key,
range_end: Key,
shard_identity: &ShardIdentity,
target_nblocks: u32,
) -> (u32, Vec<(u32, Range<Key>)>) {
let range = ShardedRange::new(
Range {
start: range_start,
end: range_end,
},
shard_identity,
);
let page_count = range.page_count();
let fragments = range.fragment(target_nblocks);
// Invariant: we always get at least one fragment
assert!(!fragments.is_empty());
if page_count > 0 {
// Invariant: every fragment must contain at least one shard-local page, if the
// total range contains at least one shard-local page
let all_nonzero = fragments.iter().all(|f| f.0 > 0);
if !all_nonzero {
eprintln!("Found a zero-length fragment: {:?}", fragments);
}
assert!(all_nonzero);
} else {
// A range with no shard-local pages should always be returned as a single fragment
assert_eq!(fragments, vec![(0, range_start..range_end)]);
}
(page_count, fragments)
}
/// Really simple tests for fragment(), on a range that just contains a single stripe
/// for a single tenant.
#[test]
fn sharded_range_fragment_simple() {
let shard_identity = ShardIdentity::new(
ShardNumber(0),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
)
.unwrap();
// A range which we happen to know covers exactly one stripe which belongs to this shard
let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
let input_end = Key::from_hex("000000067f00000001000000ae0000008000").unwrap();
// Ask for stripe_size blocks, we get the whole stripe
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 32768),
(32768, vec![(32768, input_start..input_end)])
);
// Ask for more, we still get the whole stripe
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 10000000),
(32768, vec![(32768, input_start..input_end)])
);
// Ask for target_nblocks of half the stripe size, we get two halves
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 16384),
(
32768,
vec![
(16384, input_start..input_start.add(16384)),
(16384, input_start.add(16384)..input_end)
]
)
);
}
#[test]
fn sharded_range_fragment_multi_stripe() {
let shard_identity = ShardIdentity::new(
ShardNumber(0),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
)
.unwrap();
// A range which covers multiple stripes, exactly one of which belongs to the current shard.
let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
// Ask for all the blocks, get a fragment that covers the whole range but reports
// its size to be just the blocks belonging to our shard.
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 131072),
(32768, vec![(32768, input_start..input_end)])
);
// Ask for a sub-stripe quantity
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 16000),
(
32768,
vec![
(16000, input_start..input_start.add(16000)),
(16000, input_start.add(16000)..input_start.add(32000)),
(768, input_start.add(32000)..input_end),
]
)
);
// Try on a range that starts slightly after our owned stripe
assert_eq!(
do_fragment(input_start.add(1), input_end, &shard_identity, 131072),
(32767, vec![(32767, input_start.add(1)..input_end)])
);
}
/// Test that ShardedRange behaves properly when used on un-sharded data
#[test]
fn sharded_range_fragment_unsharded() {
let shard_identity = ShardIdentity::unsharded();
let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
let input_end = Key::from_hex("000000067f00000001000000ae0000010000").unwrap();
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 0x8000),
(
0x10000,
vec![
(0x8000, input_start..input_start.add(0x8000)),
(0x8000, input_start.add(0x8000)..input_start.add(0x10000))
]
)
);
}
#[test]
fn sharded_range_fragment_cross_relation() {
let shard_identity = ShardIdentity::unsharded();
// A range that spans relations: expect fragmentation to give up and return a u32::MAX size
let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
let input_end = Key::from_hex("000000068f00000001000000ae0000010000").unwrap();
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 0x8000),
(u32::MAX, vec![(u32::MAX, input_start..input_end),])
);
// Same, but using a sharded identity
let shard_identity = ShardIdentity::new(
ShardNumber(0),
ShardCount::new(4),
ShardParameters::DEFAULT_STRIPE_SIZE,
)
.unwrap();
assert_eq!(
do_fragment(input_start, input_end, &shard_identity, 0x8000),
(u32::MAX, vec![(u32::MAX, input_start..input_end),])
);
}
}