Switch to RwLock in InMemoryLayer

Allows more parallelism, basically for free.
commit 2252d9faa8
parent 22e15844ae
Author: Heikki Linnakangas
Date:   2021-09-27 19:15:40 +03:00
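For context, a minimal sketch of the property this commit exploits (illustrative code, not part of the commit): std::sync::Mutex gives every caller exclusive access, while RwLock allows any number of concurrent read() guards and reserves exclusivity for write(). Since most of InMemoryLayer's methods only read the inner state, readers no longer serialize against each other.

    use std::sync::{Arc, RwLock};
    use std::thread;

    // Toy stand-in for InMemoryLayerInner.
    struct Inner {
        value: u64,
    }

    fn main() {
        let inner = Arc::new(RwLock::new(Inner { value: 0 }));

        // Several reader threads can hold the lock at the same time;
        // with a Mutex they would run one at a time.
        let readers: Vec<_> = (0..4)
            .map(|_| {
                let inner = Arc::clone(&inner);
                thread::spawn(move || inner.read().unwrap().value)
            })
            .collect();
        for r in readers {
            r.join().unwrap();
        }

        // Writers still take exclusive access, just like Mutex::lock().
        inner.write().unwrap().value += 1;
    }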

@@ -19,7 +19,7 @@ use std::cmp::min;
 use std::collections::BTreeMap;
 use std::ops::Bound::Included;
 use std::path::PathBuf;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, RwLock};
 use zenith_utils::accum::Accum;
 use zenith_utils::lsn::Lsn;
@@ -44,7 +44,7 @@ pub struct InMemoryLayer {
     /// The above fields never change. The parts that do change are in 'inner',
     /// and protected by mutex.
-    inner: Mutex<InMemoryLayerInner>,
+    inner: RwLock<InMemoryLayerInner>,
 }

 pub struct InMemoryLayerInner {
@@ -95,7 +95,7 @@ impl Layer for InMemoryLayer {
     // An in-memory layer doesn't really have a filename as it's not stored on disk,
     // but we construct a filename as if it was a delta layer
     fn filename(&self) -> PathBuf {
-        let inner = self.inner.lock().unwrap();
+        let inner = self.inner.read().unwrap();

         let end_lsn;
         let dropped;
@@ -139,7 +139,7 @@ impl Layer for InMemoryLayer {
             return Lsn(end_lsn.0 + 1);
         }

-        let inner = self.inner.lock().unwrap();
+        let inner = self.inner.read().unwrap();

         if let Some(drop_lsn) = inner.drop_lsn {
             drop_lsn
@@ -149,7 +149,7 @@ impl Layer for InMemoryLayer {
     }

     fn is_dropped(&self) -> bool {
-        let inner = self.inner.lock().unwrap();
+        let inner = self.inner.read().unwrap();
         inner.drop_lsn.is_some()
     }
@@ -167,7 +167,7 @@ impl Layer for InMemoryLayer {
         let predecessor: Option<Arc<dyn Layer>>;
         {
-            let inner = self.inner.lock().unwrap();
+            let inner = self.inner.read().unwrap();

             // Scan the BTreeMap backwards, starting from reconstruct_data.lsn.
             let minkey = (blknum, Lsn(0));
@@ -214,13 +214,13 @@ impl Layer for InMemoryLayer {
     fn get_seg_size(&self, lsn: Lsn) -> Result<u32> {
         assert!(lsn >= self.start_lsn);

-        let inner = self.inner.lock().unwrap();
+        let inner = self.inner.read().unwrap();
         Ok(inner.get_seg_size(lsn))
     }

     /// Does this segment exist at given LSN?
     fn get_seg_exists(&self, lsn: Lsn) -> Result<bool> {
-        let inner = self.inner.lock().unwrap();
+        let inner = self.inner.read().unwrap();

         // If the segment created after requested LSN,
         // it doesn't exist in the layer. But we shouldn't
@@ -252,13 +252,13 @@ impl Layer for InMemoryLayer {
     }

     fn is_incremental(&self) -> bool {
-        let inner = self.inner.lock().unwrap();
+        let inner = self.inner.read().unwrap();
         inner.predecessor.is_some()
     }

     /// debugging function to print out the contents of the layer
     fn dump(&self) -> Result<()> {
-        let inner = self.inner.lock().unwrap();
+        let inner = self.inner.read().unwrap();

         let end_str = inner
             .drop_lsn
@@ -340,7 +340,7 @@ impl InMemoryLayer {
             start_lsn,
             end_lsn: None,
             oldest_pending_lsn,
-            inner: Mutex::new(InMemoryLayerInner {
+            inner: RwLock::new(InMemoryLayerInner {
                 drop_lsn: None,
                 page_versions: BTreeMap::new(),
                 segsizes: BTreeMap::new(),
@@ -389,7 +389,7 @@ impl InMemoryLayer {
             self.timelineid,
             lsn
         );
-        let mut inner = self.inner.lock().unwrap();
+        let mut inner = self.inner.write().unwrap();

         inner.check_writeable()?;
@@ -407,7 +407,7 @@ impl InMemoryLayer {
         if self.seg.rel.is_blocky() {
             let newsize = blknum - self.seg.segno * RELISH_SEG_SIZE + 1;

-            // use inner get_seg_size, since calling self.get_seg_size will try to acquire self.inner.lock
+            // use inner get_seg_size, since calling self.get_seg_size will try to acquire the lock,
+            // which we've just acquired above
             let oldsize = inner.get_seg_size(lsn);
             if newsize > oldsize {
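The comment in the hunk above points at a real constraint: std's RwLock, like Mutex, is not reentrant, so calling a method that takes self.inner.read() while the current thread already holds the write guard will deadlock or panic. Hence the helper lives on the inner type and works through the guard that is already held. A hedged sketch of the pattern, with illustrative names rather than the actual zenith code:

    use std::sync::RwLock;

    struct Inner {
        size: u32,
    }

    impl Inner {
        // Helper on the inner type: operates on already-locked data.
        fn get_size(&self) -> u32 {
            self.size
        }
    }

    struct Layer {
        inner: RwLock<Inner>,
    }

    impl Layer {
        fn put_page(&self) {
            let mut inner = self.inner.write().unwrap();
            // OK: reuses the write guard this thread already holds.
            let _old = inner.get_size();
            // Would deadlock or panic: taking self.inner.read() here
            // while the write guard above is still alive.
            // let _old = self.inner.read().unwrap().get_size();
            inner.size += 1;
        }
    }

    fn main() {
        let layer = Layer {
            inner: RwLock::new(Inner { size: 0 }),
        };
        layer.put_page();
        assert_eq!(layer.inner.read().unwrap().get_size(), 1);
    }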
@@ -460,7 +460,7 @@ impl InMemoryLayer {
     pub fn put_truncation(&self, lsn: Lsn, segsize: u32) -> WriteResult<()> {
         self.assert_not_frozen();

-        let mut inner = self.inner.lock().unwrap();
+        let mut inner = self.inner.write().unwrap();
         inner.check_writeable()?;

         // check that this we truncate to a smaller size than segment was before the truncation
@@ -481,7 +481,7 @@ impl InMemoryLayer {
     pub fn drop_segment(&self, lsn: Lsn) -> WriteResult<()> {
         self.assert_not_frozen();

-        let mut inner = self.inner.lock().unwrap();
+        let mut inner = self.inner.write().unwrap();

         inner.check_writeable()?;
@@ -533,7 +533,7 @@ impl InMemoryLayer {
             start_lsn,
             end_lsn: None,
             oldest_pending_lsn,
-            inner: Mutex::new(InMemoryLayerInner {
+            inner: RwLock::new(InMemoryLayerInner {
                 drop_lsn: None,
                 page_versions: BTreeMap::new(),
                 segsizes,
@@ -544,7 +544,7 @@ impl InMemoryLayer {
     }

     pub fn is_writeable(&self) -> bool {
-        let inner = self.inner.lock().unwrap();
+        let inner = self.inner.read().unwrap();
         inner.writeable
     }
@@ -564,7 +564,7 @@ impl InMemoryLayer {
         self.assert_not_frozen();
         let self_ref = self.clone();
-        let mut inner = self_ref.inner.lock().unwrap();
+        let mut inner = self_ref.inner.write().unwrap();

         // Dropped layers don't need any special freeze actions,
         // they are marked as non-writeable at drop and just
         // written out to disk by checkpointer.
@@ -620,7 +620,7 @@ impl InMemoryLayer {
             start_lsn: self.start_lsn,
             end_lsn: Some(cutoff_lsn),
             oldest_pending_lsn: self.start_lsn,
-            inner: Mutex::new(InMemoryLayerInner {
+            inner: RwLock::new(InMemoryLayerInner {
                 drop_lsn: inner.drop_lsn,
                 page_versions: before_page_versions,
                 segsizes: before_segsizes,
@@ -667,7 +667,8 @@ impl InMemoryLayer {
             self.get_end_lsn()
         );

-        let inner = self.inner.lock().unwrap();
+        let inner = self.inner.read().unwrap();
+
         let drop_lsn = inner.drop_lsn;
         assert!(!inner.writeable);
@@ -740,7 +741,7 @@ impl InMemoryLayer {
     }

     pub fn update_predecessor(&self, predecessor: Arc<dyn Layer>) -> Option<Arc<dyn Layer>> {
-        let mut inner = self.inner.lock().unwrap();
+        let mut inner = self.inner.write().unwrap();
         inner.predecessor.replace(predecessor)
     }
 }
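Taken together, the read-only accessors touched above (filename, is_dropped, get_seg_size, get_seg_exists, is_incremental, dump, is_writeable) can now run concurrently with one another, while the mutating paths (the put and freeze paths, put_truncation, drop_segment, update_predecessor) still serialize through write(). Error handling is unchanged: RwLock's read() and write() return the same poisoning Result that Mutex::lock() does, so the existing .unwrap() calls carry over as-is.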