PoC using spawn_blocking

This commit is contained in:
Christian Schwarz
2023-08-29 09:59:42 +00:00
parent bb88e5bf57
commit a5b6e32b01
2 changed files with 21 additions and 15 deletions

View File

@@ -2,7 +2,7 @@
//! used to keep in-memory layers spilled on disk.
use crate::config::PageServerConf;
use crate::page_cache::{self, PAGE_SZ};
use crate::page_cache::{self, PAGE_SZ, PageWriteGuard};
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
use crate::virtual_file::VirtualFile;
use std::cmp::min;
@@ -83,11 +83,12 @@ impl EphemeralFile {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
page_cache::ReadBufResult::NotFound(write_guard) => {
let mut write_guard: PageWriteGuard<'static> = write_guard;
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?;
let mut write_guard = self.file
.read_exact_at_async(write_guard, blknum as u64 * PAGE_SZ as u64).await?;
write_guard.mark_valid();
// Swap for read lock

View File

@@ -11,13 +11,14 @@
//! src/backend/storage/file/fd.c
//!
use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME};
use crate::page_cache::PageWriteGuard;
use std::fs::{self, File, OpenOptions};
use std::io::{Error, ErrorKind, Seek, SeekFrom, Write};
use std::os::fd::AsRawFd;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex};
///
@@ -215,23 +216,27 @@ impl VirtualFile {
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
pub async fn read_exact_at_async(
&self,
mut buf: &mut [u8],
mut offset: u64,
) -> Result<(), Error> {
mut write_guard: PageWriteGuard<'static>,
offset: u64,
) -> Result<PageWriteGuard<'static>, Error> {
let file = self.handle.lock().unwrap().take().unwrap();
let mut put_back = false;
let put_back = AtomicBool::new(false);
let put_back_ref = &put_back;
scopeguard::defer! {
if !put_back {
if !put_back_ref.load(std::sync::atomic::Ordering::Relaxed) {
panic!("mut put self.handle back")
}
};
let res = tokio::task::spawn_blocking(|| file.read_exact_at(buf, offset))
.await
.expect("spawn_blocking");
let ((file, write_guard), res) = tokio::task::spawn_blocking(move || {
let res = file.read_exact_at(write_guard.as_mut(), offset);
((file, write_guard), res)
})
.await
.expect("spawn_blocking");
let replaced = self.handle.lock().unwrap().replace(file);
assert!(replaced.is_none());
put_back = true;
res
put_back.store(true, std::sync::atomic::Ordering::Relaxed);
res.map(|()| write_guard)
}
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235