Fix infinite loop with forced repository checkpoint.

To fix, break out of the loop when you reach an in-memory layer that was
created after the checkpoint started. To do that, add a "generation"
counter into the layer map.

Fixes https://github.com/zenithdb/zenith/issues/494
This commit is contained in:
Heikki Linnakangas
2021-09-02 15:41:40 +03:00
parent c3cbb56ff8
commit 66929ad6fb
3 changed files with 69 additions and 20 deletions

View File

@@ -71,7 +71,7 @@ static TIMEOUT: Duration = Duration::from_secs(60);
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
// would be more appropriate. But a low value forces the code to be exercised more,
// which is good for now to trigger bugs.
static OLDEST_INMEM_DISTANCE: u64 = 16 * 1024 * 1024;
static OLDEST_INMEM_DISTANCE: i128 = 16 * 1024 * 1024;
// Metrics collected on operations on the storage repository.
lazy_static! {
@@ -1074,6 +1074,18 @@ impl LayeredTimeline {
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
/// know anything about them here in the repository.
fn checkpoint_internal(&self, force: bool) -> Result<()> {
// Grab lock on the layer map.
//
// TODO: We hold it locked throughout the checkpoint operation. That's bad,
// the checkpointing could take many seconds, and any incoming get_page_at_lsn()
// requests will block.
let mut layers = self.layers.lock().unwrap();
// Bump the generation number in the layer map, so that we can distinguish
// entries inserted after the checkpoint started
let current_generation = layers.increment_generation();
// Read 'last_record_lsn'. That becomes the cutoff LSN for frozen layers.
let RecordLsn {
last: last_record_lsn,
prev: prev_record_lsn,
@@ -1085,13 +1097,6 @@ impl LayeredTimeline {
last_record_lsn
);
// Grab lock on the layer map.
//
// TODO: We hold it locked throughout the checkpoint operation. That's bad,
// the checkpointing could take many seconds, and any incoming get_page_at_lsn()
// requests will block.
let mut layers = self.layers.lock().unwrap();
// Take the in-memory layer with the oldest WAL record. If it's older
// than the threshold, write it out to disk as a new image and delta file.
// Repeat until all remaining in-memory layers are within the threshold.
@@ -1102,14 +1107,26 @@ impl LayeredTimeline {
// check, though. We should also aim at flushing layers that consume
// a lot of memory and/or aren't receiving much updates anymore.
let mut disk_consistent_lsn = last_record_lsn;
while let Some(oldest_layer) = layers.peek_oldest_open() {
// Does this layer need freezing?
while let Some((oldest_layer, oldest_generation)) = layers.peek_oldest_open() {
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
let distance = last_record_lsn.0 - oldest_pending_lsn.0;
if !force && distance < OLDEST_INMEM_DISTANCE {
// Does this layer need freezing?
//
// Write out all in-memory layers that contain WAL older than OLDEST_INMEM_DISTANCE.
// Or if 'force' is true, write out all of them. If we reach a layer with the same
// generation number, we know that we have cycled through all layers that were open
// when we started. We don't want to process layers inserted after we started, to
// avoid getting into an infinite loop trying to process again entries that we
// inserted ourselves.
let distance = last_record_lsn.widening_sub(oldest_pending_lsn);
if distance < 0
|| (!force && distance < OLDEST_INMEM_DISTANCE)
|| oldest_generation == current_generation
{
info!(
"the oldest layer is now {} which is {} bytes behind last_record_lsn",
oldest_layer.get_seg_tag(),
oldest_layer.filename().display(),
distance
);
disk_consistent_lsn = oldest_pending_lsn;

View File

@@ -43,6 +43,10 @@ pub struct LayerMap {
/// This allows easy access to the in-memory layer that contains the
/// oldest WAL record.
open_segs: BinaryHeap<OpenSegEntry>,
/// Generation number, used to distinguish newly inserted entries in the
/// binary heap from older entries during checkpoint.
current_generation: u64,
}
///
@@ -59,9 +63,13 @@ struct SegEntry {
/// Entry held in LayerMap.open_segs, with boilerplate comparison
/// routines to implement a min-heap ordered by 'oldest_pending_lsn'
///
/// Each entry also carries a generation number. It can be used to distinguish
/// entries with the same 'oldest_pending_lsn'.
struct OpenSegEntry {
pub oldest_pending_lsn: Lsn,
pub layer: Arc<InMemoryLayer>,
pub generation: u64,
}
impl Ord for OpenSegEntry {
fn cmp(&self, other: &Self) -> Ordering {
@@ -73,10 +81,13 @@ impl Ord for OpenSegEntry {
impl PartialOrd for OpenSegEntry {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
// BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
// to get that.
other
.oldest_pending_lsn
.partial_cmp(&self.oldest_pending_lsn)
// to get that. Entries with identical oldest_pending_lsn are ordered by generation
Some(
other
.oldest_pending_lsn
.cmp(&self.oldest_pending_lsn)
.then_with(|| other.generation.cmp(&self.generation)),
)
}
}
impl PartialEq for OpenSegEntry {
@@ -151,6 +162,7 @@ impl LayerMap {
let opensegentry = OpenSegEntry {
oldest_pending_lsn: layer.get_oldest_pending_lsn(),
layer: layer,
generation: self.current_generation,
};
self.open_segs.push(opensegentry);
@@ -290,15 +302,23 @@ impl LayerMap {
false
}
/// Return the oldest in-memory layer.
pub fn peek_oldest_open(&self) -> Option<Arc<InMemoryLayer>> {
/// Return the oldest in-memory layer, along with its generation number.
pub fn peek_oldest_open(&self) -> Option<(Arc<InMemoryLayer>, u64)> {
if let Some(opensegentry) = self.open_segs.peek() {
Some(Arc::clone(&opensegentry.layer))
Some((Arc::clone(&opensegentry.layer), opensegentry.generation))
} else {
None
}
}
/// Increment the generation number used to stamp open in-memory layers. Layers
/// added with `insert_open` after this call will be associated with the new
/// generation. Returns the new generation number.
pub fn increment_generation(&mut self) -> u64 {
self.current_generation += 1;
self.current_generation
}
pub fn iter_historic_layers(&self) -> HistoricLayerIter {
HistoricLayerIter {
segiter: self.segs.iter(),
@@ -312,6 +332,7 @@ impl Default for LayerMap {
LayerMap {
segs: HashMap::new(),
open_segs: BinaryHeap::new(),
current_generation: 0,
}
}
}

View File

@@ -32,6 +32,12 @@ impl Lsn {
self.0.checked_sub(other).map(Lsn)
}
/// Subtract a number, returning the difference as i128 to avoid overflow.
pub fn widening_sub<T: Into<u64>>(self, other: T) -> i128 {
let other: u64 = other.into();
i128::from(self.0) - i128::from(other)
}
/// Parse an LSN from a filename in the form `0000000000000000`
pub fn from_filename<F>(filename: F) -> Result<Self, LsnParseError>
where
@@ -264,6 +270,11 @@ mod tests {
assert_eq!(Lsn(1234).checked_sub(1233u64), Some(Lsn(1)));
assert_eq!(Lsn(1234).checked_sub(1235u64), None);
assert_eq!(Lsn(1235).widening_sub(1234u64), 1);
assert_eq!(Lsn(1234).widening_sub(1235u64), -1);
assert_eq!(Lsn(u64::MAX).widening_sub(0u64), i128::from(u64::MAX));
assert_eq!(Lsn(0).widening_sub(u64::MAX), -i128::from(u64::MAX));
let seg_sz: usize = 16 * 1024 * 1024;
assert_eq!(Lsn(0x1000007).segment_offset(seg_sz), 7);
assert_eq!(Lsn(0x1000007).segment_number(seg_sz), 1u64);