mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 03:30:36 +00:00
Compare commits
1 Commits
release-82
...
ci-run/pr-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c93ac95d25 |
@@ -27,6 +27,55 @@ impl std::fmt::Display for KeySpace {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct KeySpaceIter<'a> {
|
||||
ranges: &'a [Range<Key>],
|
||||
current_range: usize,
|
||||
current_key: Option<Key>,
|
||||
}
|
||||
|
||||
impl<'a> Iterator for KeySpaceIter<'a> {
|
||||
// (current_key, range_end)
|
||||
type Item = (Key, Key);
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
// if we've gone through all ranges, stop iteration
|
||||
if self.current_range >= self.ranges.len() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let range = &self.ranges[self.current_range];
|
||||
|
||||
// if current_key is None, initialize it with range.start
|
||||
if self.current_key.is_none() {
|
||||
self.current_key = Some(range.start);
|
||||
}
|
||||
|
||||
if let Some(current_key) = self.current_key {
|
||||
// check if the current_key is still within the range
|
||||
if current_key < range.end {
|
||||
// get the next key
|
||||
let next_key = current_key.next();
|
||||
// move current_key forward
|
||||
self.current_key = Some(next_key);
|
||||
// return the range from current_key to the current range_end
|
||||
return Some((current_key, range.end));
|
||||
}
|
||||
}
|
||||
|
||||
// move to the next range
|
||||
self.current_range += 1;
|
||||
|
||||
// if there are more ranges, initialize the next key from the start of the next range
|
||||
if self.current_range < self.ranges.len() {
|
||||
self.current_key = Some(self.ranges[self.current_range].start);
|
||||
self.next() // Recurse to continue iterating with the new range
|
||||
} else {
|
||||
// no more ranges, end iteration
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A wrapper type for sparse keyspaces.
///
/// Newtype over [`KeySpace`]; the inner keyspace is public, so callers can
/// reach the underlying ranges directly via `.0`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct SparseKeySpace(pub KeySpace);
|
||||
@@ -435,6 +484,14 @@ impl KeySpace {
|
||||
/// Returns `true` if `key` falls inside any of this keyspace's ranges.
pub fn contains(&self, key: &Key) -> bool {
    // A single key is contained exactly when the one-key range
    // [key, key.next()) overlaps the keyspace.
    let singleton = *key..key.next();
    self.overlaps(&singleton)
}
|
||||
|
||||
pub fn iter(&self) -> KeySpaceIter {
|
||||
KeySpaceIter {
|
||||
ranges: &self.ranges,
|
||||
current_range: 0,
|
||||
current_key: self.ranges.first().map(|r| r.start),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
@@ -619,6 +676,92 @@ mod tests {
|
||||
use super::*;
|
||||
use std::fmt::Write;
|
||||
|
||||
// Test helper: builds a `Key` from the three fields the tests vary;
// the remaining fields are zeroed to keep the tests concise.
fn key(field1: u8, field2: u32, field6: u32) -> Key {
    Key {
        field1,
        field2,
        field3: 0,
        field4: 0,
        field5: 0,
        field6,
    }
}
|
||||
|
||||
#[test]
fn test_iter_single_range() {
    // A single range of five keys: field6 in 0..5.
    let end = key(1, 0, 5);
    let keyspace = KeySpace {
        ranges: vec![Range {
            start: key(1, 0, 0),
            end,
        }],
    };

    // Every yielded item pairs the key with the range's exclusive end;
    // iteration stops before field6 == 5.
    let expected: Vec<_> = (0..5).map(|f6| (key(1, 0, f6), end)).collect();
    let actual: Vec<_> = keyspace.iter().collect();
    assert_eq!(actual, expected);
}
|
||||
|
||||
#[test]
fn test_iter_multiple_ranges() {
    // Two disjoint ranges; the iterator must walk them in order and pair
    // each key with the end of its own range.
    let keyspace = KeySpace {
        ranges: vec![
            Range {
                start: key(1, 0, 0),
                end: key(1, 0, 3),
            },
            Range {
                start: key(2, 0, 0),
                end: key(2, 0, 2),
            },
        ],
    };

    let mut expected = Vec::new();
    expected.extend((0..3).map(|f6| (key(1, 0, f6), key(1, 0, 3))));
    expected.extend((0..2).map(|f6| (key(2, 0, f6), key(2, 0, 2))));

    let actual: Vec<_> = keyspace.iter().collect();
    assert_eq!(actual, expected);
}
|
||||
|
||||
#[test]
fn test_iter_empty_keyspace() {
    // A keyspace with no ranges yields nothing at all.
    let keyspace = KeySpace { ranges: Vec::new() };
    assert_eq!(keyspace.iter().next(), None);
}
|
||||
|
||||
#[test]
fn test_iter_range_with_single_key() {
    // A range covering exactly one key yields that key once, then stops.
    let start = key(1, 42, 0);
    let end = key(1, 42, 1); // Only one step
    let keyspace = KeySpace {
        ranges: vec![Range { start, end }],
    };

    let mut iter = keyspace.iter();
    assert_eq!(iter.next(), Some((start, end)));
    assert_eq!(iter.next(), None);
}
|
||||
|
||||
// Helper function to create a key range.
|
||||
//
|
||||
// Make the tests below less verbose.
|
||||
|
||||
@@ -4552,74 +4552,71 @@ impl Timeline {
|
||||
let mut wrote_keys = false;
|
||||
|
||||
let mut key_request_accum = KeySpaceAccum::new();
|
||||
for range in &partition.ranges {
|
||||
let mut key = range.start;
|
||||
while key < range.end {
|
||||
// Decide whether to retain this key: usually we do, but sharded tenants may
|
||||
// need to drop keys that don't belong to them. If we retain the key, add it
|
||||
// to `key_request_accum` for later issuing a vectored get
|
||||
if self.shard_identity.is_key_disposable(&key) {
|
||||
debug!(
|
||||
"Dropping key {} during compaction (it belongs on shard {:?})",
|
||||
key,
|
||||
self.shard_identity.get_shard_number(&key)
|
||||
);
|
||||
} else {
|
||||
key_request_accum.add_key(key);
|
||||
|
||||
for (key, range_end) in partition.iter() {
|
||||
// Decide whether to retain this key: usually we do, but sharded tenants may
|
||||
// need to drop keys that don't belong to them. If we retain the key, add it
|
||||
// to `key_request_accum` for later issuing a vectored get
|
||||
if self.shard_identity.is_key_disposable(&key) {
|
||||
debug!(
|
||||
"Dropping key {} during compaction (it belongs on shard {:?})",
|
||||
key,
|
||||
self.shard_identity.get_shard_number(&key)
|
||||
);
|
||||
} else {
|
||||
key_request_accum.add_key(key);
|
||||
}
|
||||
|
||||
let last_key_in_range = key.next() == range_end;
|
||||
|
||||
// Maybe flush `key_rest_accum`
|
||||
if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS
|
||||
|| (last_key_in_range && key_request_accum.raw_size() > 0)
|
||||
{
|
||||
let results = self
|
||||
.get_vectored(
|
||||
key_request_accum.consume_keyspace(),
|
||||
lsn,
|
||||
io_concurrency.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CreateImageLayersError::Cancelled);
|
||||
}
|
||||
|
||||
let last_key_in_range = key.next() == range.end;
|
||||
key = key.next();
|
||||
|
||||
// Maybe flush `key_rest_accum`
|
||||
if key_request_accum.raw_size() >= Timeline::MAX_GET_VECTORED_KEYS
|
||||
|| (last_key_in_range && key_request_accum.raw_size() > 0)
|
||||
{
|
||||
let results = self
|
||||
.get_vectored(
|
||||
key_request_accum.consume_keyspace(),
|
||||
lsn,
|
||||
io_concurrency.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CreateImageLayersError::Cancelled);
|
||||
}
|
||||
|
||||
for (img_key, img) in results {
|
||||
let img = match img {
|
||||
Ok(img) => img,
|
||||
Err(err) => {
|
||||
// If we fail to reconstruct a VM or FSM page, we can zero the
|
||||
// page without losing any actual user data. That seems better
|
||||
// than failing repeatedly and getting stuck.
|
||||
//
|
||||
// We had a bug at one point, where we truncated the FSM and VM
|
||||
// in the pageserver, but the Postgres didn't know about that
|
||||
// and continued to generate incremental WAL records for pages
|
||||
// that didn't exist in the pageserver. Trying to replay those
|
||||
// WAL records failed to find the previous image of the page.
|
||||
// This special case allows us to recover from that situation.
|
||||
// See https://github.com/neondatabase/neon/issues/2601.
|
||||
//
|
||||
// Unfortunately we cannot do this for the main fork, or for
|
||||
// any metadata keys, keys, as that would lead to actual data
|
||||
// loss.
|
||||
if img_key.is_rel_fsm_block_key() || img_key.is_rel_vm_block_key() {
|
||||
warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
|
||||
ZERO_PAGE.clone()
|
||||
} else {
|
||||
return Err(CreateImageLayersError::from(err));
|
||||
}
|
||||
for (img_key, img) in results {
|
||||
let img = match img {
|
||||
Ok(img) => img,
|
||||
Err(err) => {
|
||||
// If we fail to reconstruct a VM or FSM page, we can zero the
|
||||
// page without losing any actual user data. That seems better
|
||||
// than failing repeatedly and getting stuck.
|
||||
//
|
||||
// We had a bug at one point, where we truncated the FSM and VM
|
||||
// in the pageserver, but the Postgres didn't know about that
|
||||
// and continued to generate incremental WAL records for pages
|
||||
// that didn't exist in the pageserver. Trying to replay those
|
||||
// WAL records failed to find the previous image of the page.
|
||||
// This special case allows us to recover from that situation.
|
||||
// See https://github.com/neondatabase/neon/issues/2601.
|
||||
//
|
||||
// Unfortunately we cannot do this for the main fork, or for
|
||||
// any metadata keys, keys, as that would lead to actual data
|
||||
// loss.
|
||||
if img_key.is_rel_fsm_block_key() || img_key.is_rel_vm_block_key() {
|
||||
warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
|
||||
ZERO_PAGE.clone()
|
||||
} else {
|
||||
return Err(CreateImageLayersError::from(err));
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
// Write all the keys we just read into our new image layer.
|
||||
image_layer_writer.put_image(img_key, img, ctx).await?;
|
||||
wrote_keys = true;
|
||||
}
|
||||
// Write all the keys we just read into our new image layer.
|
||||
image_layer_writer.put_image(img_key, img, ctx).await?;
|
||||
wrote_keys = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user