Freeze layers at the same end LSN (#1182)

* Freeze vectors at the same end LSN

* Fix calculation of last LSN for inmem layer

* Do not advance disk_consistent_lsn is no open layer was evicted

* Fix calculation of freeze_end_lsn

* Let start_lsn be larger than oldest_pending_lsn

* Rename 'oldest_pending_lsn' and 'last_lsn', add comments.

* Fix future_layerfiles test

* Update comments conserning olest_lsn

* Update comments conserning olest_lsn

Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>
This commit is contained in:
Konstantin Knizhnik
2022-01-27 18:21:00 +03:00
committed by GitHub
parent cedde559b8
commit f58a22d07e
3 changed files with 144 additions and 88 deletions

View File

@@ -71,7 +71,6 @@ mod storage_layer;
use delta_layer::DeltaLayer;
use ephemeral_file::is_ephemeral_file;
use filename::{DeltaFileName, ImageFileName};
use global_layer_map::{LayerId, GLOBAL_LAYER_MAP};
use image_layer::ImageLayer;
use inmemory_layer::InMemoryLayer;
use layer_map::LayerMap;
@@ -1161,8 +1160,8 @@ impl LayeredTimeline {
// create an ImageLayer struct for each image file.
if imgfilename.lsn > disk_consistent_lsn {
warn!(
"found future image layer {} on timeline {}",
imgfilename, self.timelineid
"found future image layer {} on timeline {} disk_consistent_lsn is {}",
imgfilename, self.timelineid, disk_consistent_lsn
);
rename_to_backup(direntry.path())?;
@@ -1185,8 +1184,8 @@ impl LayeredTimeline {
// before crash.
if deltafilename.end_lsn > disk_consistent_lsn + 1 {
warn!(
"found future delta layer {} on timeline {}",
deltafilename, self.timelineid
"found future delta layer {} on timeline {} disk_consistent_lsn is {}",
deltafilename, self.timelineid, disk_consistent_lsn
);
rename_to_backup(direntry.path())?;
@@ -1382,7 +1381,7 @@ impl LayeredTimeline {
self.tenantid,
seg,
lsn,
lsn,
last_record_lsn,
)?;
} else {
return Ok(open_layer);
@@ -1425,7 +1424,7 @@ impl LayeredTimeline {
self.timelineid,
self.tenantid,
start_lsn,
lsn,
last_record_lsn,
)?;
} else {
// New relation.
@@ -1436,8 +1435,14 @@ impl LayeredTimeline {
lsn
);
layer =
InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn, lsn)?;
layer = InMemoryLayer::create(
self.conf,
self.timelineid,
self.tenantid,
seg,
lsn,
last_record_lsn,
)?;
}
let layer_rc: Arc<InMemoryLayer> = Arc::new(layer);
@@ -1454,7 +1459,7 @@ impl LayeredTimeline {
// Prevent concurrent checkpoints
let _checkpoint_cs = self.checkpoint_cs.lock().unwrap();
let mut write_guard = self.write_lock.lock().unwrap();
let write_guard = self.write_lock.lock().unwrap();
let mut layers = self.layers.lock().unwrap();
// Bump the generation number in the layer map, so that we can distinguish
@@ -1480,11 +1485,17 @@ impl LayeredTimeline {
let mut disk_consistent_lsn = last_record_lsn;
let mut layer_paths = Vec::new();
let mut freeze_end_lsn = Lsn(0);
let mut evicted_layers = Vec::new();
//
// Determine which layers we need to evict and calculate max(latest_lsn)
// among those layers.
//
while let Some((oldest_layer_id, oldest_layer, oldest_generation)) =
layers.peek_oldest_open()
{
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
let oldest_lsn = oldest_layer.get_oldest_lsn();
// Does this layer need freezing?
//
// Write out all in-memory layers that contain WAL older than CHECKPOINT_DISTANCE.
@@ -1493,28 +1504,60 @@ impl LayeredTimeline {
// when we started. We don't want to process layers inserted after we started, to
// avoid getting into an infinite loop trying to process again entries that we
// inserted ourselves.
let distance = last_record_lsn.widening_sub(oldest_pending_lsn);
if distance < 0
//
// Once we have decided to write out at least one layer, we must also write out
// any other layers that contain WAL older than the end LSN of the layers we have
// already decided to write out. In other words, we must write out all layers
// whose [oldest_lsn, latest_lsn) range overlaps with any of the other layers
// that we are writing out. Otherwise, when we advance 'disk_consistent_lsn', it's
// ambiguous whether those layers are already durable on disk or not. For example,
// imagine that there are two layers in memory that contain page versions in the
// following LSN ranges:
//
// A: 100-150
// B: 110-200
//
// If we flush layer A, we must also flush layer B, because they overlap. If we
// flushed only A, and advanced 'disk_consistent_lsn' to 150, we would break the
// rule that all WAL older than 'disk_consistent_lsn' are durable on disk, because
// B contains some WAL older than 150. On the other hand, if we flushed out A and
// advanced 'disk_consistent_lsn' only up to 110, after crash and restart we would
// delete the first layer because its end LSN is larger than 110. If we changed
// the deletion logic to not delete it, then we would start streaming at 110, and
// process again the WAL records in the range 110-150 that are already in layer A,
// and the WAL processing code does not cope with that. We solve that dilemma by
// insisting that if we write out the first layer, we also write out the second
// layer, and advance disk_consistent_lsn all the way up to 200.
//
let distance = last_record_lsn.widening_sub(oldest_lsn);
if (distance < 0
|| distance < checkpoint_distance.into()
|| oldest_generation == current_generation
|| oldest_generation == current_generation)
&& oldest_lsn >= freeze_end_lsn
// this layer intersects with evicted layer and so also need to be evicted
{
info!(
"the oldest layer is now {} which is {} bytes behind last_record_lsn",
oldest_layer.filename().display(),
distance
);
disk_consistent_lsn = oldest_pending_lsn;
disk_consistent_lsn = oldest_lsn;
break;
}
let latest_lsn = oldest_layer.get_latest_lsn();
if latest_lsn > freeze_end_lsn {
freeze_end_lsn = latest_lsn; // calculate max of latest_lsn of the layers we're about to evict
}
layers.remove_open(oldest_layer_id);
evicted_layers.push((oldest_layer_id, oldest_layer));
}
drop(layers);
drop(write_guard);
let mut this_layer_paths = self.evict_layer(oldest_layer_id, reconstruct_pages)?;
layer_paths.append(&mut this_layer_paths);
write_guard = self.write_lock.lock().unwrap();
layers = self.layers.lock().unwrap();
// Freeze evicted layers
for (_evicted_layer_id, evicted_layer) in evicted_layers.iter() {
// Mark the layer as no longer accepting writes and record the end_lsn.
// This happens in-place, no new layers are created now.
evicted_layer.freeze(freeze_end_lsn);
layers.insert_historic(evicted_layer.clone());
}
// Call unload() on all frozen layers, to release memory.
@@ -1527,6 +1570,14 @@ impl LayeredTimeline {
drop(layers);
drop(write_guard);
// Create delta/image layers for evicted layers
for (_evicted_layer_id, evicted_layer) in evicted_layers.iter() {
let mut this_layer_paths =
self.evict_layer(evicted_layer.clone(), reconstruct_pages)?;
layer_paths.append(&mut this_layer_paths);
}
// Sync layers
if !layer_paths.is_empty() {
// We must fsync the timeline dir to ensure the directory entries for
// new layer files are durable
@@ -1594,52 +1645,29 @@ impl LayeredTimeline {
Ok(())
}
fn evict_layer(&self, layer_id: LayerId, reconstruct_pages: bool) -> Result<Vec<PathBuf>> {
// Mark the layer as no longer accepting writes and record the end_lsn.
// This happens in-place, no new layers are created now.
// We call `get_last_record_lsn` again, which may be different from the
// original load, as we may have released the write lock since then.
let mut write_guard = self.write_lock.lock().unwrap();
let mut layers = self.layers.lock().unwrap();
fn evict_layer(
&self,
layer: Arc<InMemoryLayer>,
reconstruct_pages: bool,
) -> Result<Vec<PathBuf>> {
let new_historics = layer.write_to_disk(self, reconstruct_pages)?;
let mut layer_paths = Vec::new();
let _write_guard = self.write_lock.lock().unwrap();
let mut layers = self.layers.lock().unwrap();
let global_layer_map = GLOBAL_LAYER_MAP.read().unwrap();
if let Some(oldest_layer) = global_layer_map.get(&layer_id) {
drop(global_layer_map);
oldest_layer.freeze(self.get_last_record_lsn());
// Finally, replace the frozen in-memory layer with the new on-disk layers
layers.remove_historic(layer);
// The layer is no longer open, update the layer map to reflect this.
// We will replace it with on-disk historics below.
layers.remove_open(layer_id);
layers.insert_historic(oldest_layer.clone());
// Write the now-frozen layer to disk. That could take a while, so release the lock while do it
drop(layers);
drop(write_guard);
let new_historics = oldest_layer.write_to_disk(self, reconstruct_pages)?;
write_guard = self.write_lock.lock().unwrap();
layers = self.layers.lock().unwrap();
// Finally, replace the frozen in-memory layer with the new on-disk layers
layers.remove_historic(oldest_layer);
// Add the historics to the LayerMap
for delta_layer in new_historics.delta_layers {
layer_paths.push(delta_layer.path());
layers.insert_historic(Arc::new(delta_layer));
}
for image_layer in new_historics.image_layers {
layer_paths.push(image_layer.path());
layers.insert_historic(Arc::new(image_layer));
}
// Add the historics to the LayerMap
for delta_layer in new_historics.delta_layers {
layer_paths.push(delta_layer.path());
layers.insert_historic(Arc::new(delta_layer));
}
for image_layer in new_historics.image_layers {
layer_paths.push(image_layer.path());
layers.insert_historic(Arc::new(image_layer));
}
drop(layers);
drop(write_guard);
Ok(layer_paths)
}

View File

@@ -39,8 +39,20 @@ pub struct InMemoryLayer {
///
start_lsn: Lsn,
/// LSN of the oldest page version stored in this layer
oldest_pending_lsn: Lsn,
///
/// LSN of the oldest page version stored in this layer.
///
/// This is different from 'start_lsn' in that we enforce that the 'start_lsn'
/// of a layer always matches the 'end_lsn' of its predecessor, even if there
/// are no page versions until at a later LSN. That way you can detect any
/// missing layer files more easily. 'oldest_lsn' is the first page version
/// actually stored in this layer. In the range between 'start_lsn' and
/// 'oldest_lsn', there are no changes to the segment.
/// 'oldest_lsn' is used to adjust 'disk_consistent_lsn' and that is why it should
/// point to the beginning of WAL record. This is the other difference with 'start_lsn'
/// which points to end of WAL record. This is why 'oldest_lsn' can be smaller than 'start_lsn'.
///
oldest_lsn: Lsn,
/// The above fields never change. The parts that do change are in 'inner',
/// and protected by mutex.
@@ -73,6 +85,14 @@ pub struct InMemoryLayerInner {
/// a non-blocky rel, 'seg_sizes' is not used and is always empty.
///
seg_sizes: VecMap<Lsn, SegmentBlk>,
///
/// LSN of the newest page version stored in this layer.
///
/// The difference between 'end_lsn' and 'latest_lsn' is the same as between
/// 'start_lsn' and 'oldest_lsn'. See comments in 'oldest_lsn'.
///
latest_lsn: Lsn,
}
impl InMemoryLayerInner {
@@ -319,8 +339,13 @@ pub struct LayersOnDisk {
impl InMemoryLayer {
/// Return the oldest page version that's stored in this layer
pub fn get_oldest_pending_lsn(&self) -> Lsn {
self.oldest_pending_lsn
pub fn get_oldest_lsn(&self) -> Lsn {
self.oldest_lsn
}
pub fn get_latest_lsn(&self) -> Lsn {
let inner = self.inner.read().unwrap();
inner.latest_lsn
}
///
@@ -332,7 +357,7 @@ impl InMemoryLayer {
tenantid: ZTenantId,
seg: SegmentTag,
start_lsn: Lsn,
oldest_pending_lsn: Lsn,
oldest_lsn: Lsn,
) -> Result<InMemoryLayer> {
trace!(
"initializing new empty InMemoryLayer for writing {} on timeline {} at {}",
@@ -355,13 +380,14 @@ impl InMemoryLayer {
tenantid,
seg,
start_lsn,
oldest_pending_lsn,
oldest_lsn,
incremental: false,
inner: RwLock::new(InMemoryLayerInner {
end_lsn: None,
dropped: false,
page_versions: PageVersions::new(file),
seg_sizes,
latest_lsn: oldest_lsn,
}),
})
}
@@ -398,6 +424,8 @@ impl InMemoryLayer {
let mut inner = self.inner.write().unwrap();
inner.assert_writeable();
assert!(lsn >= inner.latest_lsn);
inner.latest_lsn = lsn;
let old = inner.page_versions.append_or_update_last(blknum, lsn, pv)?;
@@ -509,12 +537,11 @@ impl InMemoryLayer {
timelineid: ZTimelineId,
tenantid: ZTenantId,
start_lsn: Lsn,
oldest_pending_lsn: Lsn,
oldest_lsn: Lsn,
) -> Result<InMemoryLayer> {
let seg = src.get_seg_tag();
assert!(oldest_pending_lsn.is_aligned());
assert!(oldest_pending_lsn >= start_lsn);
assert!(oldest_lsn.is_aligned());
trace!(
"initializing new InMemoryLayer for writing {} on timeline {} at {}",
@@ -538,13 +565,14 @@ impl InMemoryLayer {
tenantid,
seg,
start_lsn,
oldest_pending_lsn,
oldest_lsn,
incremental: true,
inner: RwLock::new(InMemoryLayerInner {
end_lsn: None,
dropped: false,
page_versions: PageVersions::new(file),
seg_sizes,
latest_lsn: oldest_lsn,
}),
})
}

View File

@@ -40,7 +40,7 @@ pub struct LayerMap {
/// All the layers keyed by segment tag
segs: HashMap<SegmentTag, SegEntry>,
/// All in-memory layers, ordered by 'oldest_pending_lsn' and generation
/// All in-memory layers, ordered by 'oldest_lsn' and generation
/// of each layer. This allows easy access to the in-memory layer that
/// contains the oldest WAL record.
open_layers: BinaryHeap<OpenLayerEntry>,
@@ -83,16 +83,16 @@ impl LayerMap {
let layer_id = segentry.update_open(Arc::clone(&layer));
let oldest_pending_lsn = layer.get_oldest_pending_lsn();
let oldest_lsn = layer.get_oldest_lsn();
// After a crash and restart, 'oldest_pending_lsn' of the oldest in-memory
// After a crash and restart, 'oldest_lsn' of the oldest in-memory
// layer becomes the WAL streaming starting point, so it better not point
// in the middle of a WAL record.
assert!(oldest_pending_lsn.is_aligned());
assert!(oldest_lsn.is_aligned());
// Also add it to the binary heap
let open_layer_entry = OpenLayerEntry {
oldest_pending_lsn: layer.get_oldest_pending_lsn(),
oldest_lsn: layer.get_oldest_lsn(),
layer_id,
generation: self.current_generation,
};
@@ -352,23 +352,23 @@ impl SegEntry {
}
/// Entry held in LayerMap::open_layers, with boilerplate comparison routines
/// to implement a min-heap ordered by 'oldest_pending_lsn' and 'generation'
/// to implement a min-heap ordered by 'oldest_lsn' and 'generation'
///
/// The generation number associated with each entry can be used to distinguish
/// recently-added entries (i.e after last call to increment_generation()) from older
/// entries with the same 'oldest_pending_lsn'.
/// entries with the same 'oldest_lsn'.
struct OpenLayerEntry {
oldest_pending_lsn: Lsn, // copy of layer.get_oldest_pending_lsn()
oldest_lsn: Lsn, // copy of layer.get_oldest_lsn()
generation: u64,
layer_id: LayerId,
}
impl Ord for OpenLayerEntry {
fn cmp(&self, other: &Self) -> Ordering {
// BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
// to get that. Entries with identical oldest_pending_lsn are ordered by generation
// to get that. Entries with identical oldest_lsn are ordered by generation
other
.oldest_pending_lsn
.cmp(&self.oldest_pending_lsn)
.oldest_lsn
.cmp(&self.oldest_lsn)
.then_with(|| other.generation.cmp(&self.generation))
}
}
@@ -437,7 +437,7 @@ mod tests {
conf: &'static PageServerConf,
segno: u32,
start_lsn: Lsn,
oldest_pending_lsn: Lsn,
oldest_lsn: Lsn,
) -> Arc<InMemoryLayer> {
Arc::new(
InMemoryLayer::create(
@@ -449,7 +449,7 @@ mod tests {
segno,
},
start_lsn,
oldest_pending_lsn,
oldest_lsn,
)
.unwrap(),
)