idea: use downgrade() to ensure forward progress

This commit is contained in:
Christian Schwarz
2023-10-02 17:39:35 +00:00
parent aba5d1b31f
commit b147bde907
3 changed files with 104 additions and 103 deletions

View File

@@ -291,18 +291,23 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
/// to initialize.
///
pub struct PageWriteGuard<'i> {
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
state: PageWriteGuardState<'i>,
}
_permit: PinnedSlotsPermit,
// Are the page contents currently valid?
// Used to mark pages as invalid that are assigned but not yet filled with data.
valid: bool,
enum PageWriteGuardState<'i> {
Invalid {
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
_permit: PinnedSlotsPermit,
},
Downgraded,
}
impl std::ops::DerefMut for PageWriteGuard<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.inner.buf
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => &mut inner.buf,
PageWriteGuardState::Downgraded => unreachable!(),
}
}
}
@@ -310,25 +315,37 @@ impl std::ops::Deref for PageWriteGuard<'_> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
self.inner.buf
match &self.state {
PageWriteGuardState::Invalid { inner, _permit } => &inner.buf,
PageWriteGuardState::Downgraded => unreachable!(),
}
}
}
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
self.inner.buf
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => &mut inner.buf,
PageWriteGuardState::Downgraded => todo!(),
}
}
}
impl PageWriteGuard<'_> {
impl<'a> PageWriteGuard<'a> {
/// Mark that the buffer contents are now valid.
pub fn mark_valid(&mut self) {
assert!(self.inner.key.is_some());
assert!(
!self.valid,
"mark_valid called on a buffer that was already valid"
);
self.valid = true;
#[must_use]
pub fn mark_valid(mut self) -> PageReadGuard<'a> {
let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
match prev {
PageWriteGuardState::Invalid { inner, _permit } => {
assert!(inner.key.is_some());
PageReadGuard {
_permit: Arc::new(_permit),
slot_guard: inner.downgrade(),
}
}
PageWriteGuardState::Downgraded => unreachable!(),
}
}
}
@@ -339,11 +356,13 @@ impl Drop for PageWriteGuard<'_> {
/// initializing it, remove the mapping from the page cache.
///
fn drop(&mut self) {
assert!(self.inner.key.is_some());
if !self.valid {
let self_key = self.inner.key.as_ref().unwrap();
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
self.inner.key = None;
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => {
let self_key = inner.key.as_ref().unwrap();
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
inner.key = None;
}
PageWriteGuardState::Downgraded => {}
}
}
}
@@ -356,7 +375,7 @@ pub enum ReadBufResult<'a> {
/// lock_for_write() return value
pub enum WriteBufResult<'a> {
Found(PageWriteGuard<'a>),
Found(PageReadGuard<'a>),
NotFound(PageWriteGuard<'a>),
}
@@ -447,15 +466,15 @@ impl PageCache {
};
match self.lock_for_write(&cache_key).await? {
WriteBufResult::Found(write_guard) => {
WriteBufResult::Found(read_guard) => {
// We already had it in cache. Another thread must've put it there
// concurrently. Check that it had the same contents that we
// replayed.
assert!(*write_guard == img);
assert!(*read_guard == img);
}
WriteBufResult::NotFound(mut write_guard) => {
write_guard.copy_from_slice(img);
write_guard.mark_valid();
let _ = write_guard.mark_valid();
}
}
@@ -638,41 +657,31 @@ impl PageCache {
);
return Ok(ReadBufResult::NotFound(PageWriteGuard {
_permit: permit.take().unwrap(),
inner,
valid: false,
state: PageWriteGuardState::Invalid {
_permit: permit.take().unwrap(),
inner,
},
}));
}
}
/// Look up a page in the cache and lock it in write mode. If it's not
/// found, returns None.
///
/// When locking a page for writing, the search criteria is always "exact".
// FIXME: the name is wrong.
async fn try_lock_for_write(
&self,
cache_key: &CacheKey,
permit: &mut Option<PinnedSlotsPermit>,
) -> Option<PageWriteGuard> {
) -> Option<PageReadGuard> {
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we don't released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.write().await;
let inner = slot.inner.read().await;
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
return Some(PageWriteGuard {
_permit: permit.take().unwrap(),
inner,
valid: true,
return Some(PageReadGuard {
_permit: inner.coalesce_readers_permit(permit.take().unwrap()),
slot_guard: inner,
});
}
}
@@ -728,9 +737,10 @@ impl PageCache {
);
return Ok(WriteBufResult::NotFound(PageWriteGuard {
_permit: permit.take().unwrap(),
inner,
valid: false,
state: PageWriteGuardState::Invalid {
_permit: permit.take().unwrap(),
inner,
},
}));
}
}

View File

@@ -186,27 +186,22 @@ impl FileBlockReader {
ctx: &RequestContext,
) -> Result<BlockLease, std::io::Error> {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => break Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
write_guard.mark_valid();
// Swap for read lock
continue;
}
};
}
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => return Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
return Ok(write_guard.mark_valid().into());
}
};
}
}

View File

@@ -70,38 +70,34 @@ impl EphemeralFile {
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
if flushed_blknums.contains(&(blknum as u64)) {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum,
self.file.path.display(),
e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
.await?;
write_guard.mark_valid();
// Swap for read lock
continue;
}
};
}
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum,
self.file.path.display(),
e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
.await?;
let read_guard = write_guard.mark_valid();
return Ok(BlockLease::PageReadGuard(read_guard));
}
};
} else {
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
@@ -171,7 +167,7 @@ impl EphemeralFile {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
write_guard.mark_valid();
let _ = write_guard.mark_valid();
// pre-warm successful
}
Err(e) => {