mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-27 16:12:56 +00:00
Fix relish extention after it was dropped or truncated.
- Turn dropped layers into non-writeable in get_layer_for_write(). - Handle non-writeable dropped layers in checkpointer. They don't need freezing, so just remove them from list of open_segs and write out to disk. - Remove code that handles dropped layers in freeze() function. It is not used anymore.
This commit is contained in:
@@ -1195,20 +1195,42 @@ impl LayeredTimeline {
|
||||
}
|
||||
|
||||
// Do we have a layer open for writing already?
|
||||
if let Some(layer) = layers.get_open(&seg) {
|
||||
if layer.get_start_lsn() > lsn {
|
||||
let layer;
|
||||
if let Some(open_layer) = layers.get_open(&seg) {
|
||||
if open_layer.get_start_lsn() > lsn {
|
||||
bail!("unexpected open layer in the future");
|
||||
}
|
||||
return Ok(layer);
|
||||
}
|
||||
|
||||
// No (writeable) layer for this relation yet. Create one.
|
||||
// Open layer exists, but it is dropped, so create a new one.
|
||||
if open_layer.is_dropped() {
|
||||
assert!(!open_layer.is_writeable());
|
||||
// Layer that is created after dropped one represents a new relish segment.
|
||||
trace!(
|
||||
"creating layer for write for new relish segment after dropped layer {} at {}/{}",
|
||||
seg,
|
||||
self.timelineid,
|
||||
lsn
|
||||
);
|
||||
|
||||
layer = InMemoryLayer::create(
|
||||
self.conf,
|
||||
self.timelineid,
|
||||
self.tenantid,
|
||||
seg,
|
||||
lsn,
|
||||
lsn,
|
||||
)?;
|
||||
} else {
|
||||
return Ok(open_layer);
|
||||
}
|
||||
}
|
||||
// No writeable layer for this relation. Create one.
|
||||
//
|
||||
// Is this a completely new relation? Or the first modification after branching?
|
||||
//
|
||||
|
||||
let layer;
|
||||
if let Some((prev_layer, _prev_lsn)) = self.get_layer_for_read_locked(seg, lsn, &layers)? {
|
||||
else if let Some((prev_layer, _prev_lsn)) =
|
||||
self.get_layer_for_read_locked(seg, lsn, &layers)?
|
||||
{
|
||||
// Create new entry after the previous one.
|
||||
let start_lsn;
|
||||
if prev_layer.get_timeline_id() != self.timelineid {
|
||||
@@ -1345,6 +1367,8 @@ impl LayeredTimeline {
|
||||
layers.insert_open(new_open);
|
||||
}
|
||||
|
||||
// We temporarily insert InMemory layer into historic list here.
|
||||
// TODO: check that all possible concurrent users of 'historic' treat it right
|
||||
layers.insert_historic(frozen.clone());
|
||||
|
||||
// Write the now-frozen layer to disk. That could take a while, so release the lock while do it
|
||||
@@ -1360,6 +1384,7 @@ impl LayeredTimeline {
|
||||
// Finally, replace the frozen in-memory layer with the new on-disk layers
|
||||
layers.remove_historic(frozen.as_ref());
|
||||
|
||||
//FIXME This needs a comment.
|
||||
if let Some(last_historic) = new_historics.last() {
|
||||
if let Some(new_open) = &maybe_new_open {
|
||||
let maybe_old_predecessor =
|
||||
|
||||
@@ -485,8 +485,9 @@ impl InMemoryLayer {
|
||||
|
||||
assert!(inner.drop_lsn.is_none());
|
||||
inner.drop_lsn = Some(lsn);
|
||||
inner.writeable = false;
|
||||
|
||||
info!("dropped segment {} at {}", self.seg, lsn);
|
||||
trace!("dropped segment {} at {}", self.seg, lsn);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -537,11 +538,16 @@ impl InMemoryLayer {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn is_writeable(&self) -> bool {
|
||||
let inner = self.inner.lock().unwrap();
|
||||
inner.writeable
|
||||
}
|
||||
|
||||
/// Splits `self` into two InMemoryLayers: `frozen` and `open`.
|
||||
/// All data up to and including `cutoff_lsn` (or the drop LSN, if dropped)
|
||||
/// All data up to and including `cutoff_lsn`
|
||||
/// is copied to `frozen`, while the remaining data is copied to `open`.
|
||||
/// After completion, self is non-writeable, but not frozen.
|
||||
pub fn freeze(&self, cutoff_lsn: Lsn) -> Result<FreezeLayers> {
|
||||
pub fn freeze(self: Arc<Self>, cutoff_lsn: Lsn) -> Result<FreezeLayers> {
|
||||
info!(
|
||||
"freezing in memory layer for {} on timeline {} at {}",
|
||||
self.seg, self.timelineid, cutoff_lsn
|
||||
@@ -549,53 +555,50 @@ impl InMemoryLayer {
|
||||
|
||||
self.assert_not_frozen();
|
||||
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
let self_ref = self.clone();
|
||||
let mut inner = self_ref.inner.lock().unwrap();
|
||||
// Dropped layers don't need any special freeze actions,
|
||||
// they are marked as non-writeable at drop and just
|
||||
// written out to disk by checkpointer.
|
||||
if inner.drop_lsn.is_some() {
|
||||
assert!(!inner.writeable);
|
||||
info!(
|
||||
"freezing in memory layer for {} on timeline {} is dropped at {}",
|
||||
self.seg,
|
||||
self.timelineid,
|
||||
inner.drop_lsn.unwrap()
|
||||
);
|
||||
|
||||
// There should be no newer layer that refers this non-writeable layer,
|
||||
// because layer that is created after dropped one represents a new rel.
|
||||
return Ok(FreezeLayers {
|
||||
frozen: self,
|
||||
open: None,
|
||||
});
|
||||
}
|
||||
assert!(inner.writeable);
|
||||
inner.writeable = false;
|
||||
|
||||
// Normally, use the cutoff LSN as the end of the frozen layer.
|
||||
// But if the relation was dropped, we know that there are no
|
||||
// more changes coming in for it, and in particular we know that
|
||||
// there are no changes "in flight" for the LSN anymore, so we use
|
||||
// the drop LSN instead. The drop-LSN could be ahead of the
|
||||
// caller-specified LSN!
|
||||
let dropped = inner.drop_lsn.is_some();
|
||||
let end_lsn = if dropped {
|
||||
inner.drop_lsn.unwrap()
|
||||
} else {
|
||||
cutoff_lsn
|
||||
};
|
||||
|
||||
// Divide all the page versions into old and new at the 'end_lsn' cutoff point.
|
||||
let mut before_page_versions;
|
||||
let mut before_segsizes;
|
||||
let mut after_page_versions;
|
||||
let mut after_segsizes;
|
||||
if !dropped {
|
||||
before_segsizes = BTreeMap::new();
|
||||
after_segsizes = BTreeMap::new();
|
||||
for (lsn, size) in inner.segsizes.iter() {
|
||||
if *lsn > end_lsn {
|
||||
after_segsizes.insert(*lsn, *size);
|
||||
} else {
|
||||
before_segsizes.insert(*lsn, *size);
|
||||
}
|
||||
// Divide all the page versions into old and new
|
||||
// at the 'cutoff_lsn' point.
|
||||
let mut before_segsizes = BTreeMap::new();
|
||||
let mut after_segsizes = BTreeMap::new();
|
||||
for (lsn, size) in inner.segsizes.iter() {
|
||||
if *lsn > cutoff_lsn {
|
||||
after_segsizes.insert(*lsn, *size);
|
||||
} else {
|
||||
before_segsizes.insert(*lsn, *size);
|
||||
}
|
||||
}
|
||||
|
||||
before_page_versions = BTreeMap::new();
|
||||
after_page_versions = BTreeMap::new();
|
||||
for ((blknum, lsn), pv) in inner.page_versions.iter() {
|
||||
if *lsn > end_lsn {
|
||||
after_page_versions.insert((*blknum, *lsn), pv.clone());
|
||||
} else {
|
||||
before_page_versions.insert((*blknum, *lsn), pv.clone());
|
||||
}
|
||||
let mut before_page_versions = BTreeMap::new();
|
||||
let mut after_page_versions = BTreeMap::new();
|
||||
for ((blknum, lsn), pv) in inner.page_versions.iter() {
|
||||
if *lsn > cutoff_lsn {
|
||||
after_page_versions.insert((*blknum, *lsn), pv.clone());
|
||||
} else {
|
||||
before_page_versions.insert((*blknum, *lsn), pv.clone());
|
||||
}
|
||||
} else {
|
||||
before_page_versions = inner.page_versions.clone();
|
||||
before_segsizes = inner.segsizes.clone();
|
||||
after_segsizes = BTreeMap::new();
|
||||
after_page_versions = BTreeMap::new();
|
||||
}
|
||||
|
||||
let frozen = Arc::new(InMemoryLayer {
|
||||
@@ -604,7 +607,7 @@ impl InMemoryLayer {
|
||||
timelineid: self.timelineid,
|
||||
seg: self.seg,
|
||||
start_lsn: self.start_lsn,
|
||||
end_lsn: Some(end_lsn),
|
||||
end_lsn: Some(cutoff_lsn),
|
||||
oldest_pending_lsn: self.start_lsn,
|
||||
inner: Mutex::new(InMemoryLayerInner {
|
||||
drop_lsn: inner.drop_lsn,
|
||||
@@ -615,14 +618,14 @@ impl InMemoryLayer {
|
||||
}),
|
||||
});
|
||||
|
||||
let open = if !dropped && (!after_segsizes.is_empty() || !after_page_versions.is_empty()) {
|
||||
let open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
|
||||
let mut new_open = Self::create_successor_layer(
|
||||
self.conf,
|
||||
frozen.clone(),
|
||||
self.timelineid,
|
||||
self.tenantid,
|
||||
end_lsn,
|
||||
end_lsn,
|
||||
cutoff_lsn,
|
||||
cutoff_lsn,
|
||||
)?;
|
||||
|
||||
let new_inner = new_open.inner.get_mut().unwrap();
|
||||
@@ -634,7 +637,6 @@ impl InMemoryLayer {
|
||||
None
|
||||
};
|
||||
|
||||
// TODO could we avoid creating the `frozen` if it contains no data
|
||||
Ok(FreezeLayers { frozen, open })
|
||||
}
|
||||
|
||||
@@ -647,11 +649,23 @@ impl InMemoryLayer {
|
||||
/// when a new relish is created with a single LSN, so that the start and
|
||||
/// end LSN are the same.)
|
||||
pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<Vec<Arc<dyn Layer>>> {
|
||||
let end_lsn = self.end_lsn.expect("can only write frozen layers to disk");
|
||||
trace!(
|
||||
"write_to_disk {} end_lsn is {} get_end_lsn is {}",
|
||||
self.filename().display(),
|
||||
self.end_lsn.unwrap_or(Lsn(0)),
|
||||
self.get_end_lsn()
|
||||
);
|
||||
|
||||
let inner = self.inner.lock().unwrap();
|
||||
|
||||
let drop_lsn = inner.drop_lsn;
|
||||
|
||||
assert!(!inner.writeable);
|
||||
|
||||
let end_lsn = match drop_lsn {
|
||||
Some(dlsn) => dlsn,
|
||||
None => self.end_lsn.unwrap(),
|
||||
};
|
||||
|
||||
let predecessor = inner.predecessor.clone();
|
||||
|
||||
let mut before_page_versions;
|
||||
|
||||
@@ -76,7 +76,7 @@ impl LayerMap {
|
||||
pub fn insert_open(&mut self, layer: Arc<InMemoryLayer>) {
|
||||
let segentry = self.segs.entry(layer.get_seg_tag()).or_default();
|
||||
|
||||
segentry.insert_open(Arc::clone(&layer));
|
||||
segentry.update_open(Arc::clone(&layer));
|
||||
|
||||
// Also add it to the binary heap
|
||||
let open_layer_entry = OpenLayerEntry {
|
||||
@@ -97,11 +97,14 @@ impl LayerMap {
|
||||
|
||||
// Also remove it from the SegEntry of this segment
|
||||
let mut segentry = self.segs.get_mut(&segtag).unwrap();
|
||||
assert!(Arc::ptr_eq(
|
||||
segentry.open.as_ref().unwrap(),
|
||||
&oldest_entry.layer
|
||||
));
|
||||
segentry.open = None;
|
||||
if Arc::ptr_eq(segentry.open.as_ref().unwrap(), &oldest_entry.layer) {
|
||||
segentry.open = None;
|
||||
} else {
|
||||
// We could have already updated segentry.open for
|
||||
// dropped (non-writeable) layer. This is fine.
|
||||
assert!(!oldest_entry.layer.is_writeable());
|
||||
assert!(oldest_entry.layer.is_dropped());
|
||||
}
|
||||
|
||||
NUM_INMEMORY_LAYERS.dec();
|
||||
}
|
||||
@@ -291,8 +294,13 @@ impl SegEntry {
|
||||
false
|
||||
}
|
||||
|
||||
pub fn insert_open(&mut self, layer: Arc<InMemoryLayer>) {
|
||||
assert!(self.open.is_none());
|
||||
// Set new open layer for a SegEntry.
|
||||
// It's ok to rewrite previous open layer,
|
||||
// but only if it is not writeable anymore.
|
||||
pub fn update_open(&mut self, layer: Arc<InMemoryLayer>) {
|
||||
if let Some(prev_open) = &self.open {
|
||||
assert!(!prev_open.is_writeable());
|
||||
}
|
||||
self.open = Some(layer);
|
||||
}
|
||||
|
||||
|
||||
@@ -403,7 +403,7 @@ mod tests {
|
||||
|
||||
// Create timeline to work on
|
||||
let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap();
|
||||
let tline = repo.create_empty_timeline(timelineid, Lsn(0x00))?;
|
||||
let tline = repo.create_empty_timeline(timelineid)?;
|
||||
|
||||
tline.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?;
|
||||
tline.advance_last_record_lsn(Lsn(0x20));
|
||||
@@ -440,7 +440,7 @@ mod tests {
|
||||
|
||||
// Create timeline to work on
|
||||
let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap();
|
||||
let tline = repo.create_empty_timeline(timelineid, Lsn(0x00))?;
|
||||
let tline = repo.create_empty_timeline(timelineid)?;
|
||||
|
||||
//from storage_layer.rs
|
||||
const RELISH_SEG_SIZE: u32 = 10 * 1024 * 1024 / 8192;
|
||||
|
||||
Reference in New Issue
Block a user