Fix relish extention after it was dropped or truncated.

- Turn dropped layers into non-writeable in get_layer_for_write().

- Handle non-writeable dropped layers in checkpointer. They don't need freezing, so just remove them from list of open_segs and write out to disk.

- Remove code that handles dropped layers in freeze() function. It is not used anymore.
This commit is contained in:
anastasia
2021-09-16 17:46:24 +03:00
committed by lubennikovaav
parent 86164c8b33
commit 7fb7f67bb4
4 changed files with 116 additions and 69 deletions

View File

@@ -1195,20 +1195,42 @@ impl LayeredTimeline {
}
// Do we have a layer open for writing already?
if let Some(layer) = layers.get_open(&seg) {
if layer.get_start_lsn() > lsn {
let layer;
if let Some(open_layer) = layers.get_open(&seg) {
if open_layer.get_start_lsn() > lsn {
bail!("unexpected open layer in the future");
}
return Ok(layer);
}
// No (writeable) layer for this relation yet. Create one.
// Open layer exists, but it is dropped, so create a new one.
if open_layer.is_dropped() {
assert!(!open_layer.is_writeable());
// Layer that is created after dropped one represents a new relish segment.
trace!(
"creating layer for write for new relish segment after dropped layer {} at {}/{}",
seg,
self.timelineid,
lsn
);
layer = InMemoryLayer::create(
self.conf,
self.timelineid,
self.tenantid,
seg,
lsn,
lsn,
)?;
} else {
return Ok(open_layer);
}
}
// No writeable layer for this relation. Create one.
//
// Is this a completely new relation? Or the first modification after branching?
//
let layer;
if let Some((prev_layer, _prev_lsn)) = self.get_layer_for_read_locked(seg, lsn, &layers)? {
else if let Some((prev_layer, _prev_lsn)) =
self.get_layer_for_read_locked(seg, lsn, &layers)?
{
// Create new entry after the previous one.
let start_lsn;
if prev_layer.get_timeline_id() != self.timelineid {
@@ -1345,6 +1367,8 @@ impl LayeredTimeline {
layers.insert_open(new_open);
}
// We temporarily insert InMemory layer into historic list here.
// TODO: check that all possible concurrent users of 'historic' treat it right
layers.insert_historic(frozen.clone());
// Write the now-frozen layer to disk. That could take a while, so release the lock while do it
@@ -1360,6 +1384,7 @@ impl LayeredTimeline {
// Finally, replace the frozen in-memory layer with the new on-disk layers
layers.remove_historic(frozen.as_ref());
//FIXME This needs a comment.
if let Some(last_historic) = new_historics.last() {
if let Some(new_open) = &maybe_new_open {
let maybe_old_predecessor =

View File

@@ -485,8 +485,9 @@ impl InMemoryLayer {
assert!(inner.drop_lsn.is_none());
inner.drop_lsn = Some(lsn);
inner.writeable = false;
info!("dropped segment {} at {}", self.seg, lsn);
trace!("dropped segment {} at {}", self.seg, lsn);
Ok(())
}
@@ -537,11 +538,16 @@ impl InMemoryLayer {
})
}
pub fn is_writeable(&self) -> bool {
let inner = self.inner.lock().unwrap();
inner.writeable
}
/// Splits `self` into two InMemoryLayers: `frozen` and `open`.
/// All data up to and including `cutoff_lsn` (or the drop LSN, if dropped)
/// All data up to and including `cutoff_lsn`
/// is copied to `frozen`, while the remaining data is copied to `open`.
/// After completion, self is non-writeable, but not frozen.
pub fn freeze(&self, cutoff_lsn: Lsn) -> Result<FreezeLayers> {
pub fn freeze(self: Arc<Self>, cutoff_lsn: Lsn) -> Result<FreezeLayers> {
info!(
"freezing in memory layer for {} on timeline {} at {}",
self.seg, self.timelineid, cutoff_lsn
@@ -549,53 +555,50 @@ impl InMemoryLayer {
self.assert_not_frozen();
let mut inner = self.inner.lock().unwrap();
let self_ref = self.clone();
let mut inner = self_ref.inner.lock().unwrap();
// Dropped layers don't need any special freeze actions,
// they are marked as non-writeable at drop and just
// written out to disk by checkpointer.
if inner.drop_lsn.is_some() {
assert!(!inner.writeable);
info!(
"freezing in memory layer for {} on timeline {} is dropped at {}",
self.seg,
self.timelineid,
inner.drop_lsn.unwrap()
);
// There should be no newer layer that refers this non-writeable layer,
// because layer that is created after dropped one represents a new rel.
return Ok(FreezeLayers {
frozen: self,
open: None,
});
}
assert!(inner.writeable);
inner.writeable = false;
// Normally, use the cutoff LSN as the end of the frozen layer.
// But if the relation was dropped, we know that there are no
// more changes coming in for it, and in particular we know that
// there are no changes "in flight" for the LSN anymore, so we use
// the drop LSN instead. The drop-LSN could be ahead of the
// caller-specified LSN!
let dropped = inner.drop_lsn.is_some();
let end_lsn = if dropped {
inner.drop_lsn.unwrap()
} else {
cutoff_lsn
};
// Divide all the page versions into old and new at the 'end_lsn' cutoff point.
let mut before_page_versions;
let mut before_segsizes;
let mut after_page_versions;
let mut after_segsizes;
if !dropped {
before_segsizes = BTreeMap::new();
after_segsizes = BTreeMap::new();
for (lsn, size) in inner.segsizes.iter() {
if *lsn > end_lsn {
after_segsizes.insert(*lsn, *size);
} else {
before_segsizes.insert(*lsn, *size);
}
// Divide all the page versions into old and new
// at the 'cutoff_lsn' point.
let mut before_segsizes = BTreeMap::new();
let mut after_segsizes = BTreeMap::new();
for (lsn, size) in inner.segsizes.iter() {
if *lsn > cutoff_lsn {
after_segsizes.insert(*lsn, *size);
} else {
before_segsizes.insert(*lsn, *size);
}
}
before_page_versions = BTreeMap::new();
after_page_versions = BTreeMap::new();
for ((blknum, lsn), pv) in inner.page_versions.iter() {
if *lsn > end_lsn {
after_page_versions.insert((*blknum, *lsn), pv.clone());
} else {
before_page_versions.insert((*blknum, *lsn), pv.clone());
}
let mut before_page_versions = BTreeMap::new();
let mut after_page_versions = BTreeMap::new();
for ((blknum, lsn), pv) in inner.page_versions.iter() {
if *lsn > cutoff_lsn {
after_page_versions.insert((*blknum, *lsn), pv.clone());
} else {
before_page_versions.insert((*blknum, *lsn), pv.clone());
}
} else {
before_page_versions = inner.page_versions.clone();
before_segsizes = inner.segsizes.clone();
after_segsizes = BTreeMap::new();
after_page_versions = BTreeMap::new();
}
let frozen = Arc::new(InMemoryLayer {
@@ -604,7 +607,7 @@ impl InMemoryLayer {
timelineid: self.timelineid,
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn: Some(end_lsn),
end_lsn: Some(cutoff_lsn),
oldest_pending_lsn: self.start_lsn,
inner: Mutex::new(InMemoryLayerInner {
drop_lsn: inner.drop_lsn,
@@ -615,14 +618,14 @@ impl InMemoryLayer {
}),
});
let open = if !dropped && (!after_segsizes.is_empty() || !after_page_versions.is_empty()) {
let open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
let mut new_open = Self::create_successor_layer(
self.conf,
frozen.clone(),
self.timelineid,
self.tenantid,
end_lsn,
end_lsn,
cutoff_lsn,
cutoff_lsn,
)?;
let new_inner = new_open.inner.get_mut().unwrap();
@@ -634,7 +637,6 @@ impl InMemoryLayer {
None
};
// TODO could we avoid creating the `frozen` if it contains no data
Ok(FreezeLayers { frozen, open })
}
@@ -647,11 +649,23 @@ impl InMemoryLayer {
/// when a new relish is created with a single LSN, so that the start and
/// end LSN are the same.)
pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<Vec<Arc<dyn Layer>>> {
let end_lsn = self.end_lsn.expect("can only write frozen layers to disk");
trace!(
"write_to_disk {} end_lsn is {} get_end_lsn is {}",
self.filename().display(),
self.end_lsn.unwrap_or(Lsn(0)),
self.get_end_lsn()
);
let inner = self.inner.lock().unwrap();
let drop_lsn = inner.drop_lsn;
assert!(!inner.writeable);
let end_lsn = match drop_lsn {
Some(dlsn) => dlsn,
None => self.end_lsn.unwrap(),
};
let predecessor = inner.predecessor.clone();
let mut before_page_versions;

View File

@@ -76,7 +76,7 @@ impl LayerMap {
pub fn insert_open(&mut self, layer: Arc<InMemoryLayer>) {
let segentry = self.segs.entry(layer.get_seg_tag()).or_default();
segentry.insert_open(Arc::clone(&layer));
segentry.update_open(Arc::clone(&layer));
// Also add it to the binary heap
let open_layer_entry = OpenLayerEntry {
@@ -97,11 +97,14 @@ impl LayerMap {
// Also remove it from the SegEntry of this segment
let mut segentry = self.segs.get_mut(&segtag).unwrap();
assert!(Arc::ptr_eq(
segentry.open.as_ref().unwrap(),
&oldest_entry.layer
));
segentry.open = None;
if Arc::ptr_eq(segentry.open.as_ref().unwrap(), &oldest_entry.layer) {
segentry.open = None;
} else {
// We could have already updated segentry.open for
// dropped (non-writeable) layer. This is fine.
assert!(!oldest_entry.layer.is_writeable());
assert!(oldest_entry.layer.is_dropped());
}
NUM_INMEMORY_LAYERS.dec();
}
@@ -291,8 +294,13 @@ impl SegEntry {
false
}
pub fn insert_open(&mut self, layer: Arc<InMemoryLayer>) {
assert!(self.open.is_none());
// Set new open layer for a SegEntry.
// It's ok to rewrite previous open layer,
// but only if it is not writeable anymore.
pub fn update_open(&mut self, layer: Arc<InMemoryLayer>) {
if let Some(prev_open) = &self.open {
assert!(!prev_open.is_writeable());
}
self.open = Some(layer);
}

View File

@@ -403,7 +403,7 @@ mod tests {
// Create timeline to work on
let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap();
let tline = repo.create_empty_timeline(timelineid, Lsn(0x00))?;
let tline = repo.create_empty_timeline(timelineid)?;
tline.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?;
tline.advance_last_record_lsn(Lsn(0x20));
@@ -440,7 +440,7 @@ mod tests {
// Create timeline to work on
let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap();
let tline = repo.create_empty_timeline(timelineid, Lsn(0x00))?;
let tline = repo.create_empty_timeline(timelineid)?;
//from storage_layer.rs
const RELISH_SEG_SIZE: u32 = 10 * 1024 * 1024 / 8192;