mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 05:52:55 +00:00
If too much memory is being used for in-memory layers, flush oldest one.
The old policy was to flush all in-memory layers to disk every 10 seconds. That was a pretty dumb policy, unnecessarily aggressive. This commit changes the policy so that we only flush layers where the oldest WAL record is older than 16 MB from the last valid LSN on the timeline. That's still pretty aggressive, but it's a step in the right direction. We do need a limit on how old the oldest in-memory layer is allowed to be, because that determines how much WAL the safekeepers need to hold onto, and how much WAL we need to reprocess in case of a page server crash. 16 MB is surely still too aggressive for that, but it's easy to change the setting later. To support that, keep all in-memory layers in a binary heap, so that we can easily find the one with the oldest LSN. This tracks and a new LSN value in the metadata file: 'disk_consistent_lsn'. Before, on page server restart we restarted the WAL processing from the 'last_record_lsn' value, but now that we don't flush everything to disk in one go, the 'last_record_lsn' tracked in memory is usually ahead of the last record that's been flushed to disk. Even though we track that oldest LSN now, the crash recovery story isn't really complete. We don't do fsync()s anywhere, and thing will break if a snapshot file isn't complete, as there's no CRC on them. That's not new, and it's a TODO.
This commit is contained in:
@@ -54,14 +54,14 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
|
||||
// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
|
||||
static TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
// Perform a checkpoint in the GC thread, when the LSN has advanced this much since
|
||||
// last checkpoint. This puts a backstop on how much WAL needs to be re-digested if
|
||||
// the page server is restarted.
|
||||
// Flush out an inmemory layer, if it's holding WAL older than this.
|
||||
// This puts a backstop on how much WAL needs to be re-digested if the
|
||||
// page server crashes.
|
||||
//
|
||||
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
|
||||
// would be more appropriate. But a low value forces the code to be exercised more,
|
||||
// which is good for now to trigger bugs.
|
||||
static CHECKPOINT_INTERVAL: u64 = 16 * 1024 * 1024;
|
||||
static OLDEST_INMEM_DISTANCE: u64 = 16 * 1024 * 1024;
|
||||
|
||||
// Metrics collected on operations on the storage repository.
|
||||
lazy_static! {
|
||||
@@ -103,9 +103,8 @@ impl Repository for LayeredRepository {
|
||||
std::fs::create_dir_all(self.conf.timeline_path(&timelineid, &self.tenantid))?;
|
||||
|
||||
let metadata = TimelineMetadata {
|
||||
last_valid_lsn: start_lsn,
|
||||
last_record_lsn: start_lsn,
|
||||
prev_record_lsn: Lsn(0),
|
||||
disk_consistent_lsn: start_lsn,
|
||||
prev_record_lsn: None,
|
||||
ancestor_timeline: None,
|
||||
ancestor_lsn: start_lsn,
|
||||
};
|
||||
@@ -134,9 +133,8 @@ impl Repository for LayeredRepository {
|
||||
// There is initially no data in it, but all the read-calls know to look
|
||||
// into the ancestor.
|
||||
let metadata = TimelineMetadata {
|
||||
last_valid_lsn: start_lsn,
|
||||
last_record_lsn: start_lsn,
|
||||
prev_record_lsn: src_timeline.get_prev_record_lsn(),
|
||||
disk_consistent_lsn: start_lsn,
|
||||
prev_record_lsn: Some(src_timeline.get_prev_record_lsn()), // FIXME not atomic with start_lsn
|
||||
ancestor_timeline: Some(src),
|
||||
ancestor_lsn: start_lsn,
|
||||
};
|
||||
@@ -212,7 +210,7 @@ impl LayeredRepository {
|
||||
.conf
|
||||
.timeline_path(&timelineid, &self.tenantid)
|
||||
.join("wal");
|
||||
import_timeline_wal(&wal_dir, &timeline, timeline.get_last_record_lsn())?;
|
||||
import_timeline_wal(&wal_dir, &timeline, timeline.last_record_lsn.load())?;
|
||||
|
||||
let timeline_rc = Arc::new(timeline);
|
||||
timelines.insert(timelineid, timeline_rc.clone());
|
||||
@@ -261,11 +259,9 @@ impl LayeredRepository {
|
||||
{
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
for (_timelineid, timeline) in timelines.iter() {
|
||||
let distance = u64::from(timeline.last_valid_lsn.load())
|
||||
- u64::from(timeline.last_checkpoint_lsn.load());
|
||||
if distance > CHECKPOINT_INTERVAL {
|
||||
timeline.checkpoint()?;
|
||||
}
|
||||
STORAGE_TIME
|
||||
.with_label_values(&["checkpoint_timed"])
|
||||
.observe_closure_duration(|| timeline.checkpoint_internal(false))?
|
||||
}
|
||||
// release lock on 'timelines'
|
||||
}
|
||||
@@ -333,7 +329,7 @@ impl LayeredRepository {
|
||||
&self,
|
||||
target_timelineid: Option<ZTimelineId>,
|
||||
horizon: u64,
|
||||
_compact: bool,
|
||||
compact: bool,
|
||||
) -> Result<GcResult> {
|
||||
let mut totals: GcResult = Default::default();
|
||||
let now = Instant::now();
|
||||
@@ -397,6 +393,13 @@ impl LayeredRepository {
|
||||
let last_lsn = timeline.get_last_valid_lsn();
|
||||
|
||||
if let Some(cutoff) = last_lsn.checked_sub(horizon) {
|
||||
// If GC was explicitly requested by the admin, force flush all in-memory
|
||||
// layers to disk first, so that they too can be garbage collected. That's
|
||||
// used in tests, so we want as deterministic results as possible.
|
||||
if compact {
|
||||
timeline.checkpoint()?;
|
||||
}
|
||||
|
||||
let result = timeline.gc_timeline(branchpoints, cutoff)?;
|
||||
|
||||
totals += result;
|
||||
@@ -409,11 +412,24 @@ impl LayeredRepository {
|
||||
}
|
||||
|
||||
/// Metadata stored on disk for each timeline
|
||||
///
|
||||
/// The fields correspond to the values we hold in memory, in LayeredTimeline.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct TimelineMetadata {
|
||||
last_valid_lsn: Lsn,
|
||||
last_record_lsn: Lsn,
|
||||
prev_record_lsn: Lsn,
|
||||
disk_consistent_lsn: Lsn,
|
||||
|
||||
// This is only set if we know it. We track it in memory when the page
|
||||
// server is running, but we only track the value corresponding to
|
||||
// 'last_record_lsn', not 'disk_consistent_lsn' which can lag behind by a
|
||||
// lot. We only store it in the metadata file when we flush *all* the
|
||||
// in-memory data so that 'last_record_lsn' is the same as
|
||||
// 'disk_consistent_lsn'. That's OK, because after page server restart, as
|
||||
// soon as we reprocess at least one record, we will have a valid
|
||||
// 'prev_record_lsn' value in memory again. This is only really needed when
|
||||
// doing a clean shutdown, so that there is no more WAL beyond
|
||||
// 'disk_consistent_lsn'
|
||||
prev_record_lsn: Option<Lsn>,
|
||||
|
||||
ancestor_timeline: Option<ZTimelineId>,
|
||||
ancestor_lsn: Lsn,
|
||||
}
|
||||
@@ -456,7 +472,14 @@ pub struct LayeredTimeline {
|
||||
last_record_lsn: AtomicLsn,
|
||||
prev_record_lsn: AtomicLsn,
|
||||
|
||||
last_checkpoint_lsn: AtomicLsn,
|
||||
// All WAL records have been processed and stored durably on files on
|
||||
// local disk, up to this LSN. On crash and restart, we need to re-process
|
||||
// the WAL starting from this point.
|
||||
//
|
||||
// Some later WAL records might have been processed and also flushed to disk
|
||||
// already, so don't be surprised to see some, but there's no guarantee on
|
||||
// them yet.
|
||||
disk_consistent_lsn: AtomicLsn,
|
||||
|
||||
// Parent timeline that this timeline was branched from, and the LSN
|
||||
// of the branch point.
|
||||
@@ -774,8 +797,8 @@ impl Timeline for LayeredTimeline {
|
||||
/// metrics collection.
|
||||
fn checkpoint(&self) -> Result<()> {
|
||||
STORAGE_TIME
|
||||
.with_label_values(&["checkpoint"])
|
||||
.observe_closure_duration(|| self.checkpoint_internal())
|
||||
.with_label_values(&["checkpoint_force"])
|
||||
.observe_closure_duration(|| self.checkpoint_internal(true))
|
||||
}
|
||||
|
||||
/// Remember that WAL has been received and added to the page cache up to the given LSN
|
||||
@@ -864,10 +887,13 @@ impl LayeredTimeline {
|
||||
|
||||
walredo_mgr,
|
||||
|
||||
last_valid_lsn: SeqWait::new(metadata.last_valid_lsn),
|
||||
last_record_lsn: AtomicLsn::new(metadata.last_record_lsn.0),
|
||||
prev_record_lsn: AtomicLsn::new(metadata.prev_record_lsn.0),
|
||||
last_checkpoint_lsn: AtomicLsn::new(metadata.last_valid_lsn.0),
|
||||
// initialize in-memory 'last_valid_lsn' and 'last_record_lsn' from
|
||||
// 'disk_consistent_lsn'.
|
||||
last_valid_lsn: SeqWait::new(metadata.disk_consistent_lsn),
|
||||
last_record_lsn: AtomicLsn::new(metadata.disk_consistent_lsn.0),
|
||||
|
||||
prev_record_lsn: AtomicLsn::new(metadata.prev_record_lsn.unwrap_or(Lsn(0)).0),
|
||||
disk_consistent_lsn: AtomicLsn::new(metadata.disk_consistent_lsn.0),
|
||||
|
||||
ancestor_timeline: ancestor,
|
||||
ancestor_lsn: metadata.ancestor_lsn,
|
||||
@@ -1003,23 +1029,23 @@ impl LayeredTimeline {
|
||||
let layer;
|
||||
if let Some((prev_layer, _prev_lsn)) = self.get_layer_for_read(seg, lsn)? {
|
||||
// Create new entry after the previous one.
|
||||
let lsn;
|
||||
let start_lsn;
|
||||
if prev_layer.get_timeline_id() != self.timelineid {
|
||||
// First modification on this timeline
|
||||
lsn = self.ancestor_lsn;
|
||||
start_lsn = self.ancestor_lsn;
|
||||
trace!(
|
||||
"creating file for write for {} at branch point {}/{}",
|
||||
seg,
|
||||
self.timelineid,
|
||||
lsn
|
||||
start_lsn
|
||||
);
|
||||
} else {
|
||||
lsn = prev_layer.get_end_lsn();
|
||||
start_lsn = prev_layer.get_end_lsn();
|
||||
trace!(
|
||||
"creating file for write for {} after previous layer {}/{}",
|
||||
seg,
|
||||
self.timelineid,
|
||||
lsn
|
||||
start_lsn
|
||||
);
|
||||
}
|
||||
trace!(
|
||||
@@ -1034,6 +1060,7 @@ impl LayeredTimeline {
|
||||
&*prev_layer,
|
||||
self.timelineid,
|
||||
self.tenantid,
|
||||
start_lsn,
|
||||
lsn,
|
||||
)?;
|
||||
} else {
|
||||
@@ -1045,7 +1072,8 @@ impl LayeredTimeline {
|
||||
lsn
|
||||
);
|
||||
|
||||
layer = InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn)?;
|
||||
layer =
|
||||
InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn, lsn)?;
|
||||
}
|
||||
|
||||
let mut layers = self.layers.lock().unwrap();
|
||||
@@ -1088,7 +1116,8 @@ impl LayeredTimeline {
|
||||
///
|
||||
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
|
||||
/// know anything about them here in the repository.
|
||||
fn checkpoint_internal(&self) -> Result<()> {
|
||||
fn checkpoint_internal(&self, force: bool) -> Result<()> {
|
||||
// FIXME: these should be fetched atomically.
|
||||
let last_valid_lsn = self.last_valid_lsn.load();
|
||||
let last_record_lsn = self.last_record_lsn.load();
|
||||
let prev_record_lsn = self.prev_record_lsn.load();
|
||||
@@ -1105,71 +1134,80 @@ impl LayeredTimeline {
|
||||
// requests will block.
|
||||
let mut layers = self.layers.lock().unwrap();
|
||||
|
||||
// Walk through each in-memory, and write any dirty data to disk,
|
||||
// as snapshot files.
|
||||
//
|
||||
// We currently write a new snapshot file for every relation
|
||||
// that was modified, if there has been any changes at all.
|
||||
// It would be smarter to only flush out in-memory layers that
|
||||
// have accumulated a fair amount of changes. Note that the
|
||||
// start and end LSNs of snapshot files belonging to different
|
||||
// relations don't have to line up, although currently they do
|
||||
// because of the way this works. So you could have a snapshot
|
||||
// file covering LSN range 100-200 for one relation, and a
|
||||
// snapshot file covering 150-250 for another relation. The
|
||||
// read functions should even cope with snapshot files
|
||||
// covering overlapping ranges for the same relation, although
|
||||
// that situation never arises currently.
|
||||
//
|
||||
// Note: We aggressively freeze and unload all the layer
|
||||
// structs. Even if a layer is actively being used. This
|
||||
// keeps memory usage in check, but is probably too
|
||||
// aggressive. Some kind of LRU policy would be appropriate.
|
||||
// Take the in-memory layer with the oldest WAL record. If it's older
|
||||
// than the threshold, write it out to disk as a new snapshot file.
|
||||
// Repeat until all remaining in-memory layers are within the threshold.
|
||||
//
|
||||
// That's necessary to limit the amount of WAL that needs to be kept
|
||||
// in the safekeepers, and that needs to be reprocessed on page server
|
||||
// crash. TODO: It's not a great policy for keeping memory usage in
|
||||
// check, though. We should also aim at flushing layers that consume
|
||||
// a lot of memory and/or aren't receiving much updates anymore.
|
||||
let mut disk_consistent_lsn = last_record_lsn;
|
||||
while let Some(oldest_layer) = layers.peek_oldest_open() {
|
||||
// Does this layer need freezing?
|
||||
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
|
||||
let distance = last_valid_lsn.0 - oldest_pending_lsn.0;
|
||||
if !force && distance < OLDEST_INMEM_DISTANCE {
|
||||
info!(
|
||||
"the oldest layer is now {} which is {} bytes behind last_valid_lsn",
|
||||
oldest_layer.get_seg_tag(),
|
||||
distance
|
||||
);
|
||||
disk_consistent_lsn = oldest_pending_lsn;
|
||||
break;
|
||||
}
|
||||
|
||||
// Call freeze() on any unfrozen layers (that is, layers that haven't
|
||||
// been written to disk yet).
|
||||
// Call unload() on all frozen layers, to release memory.
|
||||
let mut iter = layers.iter_open_layers();
|
||||
while let Some(layer) = iter.next() {
|
||||
let (new_historic, new_open) = layer.freeze(last_valid_lsn, &self)?;
|
||||
// freeze it
|
||||
let (new_historic, new_open) = oldest_layer.freeze(last_valid_lsn, &self)?;
|
||||
|
||||
// replace this layer with the new layers that 'freeze' returned
|
||||
// (removes it if new_open is None)
|
||||
iter.replace(new_open);
|
||||
|
||||
trace!(
|
||||
"freeze returned layer {} {}-{}",
|
||||
new_historic.get_seg_tag(),
|
||||
new_historic.get_start_lsn(),
|
||||
new_historic.get_end_lsn()
|
||||
);
|
||||
iter.insert_historic(new_historic);
|
||||
layers.pop_oldest_open();
|
||||
if let Some(n) = new_open {
|
||||
layers.insert_open(n);
|
||||
}
|
||||
layers.insert_historic(new_historic);
|
||||
}
|
||||
|
||||
// Call unload() on all frozen layers, to release memory.
|
||||
// TODO: On-disk layers shouldn't consume much memory to begin with,
|
||||
// so this shouldn't be necessary. But currently the SnapshotLayer
|
||||
// code slurps the whole file into memory, so they do in fact consume
|
||||
// a lot of memory.
|
||||
for layer in layers.iter_historic_layers() {
|
||||
layer.unload()?;
|
||||
}
|
||||
|
||||
// Also save the metadata, with updated last_valid_lsn and last_record_lsn, to a
|
||||
// file in the timeline dir. The metadata reflects the last_valid_lsn as it was
|
||||
// when we *started* the checkpoint, so that after crash, the WAL receiver knows
|
||||
// to restart the streaming from that WAL position.
|
||||
let ancestor_timelineid = if let Some(x) = &self.ancestor_timeline {
|
||||
Some(x.timelineid)
|
||||
} else {
|
||||
None
|
||||
// Save the metadata, with updated 'disk_consistent_lsn', to a
|
||||
// file in the timeline dir. After crash, we will restart WAL
|
||||
// streaming and processing from that point.
|
||||
|
||||
// We can only save a valid 'prev_record_lsn' value on disk if we
|
||||
// flushed *all* in-memory changes to disk. We only track
|
||||
// 'prev_record_lsn' in memory for the latest processed record, so we
|
||||
// don't remember what the correct value that corresponds to some old
|
||||
// LSN is. But if we flush everything, then the value corresponding
|
||||
// current 'last_record_lsn' is correct and we can store it on disk.
|
||||
let ondisk_prev_record_lsn = {
|
||||
if disk_consistent_lsn == last_record_lsn {
|
||||
Some(prev_record_lsn)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let ancestor_timelineid = self.ancestor_timeline.as_ref().map(|x| x.timelineid);
|
||||
|
||||
let metadata = TimelineMetadata {
|
||||
last_valid_lsn: last_valid_lsn,
|
||||
last_record_lsn: last_record_lsn,
|
||||
prev_record_lsn: prev_record_lsn,
|
||||
disk_consistent_lsn: disk_consistent_lsn,
|
||||
prev_record_lsn: ondisk_prev_record_lsn,
|
||||
ancestor_timeline: ancestor_timelineid,
|
||||
ancestor_lsn: self.ancestor_lsn,
|
||||
};
|
||||
LayeredRepository::save_metadata(self.conf, self.timelineid, self.tenantid, &metadata)?;
|
||||
|
||||
self.last_checkpoint_lsn.store(last_valid_lsn);
|
||||
// Also update the in-memory copy
|
||||
self.disk_consistent_lsn.store(disk_consistent_lsn);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1200,12 +1238,6 @@ impl LayeredTimeline {
|
||||
let now = Instant::now();
|
||||
let mut result: GcResult = Default::default();
|
||||
|
||||
// Scan all snapshot files in the directory. For each file, if a newer file
|
||||
// exists, we can remove the old one.
|
||||
self.checkpoint()?;
|
||||
|
||||
let mut layers = self.layers.lock().unwrap();
|
||||
|
||||
info!(
|
||||
"running GC on timeline {}, cutoff {}",
|
||||
self.timelineid, cutoff
|
||||
@@ -1213,9 +1245,13 @@ impl LayeredTimeline {
|
||||
|
||||
let mut layers_to_remove: Vec<Arc<SnapshotLayer>> = Vec::new();
|
||||
|
||||
// Scan all snapshot files in the directory. For each file, if a newer file
|
||||
// exists, we can remove the old one.
|
||||
//
|
||||
// Determine for each file if it needs to be retained
|
||||
// FIXME: also scan open in-memory layers. Normally we cannot remove the
|
||||
// latest layer of any seg, but if it was unlinked it's possible
|
||||
let mut layers = self.layers.lock().unwrap();
|
||||
'outer: for l in layers.iter_historic_layers() {
|
||||
let seg = l.get_seg_tag();
|
||||
|
||||
|
||||
@@ -32,6 +32,8 @@ pub struct InMemoryLayer {
|
||||
///
|
||||
start_lsn: Lsn,
|
||||
|
||||
oldest_pending_lsn: Lsn,
|
||||
|
||||
/// The above fields never change. The parts that do change are in 'inner',
|
||||
/// and protected by mutex.
|
||||
inner: Mutex<InMemoryLayerInner>,
|
||||
@@ -164,6 +166,11 @@ impl Layer for InMemoryLayer {
|
||||
}
|
||||
|
||||
impl InMemoryLayer {
|
||||
/// Return the oldest page version that's stored in this layer
|
||||
pub fn get_oldest_pending_lsn(&self) -> Lsn {
|
||||
self.oldest_pending_lsn
|
||||
}
|
||||
|
||||
///
|
||||
/// Create a new, empty, in-memory layer
|
||||
///
|
||||
@@ -173,6 +180,7 @@ impl InMemoryLayer {
|
||||
tenantid: ZTenantId,
|
||||
seg: SegmentTag,
|
||||
start_lsn: Lsn,
|
||||
oldest_pending_lsn: Lsn,
|
||||
) -> Result<InMemoryLayer> {
|
||||
trace!(
|
||||
"initializing new empty InMemoryLayer for writing {} on timeline {} at {}",
|
||||
@@ -187,6 +195,7 @@ impl InMemoryLayer {
|
||||
tenantid,
|
||||
seg,
|
||||
start_lsn,
|
||||
oldest_pending_lsn,
|
||||
inner: Mutex::new(InMemoryLayerInner {
|
||||
drop_lsn: None,
|
||||
page_versions: BTreeMap::new(),
|
||||
@@ -302,13 +311,14 @@ impl InMemoryLayer {
|
||||
src: &dyn Layer,
|
||||
timelineid: ZTimelineId,
|
||||
tenantid: ZTenantId,
|
||||
lsn: Lsn,
|
||||
start_lsn: Lsn,
|
||||
oldest_pending_lsn: Lsn,
|
||||
) -> Result<InMemoryLayer> {
|
||||
trace!(
|
||||
"initializing new InMemoryLayer for writing {} on timeline {} at {}",
|
||||
src.get_seg_tag(),
|
||||
timelineid,
|
||||
lsn
|
||||
start_lsn
|
||||
);
|
||||
let mut page_versions = BTreeMap::new();
|
||||
let mut segsizes = BTreeMap::new();
|
||||
@@ -318,8 +328,8 @@ impl InMemoryLayer {
|
||||
let startblk;
|
||||
let size;
|
||||
if seg.rel.is_blocky() {
|
||||
size = src.get_seg_size(lsn)?;
|
||||
segsizes.insert(lsn, size);
|
||||
size = src.get_seg_size(start_lsn)?;
|
||||
segsizes.insert(start_lsn, size);
|
||||
startblk = seg.segno * RELISH_SEG_SIZE;
|
||||
} else {
|
||||
size = 1;
|
||||
@@ -327,12 +337,12 @@ impl InMemoryLayer {
|
||||
}
|
||||
|
||||
for blknum in startblk..(startblk + size) {
|
||||
let img = timeline.materialize_page(seg, blknum, lsn, src)?;
|
||||
let img = timeline.materialize_page(seg, blknum, start_lsn, src)?;
|
||||
let pv = PageVersion {
|
||||
page_image: Some(img),
|
||||
record: None,
|
||||
};
|
||||
page_versions.insert((blknum, lsn), pv);
|
||||
page_versions.insert((blknum, start_lsn), pv);
|
||||
}
|
||||
|
||||
Ok(InMemoryLayer {
|
||||
@@ -340,7 +350,8 @@ impl InMemoryLayer {
|
||||
timelineid,
|
||||
tenantid,
|
||||
seg: src.get_seg_tag(),
|
||||
start_lsn: lsn,
|
||||
start_lsn,
|
||||
oldest_pending_lsn,
|
||||
inner: Mutex::new(InMemoryLayerInner {
|
||||
drop_lsn: None,
|
||||
page_versions: page_versions,
|
||||
@@ -434,10 +445,17 @@ impl InMemoryLayer {
|
||||
before_segsizes,
|
||||
)?;
|
||||
|
||||
trace!(
|
||||
"freeze: created snapshot layer {} {}-{}",
|
||||
snapfile.get_seg_tag(),
|
||||
snapfile.get_start_lsn(),
|
||||
snapfile.get_end_lsn()
|
||||
);
|
||||
|
||||
// If there were any "new" page versions, initialize a new in-memory layer to hold
|
||||
// them
|
||||
let new_open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
|
||||
info!("created new in-mem layer for {} {}-", self.seg, end_lsn);
|
||||
trace!("freeze: created new in-mem layer {} {}-", self.seg, end_lsn);
|
||||
|
||||
let new_open = Self::copy_snapshot(
|
||||
self.conf,
|
||||
@@ -446,6 +464,7 @@ impl InMemoryLayer {
|
||||
self.timelineid,
|
||||
self.tenantid,
|
||||
end_lsn,
|
||||
end_lsn,
|
||||
)?;
|
||||
let mut new_inner = new_open.inner.lock().unwrap();
|
||||
new_inner.page_versions.append(&mut after_page_versions);
|
||||
|
||||
@@ -14,31 +14,71 @@ use crate::layered_repository::{InMemoryLayer, SnapshotLayer};
|
||||
use crate::relish::*;
|
||||
use anyhow::Result;
|
||||
use log::*;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::HashSet;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::collections::{BTreeMap, BinaryHeap, HashMap};
|
||||
use std::ops::Bound::Included;
|
||||
use std::sync::Arc;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
///
|
||||
/// LayerMap tracks what layers exist or a timeline. The last layer that is
|
||||
/// open for writes is always an InMemoryLayer, and is tracked separately
|
||||
/// because there can be only one for each segment. The older layers,
|
||||
/// stored on disk, are kept in a BTreeMap keyed by the layer's start LSN.
|
||||
/// LayerMap tracks what layers exist on a timeline.
|
||||
///
|
||||
pub struct LayerMap {
|
||||
/// All the layers keyed by segment tag
|
||||
segs: HashMap<SegmentTag, SegEntry>,
|
||||
|
||||
/// All in-memory layers, ordered by 'oldest_pending_lsn' of each layer.
|
||||
/// This allows easy access to the in-memory layer that contains the
|
||||
/// oldest WAL record.
|
||||
open_segs: BinaryHeap<OpenSegEntry>,
|
||||
}
|
||||
|
||||
///
|
||||
/// Per-segment entry in the LayerMap.segs hash map
|
||||
///
|
||||
/// The last layer that is open for writes is always an InMemoryLayer,
|
||||
/// and is kept in a separate field, because there can be only one for
|
||||
/// each segment. The older layers, stored on disk, are kept in a
|
||||
/// BTreeMap keyed by the layer's start LSN.
|
||||
struct SegEntry {
|
||||
pub open: Option<Arc<InMemoryLayer>>,
|
||||
pub historic: BTreeMap<Lsn, Arc<SnapshotLayer>>,
|
||||
}
|
||||
|
||||
/// Entry held LayerMap.open_segs, with boilerplate comparison
|
||||
/// routines to implement a min-heap ordered by 'oldest_pending_lsn'
|
||||
struct OpenSegEntry {
|
||||
pub oldest_pending_lsn: Lsn,
|
||||
pub layer: Arc<InMemoryLayer>,
|
||||
}
|
||||
impl Ord for OpenSegEntry {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
// BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
|
||||
// to get that.
|
||||
other.oldest_pending_lsn.cmp(&self.oldest_pending_lsn)
|
||||
}
|
||||
}
|
||||
impl PartialOrd for OpenSegEntry {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
// BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
|
||||
// to get that.
|
||||
other
|
||||
.oldest_pending_lsn
|
||||
.partial_cmp(&self.oldest_pending_lsn)
|
||||
}
|
||||
}
|
||||
impl PartialEq for OpenSegEntry {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.oldest_pending_lsn.eq(&other.oldest_pending_lsn)
|
||||
}
|
||||
}
|
||||
impl Eq for OpenSegEntry {}
|
||||
|
||||
impl LayerMap {
|
||||
///
|
||||
/// Look up using the given segment tag and LSN. This differs from a plain
|
||||
/// key-value lookup in that if there is any layer that covers the
|
||||
/// Look up a layer using the given segment tag and LSN. This differs from a
|
||||
/// plain key-value lookup in that if there is any layer that covers the
|
||||
/// given LSN, or precedes the given LSN, it is returned. In other words,
|
||||
/// you don't need to know the exact start LSN of the layer.
|
||||
///
|
||||
@@ -88,14 +128,29 @@ impl LayerMap {
|
||||
if let Some(_old) = &segentry.open {
|
||||
// FIXME: shouldn't exist, but check
|
||||
}
|
||||
segentry.open = Some(layer);
|
||||
segentry.open = Some(Arc::clone(&layer));
|
||||
} else {
|
||||
let segentry = SegEntry {
|
||||
open: Some(layer),
|
||||
open: Some(Arc::clone(&layer)),
|
||||
historic: BTreeMap::new(),
|
||||
};
|
||||
self.segs.insert(tag, segentry);
|
||||
}
|
||||
|
||||
let opensegentry = OpenSegEntry {
|
||||
oldest_pending_lsn: layer.get_oldest_pending_lsn(),
|
||||
layer: layer,
|
||||
};
|
||||
self.open_segs.push(opensegentry);
|
||||
}
|
||||
|
||||
/// Remove the oldest in-memory layer
|
||||
pub fn pop_oldest_open(&mut self) {
|
||||
let opensegentry = self.open_segs.pop().unwrap();
|
||||
let segtag = opensegentry.layer.get_seg_tag();
|
||||
|
||||
let mut segentry = self.segs.get_mut(&segtag).unwrap();
|
||||
segentry.open = None;
|
||||
}
|
||||
|
||||
///
|
||||
@@ -196,10 +251,12 @@ impl LayerMap {
|
||||
false
|
||||
}
|
||||
|
||||
pub fn iter_open_layers(&mut self) -> OpenLayerIter {
|
||||
OpenLayerIter {
|
||||
last: None,
|
||||
segiter: self.segs.iter_mut(),
|
||||
/// Return the oldest in-memory layer.
|
||||
pub fn peek_oldest_open(&self) -> Option<Arc<InMemoryLayer>> {
|
||||
if let Some(opensegentry) = self.open_segs.peek() {
|
||||
Some(Arc::clone(&opensegentry.layer))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
@@ -215,46 +272,11 @@ impl Default for LayerMap {
|
||||
fn default() -> Self {
|
||||
LayerMap {
|
||||
segs: HashMap::new(),
|
||||
open_segs: BinaryHeap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct OpenLayerIter<'a> {
|
||||
last: Option<&'a mut SegEntry>,
|
||||
|
||||
segiter: std::collections::hash_map::IterMut<'a, SegmentTag, SegEntry>,
|
||||
}
|
||||
|
||||
impl<'a> OpenLayerIter<'a> {
|
||||
pub fn replace(&mut self, replacement: Option<Arc<InMemoryLayer>>) {
|
||||
let segentry = self.last.as_mut().unwrap();
|
||||
segentry.open = replacement;
|
||||
}
|
||||
|
||||
pub fn insert_historic(&mut self, new_layer: Arc<SnapshotLayer>) {
|
||||
let start_lsn = new_layer.get_start_lsn();
|
||||
|
||||
let segentry = self.last.as_mut().unwrap();
|
||||
segentry.historic.insert(start_lsn, new_layer);
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Iterator for OpenLayerIter<'a> {
|
||||
type Item = Arc<InMemoryLayer>;
|
||||
|
||||
fn next(&mut self) -> std::option::Option<<Self as std::iter::Iterator>::Item> {
|
||||
while let Some((_seg, entry)) = self.segiter.next() {
|
||||
if let Some(open) = &entry.open {
|
||||
let op = Arc::clone(&open);
|
||||
self.last = Some(entry);
|
||||
return Some(op);
|
||||
}
|
||||
}
|
||||
self.last = None;
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub struct HistoricLayerIter<'a> {
|
||||
segiter: std::collections::hash_map::Iter<'a, SegmentTag, SegEntry>,
|
||||
iter: Option<std::collections::btree_map::Iter<'a, Lsn, Arc<SnapshotLayer>>>,
|
||||
|
||||
Reference in New Issue
Block a user