Compare commits

...

1 commit

Author SHA1 Message Date
Owen Brady
c93ac95d25 pageserver_api: implement keyspace range iterator (#6435) 2025-02-12 20:07:39 +00:00
2 changed files with 204 additions and 64 deletions

View File

@@ -27,6 +27,55 @@ impl std::fmt::Display for KeySpace {
}
}
/// Iterator over the individual keys of a keyspace.
///
/// Yields each key together with the exclusive end of the range that
/// contains it (see the `Item` type on the `Iterator` impl).
pub struct KeySpaceIter<'a> {
    // The ranges being walked, borrowed from the owning `KeySpace`.
    ranges: &'a [Range<Key>],
    // Index into `ranges` of the range currently being walked.
    current_range: usize,
    // The next key to yield. `None` means "not yet seeded"; `next()` lazily
    // initializes it from the current range's start.
    current_key: Option<Key>,
}
impl<'a> Iterator for KeySpaceIter<'a> {
    // (current_key, range_end)
    type Item = (Key, Key);

    /// Returns the next key in the keyspace, paired with the exclusive end
    /// of the range that contains it, or `None` once every range has been
    /// exhausted.
    ///
    /// Empty ranges (`start >= end`) are skipped. This is written as a loop
    /// rather than recursing into `self.next()` on each range transition:
    /// the recursive form deepens the stack once per consecutive empty
    /// range within a single `next()` call, which could overflow the stack
    /// for a pathological keyspace (Rust does not guarantee tail calls).
    fn next(&mut self) -> Option<Self::Item> {
        while self.current_range < self.ranges.len() {
            let range = &self.ranges[self.current_range];
            // Lazily seed the cursor from the current range's start.
            let current_key = *self.current_key.get_or_insert(range.start);
            if current_key < range.end {
                // Advance the cursor past the key we are about to yield.
                self.current_key = Some(current_key.next());
                return Some((current_key, range.end));
            }
            // Current range exhausted: move on, re-seeding the cursor from
            // the next range's start if one exists.
            self.current_range += 1;
            self.current_key = self
                .ranges
                .get(self.current_range)
                .map(|range| range.start);
        }
        // All ranges exhausted; end iteration.
        None
    }
}
// Newtype wrapper; derives mirror those of the inner `KeySpace` so the
// wrapper can be cloned, compared, and defaulted the same way.
/// A wrapper type for sparse keyspaces.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct SparseKeySpace(pub KeySpace);
@@ -435,6 +484,14 @@ impl KeySpace {
/// Returns `true` if `key` lies inside any of this keyspace's ranges.
pub fn contains(&self, key: &Key) -> bool {
    // A single key is equivalent to the half-open range [key, key.next()).
    let singleton = *key..key.next();
    self.overlaps(&singleton)
}
/// Returns an iterator yielding every key in this keyspace, each paired
/// with the exclusive end of the range that contains it.
pub fn iter(&self) -> KeySpaceIter {
    // Seed the cursor with the first range's start, if any range exists.
    let first_key = self.ranges.first().map(|range| range.start);
    KeySpaceIter {
        ranges: &self.ranges,
        current_range: 0,
        current_key: first_key,
    }
}
}
///
@@ -619,6 +676,92 @@ mod tests {
use super::*;
use std::fmt::Write;
// Build a `Key` with the given field1/field2/field6 values and all other
// fields zeroed; keeps the test cases below terse.
fn key(field1: u8, field2: u32, field6: u32) -> Key {
    Key {
        field1,
        field2,
        field3: 0,
        field4: 0,
        field5: 0,
        field6,
    }
}
#[test]
fn test_iter_single_range() {
    // A single range covering field6 values 0..5; the end bound is
    // exclusive, so exactly five keys are yielded, each paired with the
    // range's end.
    let keyspace = KeySpace {
        ranges: vec![Range {
            start: key(1, 0, 0),
            end: key(1, 0, 5),
        }],
    };

    let expected: Vec<_> = (0..5).map(|i| (key(1, 0, i), key(1, 0, 5))).collect();
    assert_eq!(keyspace.iter().collect::<Vec<_>>(), expected);
}
#[test]
fn test_iter_multiple_ranges() {
    // Two disjoint ranges: iteration walks the first up to its exclusive
    // end, then continues from the start of the second. Each yielded key is
    // paired with the end of the range it came from.
    let keyspace = KeySpace {
        ranges: vec![
            Range {
                start: key(1, 0, 0),
                end: key(1, 0, 3),
            },
            Range {
                start: key(2, 0, 0),
                end: key(2, 0, 2),
            },
        ],
    };

    let mut expected = Vec::new();
    expected.extend((0..3).map(|i| (key(1, 0, i), key(1, 0, 3))));
    expected.extend((0..2).map(|i| (key(2, 0, i), key(2, 0, 2))));
    assert_eq!(keyspace.iter().collect::<Vec<_>>(), expected);
}
#[test]
fn test_iter_empty_keyspace() {
    // A keyspace with no ranges yields nothing at all.
    let keyspace = KeySpace { ranges: Vec::new() };
    assert_eq!(keyspace.iter().next(), None);
}
#[test]
fn test_iter_range_with_single_key() {
    // A range whose end is start.next() contains exactly one key.
    let keyspace = KeySpace {
        ranges: vec![Range {
            start: key(1, 42, 0),
            end: key(1, 42, 1),
        }],
    };

    let collected: Vec<_> = keyspace.iter().collect();
    assert_eq!(collected, vec![(key(1, 42, 0), key(1, 42, 1))]);
    assert_eq!(keyspace.iter().nth(1), None);
}
// Helper function to create a key range.
//
// Make the tests below less verbose.

View File

@@ -4552,74 +4552,71 @@ impl Timeline {
let mut wrote_keys = false;
let mut key_request_accum = KeySpaceAccum::new();
for range in &partition.ranges {
let mut key = range.start;
while key < range.end {
// Decide whether to retain this key: usually we do, but sharded tenants may
// need to drop keys that don't belong to them. If we retain the key, add it
// to `key_request_accum` for later issuing a vectored get
if self.shard_identity.is_key_disposable(&key) {
debug!(
"Dropping key {} during compaction (it belongs on shard {:?})",
key,
self.shard_identity.get_shard_number(&key)
);
} else {
key_request_accum.add_key(key);
for (key, range_end) in partition.iter() {
// Decide whether to retain this key: usually we do, but sharded tenants may
// need to drop keys that don't belong to them. If we retain the key, add it
// to `key_request_accum` for later issuing a vectored get
if self.shard_identity.is_key_disposable(&key) {
debug!(
"Dropping key {} during compaction (it belongs on shard {:?})",
key,
self.shard_identity.get_shard_number(&key)
);
} else {
key_request_accum.add_key(key);
}
let last_key_in_range = key.next() == range_end;
// Maybe flush `key_rest_accum`
if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS
|| (last_key_in_range && key_request_accum.raw_size() > 0)
{
let results = self
.get_vectored(
key_request_accum.consume_keyspace(),
lsn,
io_concurrency.clone(),
ctx,
)
.await?;
if self.cancel.is_cancelled() {
return Err(CreateImageLayersError::Cancelled);
}
let last_key_in_range = key.next() == range.end;
key = key.next();
// Maybe flush `key_rest_accum`
if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS
|| (last_key_in_range && key_request_accum.raw_size() > 0)
{
let results = self
.get_vectored(
key_request_accum.consume_keyspace(),
lsn,
io_concurrency.clone(),
ctx,
)
.await?;
if self.cancel.is_cancelled() {
return Err(CreateImageLayersError::Cancelled);
}
for (img_key, img) in results {
let img = match img {
Ok(img) => img,
Err(err) => {
// If we fail to reconstruct a VM or FSM page, we can zero the
// page without losing any actual user data. That seems better
// than failing repeatedly and getting stuck.
//
// We had a bug at one point, where we truncated the FSM and VM
// in the pageserver, but the Postgres didn't know about that
// and continued to generate incremental WAL records for pages
// that didn't exist in the pageserver. Trying to replay those
// WAL records failed to find the previous image of the page.
// This special case allows us to recover from that situation.
// See https://github.com/neondatabase/neon/issues/2601.
//
// Unfortunately we cannot do this for the main fork, or for
// any metadata keys, keys, as that would lead to actual data
// loss.
if img_key.is_rel_fsm_block_key() || img_key.is_rel_vm_block_key() {
warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
ZERO_PAGE.clone()
} else {
return Err(CreateImageLayersError::from(err));
}
for (img_key, img) in results {
let img = match img {
Ok(img) => img,
Err(err) => {
// If we fail to reconstruct a VM or FSM page, we can zero the
// page without losing any actual user data. That seems better
// than failing repeatedly and getting stuck.
//
// We had a bug at one point, where we truncated the FSM and VM
// in the pageserver, but the Postgres didn't know about that
// and continued to generate incremental WAL records for pages
// that didn't exist in the pageserver. Trying to replay those
// WAL records failed to find the previous image of the page.
// This special case allows us to recover from that situation.
// See https://github.com/neondatabase/neon/issues/2601.
//
// Unfortunately we cannot do this for the main fork, or for
// any metadata keys, keys, as that would lead to actual data
// loss.
if img_key.is_rel_fsm_block_key() || img_key.is_rel_vm_block_key() {
warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
ZERO_PAGE.clone()
} else {
return Err(CreateImageLayersError::from(err));
}
};
}
};
// Write all the keys we just read into our new image layer.
image_layer_writer.put_image(img_key, img, ctx).await?;
wrote_keys = true;
}
// Write all the keys we just read into our new image layer.
image_layer_writer.put_image(img_key, img, ctx).await?;
wrote_keys = true;
}
}
}