mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-16 09:00:38 +00:00
Fix infinite loop with forced repository checkpoint.
To fix, break out of the loop when we reach an in-memory layer that was created after the checkpoint started. To do that, add a "generation" counter in the layer map. Fixes https://github.com/zenithdb/zenith/issues/494
This commit is contained in:
@@ -71,7 +71,7 @@ static TIMEOUT: Duration = Duration::from_secs(60);
|
||||
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
|
||||
// would be more appropriate. But a low value forces the code to be exercised more,
|
||||
// which is good for now to trigger bugs.
|
||||
static OLDEST_INMEM_DISTANCE: u64 = 16 * 1024 * 1024;
|
||||
static OLDEST_INMEM_DISTANCE: i128 = 16 * 1024 * 1024;
|
||||
|
||||
// Metrics collected on operations on the storage repository.
|
||||
lazy_static! {
|
||||
@@ -1074,6 +1074,18 @@ impl LayeredTimeline {
|
||||
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
|
||||
/// know anything about them here in the repository.
|
||||
fn checkpoint_internal(&self, force: bool) -> Result<()> {
|
||||
// Grab lock on the layer map.
|
||||
//
|
||||
// TODO: We hold it locked throughout the checkpoint operation. That's bad,
|
||||
// the checkpointing could take many seconds, and any incoming get_page_at_lsn()
|
||||
// requests will block.
|
||||
let mut layers = self.layers.lock().unwrap();
|
||||
|
||||
// Bump the generation number in the layer map, so that we can distinguish
|
||||
// entries inserted after the checkpoint started
|
||||
let current_generation = layers.increment_generation();
|
||||
|
||||
// Read 'last_record_lsn'. That becomes the cutoff LSN for frozen layers.
|
||||
let RecordLsn {
|
||||
last: last_record_lsn,
|
||||
prev: prev_record_lsn,
|
||||
@@ -1085,13 +1097,6 @@ impl LayeredTimeline {
|
||||
last_record_lsn
|
||||
);
|
||||
|
||||
// Grab lock on the layer map.
|
||||
//
|
||||
// TODO: We hold it locked throughout the checkpoint operation. That's bad,
|
||||
// the checkpointing could take many seconds, and any incoming get_page_at_lsn()
|
||||
// requests will block.
|
||||
let mut layers = self.layers.lock().unwrap();
|
||||
|
||||
// Take the in-memory layer with the oldest WAL record. If it's older
|
||||
// than the threshold, write it out to disk as a new image and delta file.
|
||||
// Repeat until all remaining in-memory layers are within the threshold.
|
||||
@@ -1102,14 +1107,26 @@ impl LayeredTimeline {
|
||||
// check, though. We should also aim at flushing layers that consume
|
||||
// a lot of memory and/or aren't receiving much updates anymore.
|
||||
let mut disk_consistent_lsn = last_record_lsn;
|
||||
while let Some(oldest_layer) = layers.peek_oldest_open() {
|
||||
// Does this layer need freezing?
|
||||
|
||||
while let Some((oldest_layer, oldest_generation)) = layers.peek_oldest_open() {
|
||||
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
|
||||
let distance = last_record_lsn.0 - oldest_pending_lsn.0;
|
||||
if !force && distance < OLDEST_INMEM_DISTANCE {
|
||||
|
||||
// Does this layer need freezing?
|
||||
//
|
||||
// Write out all in-memory layers that contain WAL older than OLDEST_INMEM_DISTANCE.
|
||||
// Or if 'force' is true, write out all of them. If we reach a layer with the same
|
||||
// generation number, we know that we have cycled through all layers that were open
|
||||
// when we started. We don't want to process layers inserted after we started, to
|
||||
// avoid getting into an infinite loop trying to process again entries that we
|
||||
// inserted ourselves.
|
||||
let distance = last_record_lsn.widening_sub(oldest_pending_lsn);
|
||||
if distance < 0
|
||||
|| (!force && distance < OLDEST_INMEM_DISTANCE)
|
||||
|| oldest_generation == current_generation
|
||||
{
|
||||
info!(
|
||||
"the oldest layer is now {} which is {} bytes behind last_record_lsn",
|
||||
oldest_layer.get_seg_tag(),
|
||||
oldest_layer.filename().display(),
|
||||
distance
|
||||
);
|
||||
disk_consistent_lsn = oldest_pending_lsn;
|
||||
|
||||
@@ -43,6 +43,10 @@ pub struct LayerMap {
|
||||
/// This allows easy access to the in-memory layer that contains the
|
||||
/// oldest WAL record.
|
||||
open_segs: BinaryHeap<OpenSegEntry>,
|
||||
|
||||
/// Generation number, used to distinguish newly inserted entries in the
|
||||
/// binary heap from older entries during checkpoint.
|
||||
current_generation: u64,
|
||||
}
|
||||
|
||||
///
|
||||
@@ -59,9 +63,13 @@ struct SegEntry {
|
||||
|
||||
/// Entry held in LayerMap.open_segs, with boilerplate comparison
/// routines to implement a min-heap ordered by 'oldest_pending_lsn'
///
/// Each entry also carries a generation number. It can be used to distinguish
/// entries with the same 'oldest_pending_lsn'.
struct OpenSegEntry {
    // Primary heap-ordering key: the oldest LSN still pending in 'layer'.
    pub oldest_pending_lsn: Lsn,
    // The open in-memory layer this entry refers to.
    pub layer: Arc<InMemoryLayer>,
    // Stamped from LayerMap.current_generation at insertion time; used as a
    // tie-breaker and to recognize entries inserted after a checkpoint began.
    pub generation: u64,
}
|
||||
impl Ord for OpenSegEntry {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
@@ -73,10 +81,13 @@ impl Ord for OpenSegEntry {
|
||||
impl PartialOrd for OpenSegEntry {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
// BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
|
||||
// to get that.
|
||||
other
|
||||
.oldest_pending_lsn
|
||||
.partial_cmp(&self.oldest_pending_lsn)
|
||||
// to get that. Entries with identical oldest_pending_lsn are ordered by generation
|
||||
Some(
|
||||
other
|
||||
.oldest_pending_lsn
|
||||
.cmp(&self.oldest_pending_lsn)
|
||||
.then_with(|| other.generation.cmp(&self.generation)),
|
||||
)
|
||||
}
|
||||
}
|
||||
impl PartialEq for OpenSegEntry {
|
||||
@@ -151,6 +162,7 @@ impl LayerMap {
|
||||
let opensegentry = OpenSegEntry {
|
||||
oldest_pending_lsn: layer.get_oldest_pending_lsn(),
|
||||
layer: layer,
|
||||
generation: self.current_generation,
|
||||
};
|
||||
self.open_segs.push(opensegentry);
|
||||
|
||||
@@ -290,15 +302,23 @@ impl LayerMap {
|
||||
false
|
||||
}
|
||||
|
||||
/// Return the oldest in-memory layer.
|
||||
pub fn peek_oldest_open(&self) -> Option<Arc<InMemoryLayer>> {
|
||||
/// Return the oldest in-memory layer, along with its generation number.
|
||||
pub fn peek_oldest_open(&self) -> Option<(Arc<InMemoryLayer>, u64)> {
|
||||
if let Some(opensegentry) = self.open_segs.peek() {
|
||||
Some(Arc::clone(&opensegentry.layer))
|
||||
Some((Arc::clone(&opensegentry.layer), opensegentry.generation))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Increment the generation number used to stamp open in-memory layers. Layers
|
||||
/// added with `insert_open` after this call will be associated with the new
|
||||
/// generation. Returns the new generation number.
|
||||
pub fn increment_generation(&mut self) -> u64 {
|
||||
self.current_generation += 1;
|
||||
self.current_generation
|
||||
}
|
||||
|
||||
pub fn iter_historic_layers(&self) -> HistoricLayerIter {
|
||||
HistoricLayerIter {
|
||||
segiter: self.segs.iter(),
|
||||
@@ -312,6 +332,7 @@ impl Default for LayerMap {
|
||||
LayerMap {
|
||||
segs: HashMap::new(),
|
||||
open_segs: BinaryHeap::new(),
|
||||
current_generation: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,6 +32,12 @@ impl Lsn {
|
||||
self.0.checked_sub(other).map(Lsn)
|
||||
}
|
||||
|
||||
/// Subtract a number, returning the difference as i128 to avoid overflow.
|
||||
pub fn widening_sub<T: Into<u64>>(self, other: T) -> i128 {
|
||||
let other: u64 = other.into();
|
||||
i128::from(self.0) - i128::from(other)
|
||||
}
|
||||
|
||||
/// Parse an LSN from a filename in the form `0000000000000000`
|
||||
pub fn from_filename<F>(filename: F) -> Result<Self, LsnParseError>
|
||||
where
|
||||
@@ -264,6 +270,11 @@ mod tests {
|
||||
assert_eq!(Lsn(1234).checked_sub(1233u64), Some(Lsn(1)));
|
||||
assert_eq!(Lsn(1234).checked_sub(1235u64), None);
|
||||
|
||||
assert_eq!(Lsn(1235).widening_sub(1234u64), 1);
|
||||
assert_eq!(Lsn(1234).widening_sub(1235u64), -1);
|
||||
assert_eq!(Lsn(u64::MAX).widening_sub(0u64), i128::from(u64::MAX));
|
||||
assert_eq!(Lsn(0).widening_sub(u64::MAX), -i128::from(u64::MAX));
|
||||
|
||||
let seg_sz: usize = 16 * 1024 * 1024;
|
||||
assert_eq!(Lsn(0x1000007).segment_offset(seg_sz), 7);
|
||||
assert_eq!(Lsn(0x1000007).segment_number(seg_sz), 1u64);
|
||||
|
||||
Reference in New Issue
Block a user