Make page cache and read_blk async (#5023)

## Problem

`read_blk` does I/O and thus we would like to make it async. We can't
make the function async as long as the `PageReadGuard` returned by
`read_blk` isn't `Send`. The page cache is called by `read_blk`, and
thus it can't be async without `read_blk` being async. Thus, we have a
circular dependency.

## Summary of changes

Due to the circular dependency, we convert both the page cache and
`read_blk` to async at the same time:

We make the page cache use `tokio::sync` synchronization primitives as
those are `Send`. This makes all the places that acquire a lock require
async though, which we then also do. This includes also asyncification
of the `read_blk` function.

Builds upon #4994, #5015, #5056, and #5129.

Part of #4743.
This commit is contained in:
Arpad Müller
2023-08-30 09:04:31 +02:00
committed by GitHub
parent 81b6578c44
commit eb0a698adc
10 changed files with 78 additions and 68 deletions

View File

@@ -97,7 +97,7 @@ pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
// Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH"
async fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
let file = FileBlockReader::new(VirtualFile::open(path)?);
let summary_blk = file.read_blk(0)?;
let summary_blk = file.read_blk(0).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
actual_summary.index_start_blk,

View File

@@ -48,7 +48,7 @@ async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
virtual_file::init(10);
page_cache::init(100);
let file = FileBlockReader::new(VirtualFile::open(path)?);
let summary_blk = file.read_blk(0)?;
let summary_blk = file.read_blk(0).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
actual_summary.index_start_blk,

View File

@@ -75,10 +75,7 @@
use std::{
collections::{hash_map::Entry, HashMap},
convert::TryInto,
sync::{
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError,
},
sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
};
use anyhow::Context;
@@ -162,7 +159,7 @@ struct Version {
}
struct Slot {
inner: RwLock<SlotInner>,
inner: tokio::sync::RwLock<SlotInner>,
usage_count: AtomicU8,
}
@@ -220,9 +217,9 @@ pub struct PageCache {
///
/// If you add support for caching different kinds of objects, each object kind
/// can have a separate mapping map, next to this field.
materialized_page_map: RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
immutable_page_map: RwLock<HashMap<(FileId, u32), usize>>,
immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
/// The actual buffers with their metadata.
slots: Box<[Slot]>,
@@ -238,7 +235,7 @@ pub struct PageCache {
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
/// until the guard is dropped.
///
pub struct PageReadGuard<'i>(RwLockReadGuard<'i, SlotInner>);
pub struct PageReadGuard<'i>(tokio::sync::RwLockReadGuard<'i, SlotInner>);
impl std::ops::Deref for PageReadGuard<'_> {
type Target = [u8; PAGE_SZ];
@@ -265,7 +262,7 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
/// to initialize.
///
pub struct PageWriteGuard<'i> {
inner: RwLockWriteGuard<'i, SlotInner>,
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
// Are the page contents currently valid?
// Used to mark pages as invalid that are assigned but not yet filled with data.
@@ -343,7 +340,7 @@ impl PageCache {
/// The 'lsn' is an upper bound, this will return the latest version of
/// the given block, but not newer than 'lsn'. Returns the actual LSN of the
/// returned page.
pub fn lookup_materialized_page(
pub async fn lookup_materialized_page(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -363,7 +360,7 @@ impl PageCache {
lsn,
};
if let Some(guard) = self.try_lock_for_read(&mut cache_key) {
if let Some(guard) = self.try_lock_for_read(&mut cache_key).await {
if let CacheKey::MaterializedPage {
hash_key: _,
lsn: available_lsn,
@@ -390,7 +387,7 @@ impl PageCache {
///
/// Store an image of the given page in the cache.
///
pub fn memorize_materialized_page(
pub async fn memorize_materialized_page(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -407,7 +404,7 @@ impl PageCache {
lsn,
};
match self.lock_for_write(&cache_key)? {
match self.lock_for_write(&cache_key).await? {
WriteBufResult::Found(write_guard) => {
// We already had it in cache. Another thread must've put it there
// concurrently. Check that it had the same contents that we
@@ -425,10 +422,14 @@ impl PageCache {
// Section 1.2: Public interface functions for working with immutable file pages.
pub fn read_immutable_buf(&self, file_id: FileId, blkno: u32) -> anyhow::Result<ReadBufResult> {
pub async fn read_immutable_buf(
&self,
file_id: FileId,
blkno: u32,
) -> anyhow::Result<ReadBufResult> {
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
self.lock_for_read(&mut cache_key)
self.lock_for_read(&mut cache_key).await
}
//
@@ -448,14 +449,14 @@ impl PageCache {
///
/// If no page is found, returns None and *cache_key is left unmodified.
///
fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
async fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
let cache_key_orig = cache_key.clone();
if let Some(slot_idx) = self.search_mapping(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.read().unwrap();
let inner = slot.inner.read().await;
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
return Some(PageReadGuard(inner));
@@ -496,7 +497,7 @@ impl PageCache {
/// }
/// ```
///
fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
async fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
let (read_access, hit) = match cache_key {
CacheKey::MaterializedPage { .. } => {
unreachable!("Materialized pages use lookup_materialized_page")
@@ -511,7 +512,7 @@ impl PageCache {
let mut is_first_iteration = true;
loop {
// First check if the key already exists in the cache.
if let Some(read_guard) = self.try_lock_for_read(cache_key) {
if let Some(read_guard) = self.try_lock_for_read(cache_key).await {
if is_first_iteration {
hit.inc();
}
@@ -554,13 +555,13 @@ impl PageCache {
/// found, returns None.
///
/// When locking a page for writing, the search criteria is always "exact".
fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
async fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we don't released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.write().unwrap();
let inner = slot.inner.write().await;
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
return Some(PageWriteGuard { inner, valid: true });
@@ -573,10 +574,10 @@ impl PageCache {
///
/// Similar to lock_for_read(), but the returned buffer is write-locked and
/// may be modified by the caller even if it's already found in the cache.
fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
loop {
// First check if the key already exists in the cache.
if let Some(write_guard) = self.try_lock_for_write(cache_key) {
if let Some(write_guard) = self.try_lock_for_write(cache_key).await {
return Ok(WriteBufResult::Found(write_guard));
}
@@ -757,7 +758,7 @@ impl PageCache {
/// Find a slot to evict.
///
/// On return, the slot is empty and write-locked.
fn find_victim(&self) -> anyhow::Result<(usize, RwLockWriteGuard<SlotInner>)> {
fn find_victim(&self) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
let iter_limit = self.slots.len() * 10;
let mut iters = 0;
loop {
@@ -769,10 +770,7 @@ impl PageCache {
if slot.dec_usage_count() == 0 {
let mut inner = match slot.inner.try_write() {
Ok(inner) => inner,
Err(TryLockError::Poisoned(err)) => {
anyhow::bail!("buffer lock was poisoned: {err:?}")
}
Err(TryLockError::WouldBlock) => {
Err(_err) => {
// If we have looped through the whole buffer pool 10 times
// and still haven't found a victim buffer, something's wrong.
// Maybe all the buffers were in locked. That could happen in
@@ -816,7 +814,7 @@ impl PageCache {
let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
Slot {
inner: RwLock::new(SlotInner { key: None, buf }),
inner: tokio::sync::RwLock::new(SlotInner { key: None, buf }),
usage_count: AtomicU8::new(0),
}
})

View File

@@ -33,7 +33,7 @@ impl<'a> BlockCursor<'a> {
let mut blknum = (offset / PAGE_SZ as u64) as u32;
let mut off = (offset % PAGE_SZ as u64) as usize;
let mut buf = self.read_blk(blknum)?;
let mut buf = self.read_blk(blknum).await?;
// peek at the first byte, to determine if it's a 1- or 4-byte length
let first_len_byte = buf[off];
@@ -49,7 +49,7 @@ impl<'a> BlockCursor<'a> {
// it is split across two pages
len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
blknum += 1;
buf = self.read_blk(blknum)?;
buf = self.read_blk(blknum).await?;
len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
off = 4 - thislen;
} else {
@@ -70,7 +70,7 @@ impl<'a> BlockCursor<'a> {
if page_remain == 0 {
// continue on next page
blknum += 1;
buf = self.read_blk(blknum)?;
buf = self.read_blk(blknum).await?;
off = 0;
page_remain = PAGE_SZ;
}

View File

@@ -39,7 +39,7 @@ pub enum BlockLease<'a> {
PageReadGuard(PageReadGuard<'static>),
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
#[cfg(test)]
Rc(std::rc::Rc<[u8; PAGE_SZ]>),
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
}
impl From<PageReadGuard<'static>> for BlockLease<'static> {
@@ -49,9 +49,9 @@ impl From<PageReadGuard<'static>> for BlockLease<'static> {
}
#[cfg(test)]
impl<'a> From<std::rc::Rc<[u8; PAGE_SZ]>> for BlockLease<'a> {
fn from(value: std::rc::Rc<[u8; PAGE_SZ]>) -> Self {
BlockLease::Rc(value)
impl<'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'a> {
fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
BlockLease::Arc(value)
}
}
@@ -63,7 +63,7 @@ impl<'a> Deref for BlockLease<'a> {
BlockLease::PageReadGuard(v) => v.deref(),
BlockLease::EphemeralFileMutableTail(v) => v,
#[cfg(test)]
BlockLease::Rc(v) => v.deref(),
BlockLease::Arc(v) => v.deref(),
}
}
}
@@ -83,13 +83,13 @@ pub(crate) enum BlockReaderRef<'a> {
impl<'a> BlockReaderRef<'a> {
#[inline(always)]
fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
use BlockReaderRef::*;
match self {
FileBlockReaderVirtual(r) => r.read_blk(blknum),
FileBlockReaderFile(r) => r.read_blk(blknum),
EphemeralFile(r) => r.read_blk(blknum),
Adapter(r) => r.read_blk(blknum),
FileBlockReaderVirtual(r) => r.read_blk(blknum).await,
FileBlockReaderFile(r) => r.read_blk(blknum).await,
EphemeralFile(r) => r.read_blk(blknum).await,
Adapter(r) => r.read_blk(blknum).await,
#[cfg(test)]
TestDisk(r) => r.read_blk(blknum),
}
@@ -134,8 +134,8 @@ impl<'a> BlockCursor<'a> {
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
#[inline(always)]
pub fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
self.reader.read_blk(blknum)
pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
self.reader.read_blk(blknum).await
}
}
@@ -170,11 +170,12 @@ where
/// Returns a "lease" object that can be used to
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
pub fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.file_id, blknum)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,

View File

@@ -262,7 +262,7 @@ where
let block_cursor = self.reader.block_cursor();
while let Some((node_blknum, opt_iter)) = stack.pop() {
// Locate the node.
let node_buf = block_cursor.read_blk(self.start_blk + node_blknum)?;
let node_buf = block_cursor.read_blk(self.start_blk + node_blknum).await?;
let node = OnDiskNode::deparse(node_buf.as_ref())?;
let prefix_len = node.prefix_len as usize;
@@ -357,7 +357,7 @@ where
let block_cursor = self.reader.block_cursor();
while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
let blk = block_cursor.read_blk(self.start_blk + blknum)?;
let blk = block_cursor.read_blk(self.start_blk + blknum).await?;
let buf: &[u8] = blk.as_ref();
let node = OnDiskNode::<L>::deparse(buf)?;
@@ -704,7 +704,7 @@ pub(crate) mod tests {
pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
let mut buf = [0u8; PAGE_SZ];
buf.copy_from_slice(&self.blocks[blknum as usize]);
Ok(std::rc::Rc::new(buf).into())
Ok(std::sync::Arc::new(buf).into())
}
}
impl BlockReader for TestDisk {

View File

@@ -61,13 +61,14 @@ impl EphemeralFile {
self.len
}
pub(crate) fn read_blk(&self, blknum: u32) -> Result<BlockLease, io::Error> {
pub(crate) async fn read_blk(&self, blknum: u32) -> Result<BlockLease, io::Error> {
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
if flushed_blknums.contains(&(blknum as u64)) {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.page_cache_file_id, blknum)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
@@ -135,10 +136,13 @@ impl EphemeralFile {
// Pre-warm the page cache with what we just wrote.
// This isn't necessary for coherency/correctness, but it's how we've always done it.
let cache = page_cache::get();
match cache.read_immutable_buf(
self.ephemeral_file.page_cache_file_id,
self.blknum,
) {
match cache
.read_immutable_buf(
self.ephemeral_file.page_cache_file_id,
self.blknum,
)
.await
{
Ok(page_cache::ReadBufResult::Found(_guard)) => {
// This function takes &mut self, so, it shouldn't be possible to reach this point.
unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);

View File

@@ -467,7 +467,7 @@ impl DeltaLayer {
PathOrConf::Path(_) => None,
};
let loaded = DeltaLayerInner::load(&path, summary)?;
let loaded = DeltaLayerInner::load(&path, summary).await?;
if let PathOrConf::Path(ref path) = self.path_or_conf {
// not production code
@@ -841,12 +841,15 @@ impl Drop for DeltaLayerWriter {
}
impl DeltaLayerInner {
pub(super) fn load(path: &std::path::Path, summary: Option<Summary>) -> anyhow::Result<Self> {
pub(super) async fn load(
path: &std::path::Path,
summary: Option<Summary>,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path)
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0)?;
let summary_blk = file.read_blk(0).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
if let Some(mut expected_summary) = summary {
@@ -1028,7 +1031,7 @@ impl<'a> ValueRef<'a> {
pub(crate) struct Adapter<T>(T);
impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
pub(crate) fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
self.0.as_ref().file.read_blk(blknum)
pub(crate) async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
self.0.as_ref().file.read_blk(blknum).await
}
}

View File

@@ -349,7 +349,8 @@ impl ImageLayer {
PathOrConf::Path(_) => None,
};
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), expected_summary)?;
let loaded =
ImageLayerInner::load(&path, self.desc.image_layer_lsn(), expected_summary).await?;
if let PathOrConf::Path(ref path) = self.path_or_conf {
// not production code
@@ -432,7 +433,7 @@ impl ImageLayer {
}
impl ImageLayerInner {
pub(super) fn load(
pub(super) async fn load(
path: &std::path::Path,
lsn: Lsn,
summary: Option<Summary>,
@@ -440,7 +441,7 @@ impl ImageLayerInner {
let file = VirtualFile::open(path)
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0)?;
let summary_blk = file.read_blk(0).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
if let Some(mut expected_summary) = summary {

View File

@@ -465,7 +465,7 @@ impl Timeline {
// The cached image can be returned directly if there is no WAL between the cached image
// and requested LSN. The cached image can also be used to reduce the amount of WAL needed
// for redo.
let cached_page_img = match self.lookup_cached_page(&key, lsn) {
let cached_page_img = match self.lookup_cached_page(&key, lsn).await {
Some((cached_lsn, cached_img)) => {
match cached_lsn.cmp(&lsn) {
Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
@@ -494,6 +494,7 @@ impl Timeline {
RECONSTRUCT_TIME
.observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
.await
}
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
@@ -2443,13 +2444,14 @@ impl Timeline {
}
}
fn lookup_cached_page(&self, key: &Key, lsn: Lsn) -> Option<(Lsn, Bytes)> {
async fn lookup_cached_page(&self, key: &Key, lsn: Lsn) -> Option<(Lsn, Bytes)> {
let cache = page_cache::get();
// FIXME: It's pointless to check the cache for things that are not 8kB pages.
// We should look at the key to determine if it's a cacheable object
let (lsn, read_guard) =
cache.lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn)?;
let (lsn, read_guard) = cache
.lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn)
.await?;
let img = Bytes::from(read_guard.to_vec());
Some((lsn, img))
}
@@ -4131,7 +4133,7 @@ impl Timeline {
///
/// Reconstruct a value, using the given base image and WAL records in 'data'.
///
fn reconstruct_value(
async fn reconstruct_value(
&self,
key: Key,
request_lsn: Lsn,
@@ -4200,6 +4202,7 @@ impl Timeline {
last_rec_lsn,
&img,
)
.await
.context("Materialized page memoization failed")
{
return Err(PageReconstructError::from(e));