use O_DIRECT for VirtualFile reads

This commit is contained in:
Christian Schwarz
2024-01-29 16:20:24 +00:00
parent 74633a33ba
commit a4ce99bc07
5 changed files with 73 additions and 30 deletions

View File

@@ -2,18 +2,27 @@ use std::cell::RefCell;
use crate::tenant::disk_btree::PAGE_SZ;
pub struct Buffer(Option<Box<[u8; PAGE_SZ]>>);
#[repr(C, align(8192))]
struct BufferContent([u8; PAGE_SZ]);
impl BufferContent {
fn empty() -> Self {
BufferContent(std::array::from_fn(|_| 0))
}
}
pub struct Buffer(Option<Box<BufferContent>>);
// Thread-local list of re-usable buffers.
thread_local! {
static POOL: RefCell<Vec<Box<[u8; PAGE_SZ]>>> = RefCell::new(Vec::new());
static POOL: RefCell<Vec<Box<BufferContent>>> = RefCell::new(Vec::new());
}
/// Hands out a page buffer.
///
/// A buffer is popped from this thread's `POOL` when one is available;
/// otherwise a fresh zero-filled buffer is boxed and returned.
pub(crate) fn get() -> Buffer {
// Take the most recently pushed buffer from the thread-local pool, if any.
let maybe = POOL.with(|rc| rc.borrow_mut().pop());
match maybe {
Some(buf) => Buffer(Some(buf)),
None => Buffer(Some(Box::new([0; PAGE_SZ]))),
None => Buffer(Some(Box::new(BufferContent::empty()))),
}
}
@@ -28,13 +37,13 @@ impl std::ops::Deref for Buffer {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
self.0.as_ref().unwrap().as_ref()
&self.0.as_ref().unwrap().as_ref().0
}
}
// Gives mutable access to the page bytes held by a `Buffer`.
impl std::ops::DerefMut for Buffer {
fn deref_mut(&mut self) -> &mut Self::Target {
// `unwrap` panics if the inner `Option` is `None`.
self.0.as_mut().unwrap().as_mut()
&mut self.0.as_mut().unwrap().as_mut().0
}
}

View File

@@ -165,7 +165,17 @@ struct SlotInner {
key: Option<CacheKey>,
// for `coalesce_readers_permit`
permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
buf: &'static mut [u8; PAGE_SZ],
buf: &'static mut SlotContents,
}
#[derive(Clone)]
#[repr(C, align(8192))]
struct SlotContents([u8; PAGE_SZ]);
impl SlotContents {
fn empty() -> Self {
Self(std::array::from_fn(|_| 0))
}
}
impl Slot {
@@ -253,13 +263,13 @@ impl std::ops::Deref for PageReadGuard<'_> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
self.slot_guard.buf
&self.slot_guard.buf.0
}
}
// Borrows the guarded slot's page bytes as a fixed-size `PAGE_SZ` array.
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
fn as_ref(&self) -> &[u8; PAGE_SZ] {
self.slot_guard.buf
&self.slot_guard.buf.0
}
}
@@ -285,7 +295,7 @@ enum PageWriteGuardState<'i> {
impl std::ops::DerefMut for PageWriteGuard<'_> {
/// Returns a mutable view of the slot's page bytes.
///
/// Only the `Invalid` state carries the slot; the `Downgraded` arm is
/// `unreachable!()` and therefore panics if it is ever hit.
fn deref_mut(&mut self) -> &mut Self::Target {
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
PageWriteGuardState::Invalid { inner, _permit } => &mut inner.buf.0,
PageWriteGuardState::Downgraded => unreachable!(),
}
}
@@ -296,7 +306,7 @@ impl std::ops::Deref for PageWriteGuard<'_> {
/// Returns a shared view of the slot's page bytes.
///
/// Only the `Invalid` state carries the slot; the `Downgraded` arm is
/// `unreachable!()` and therefore panics if it is ever hit.
fn deref(&self) -> &Self::Target {
match &self.state {
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
PageWriteGuardState::Invalid { inner, _permit } => &inner.buf.0,
PageWriteGuardState::Downgraded => unreachable!(),
}
}
@@ -672,29 +682,22 @@ impl PageCache {
fn new(num_pages: usize) -> Self {
assert!(num_pages > 0, "page cache size must be > 0");
// We could use Vec::leak here, but that potentially also leaks
// uninitialized reserved capacity. With into_boxed_slice and Box::leak
// this is avoided.
let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
let slot_contents = Box::leak(vec![SlotContents::empty(); num_pages].into_boxed_slice());
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
size_metrics.max_bytes.set_page_sz(num_pages);
size_metrics.current_bytes_immutable.set_page_sz(0);
size_metrics.current_bytes_materialized_page.set_page_sz(0);
let slots = page_buffer
.chunks_exact_mut(PAGE_SZ)
.map(|chunk| {
let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
Slot {
inner: tokio::sync::RwLock::new(SlotInner {
key: None,
buf,
permit: std::sync::Mutex::new(Weak::new()),
}),
usage_count: AtomicU8::new(0),
}
let slots = slot_contents
.into_iter()
.map(|slot_contents| Slot {
inner: tokio::sync::RwLock::new(SlotInner {
key: None,
buf: slot_contents,
permit: std::sync::Mutex::new(Weak::new()),
}),
usage_count: AtomicU8::new(0),
})
.collect();

View File

@@ -688,7 +688,14 @@ impl DeltaLayerInner {
summary: Option<Summary>,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path).await {
let file = match VirtualFile::open_with_options(
path,
virtual_file::OpenOptions::new()
.read(true)
.custom_flags(nix::libc::O_DIRECT),
)
.await
{
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};

View File

@@ -367,7 +367,14 @@ impl ImageLayerInner {
summary: Option<Summary>,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path).await {
let file = match VirtualFile::open_with_options(
path,
virtual_file::OpenOptions::new()
.read(true)
.custom_flags(nix::libc::O_DIRECT),
)
.await
{
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};

View File

@@ -1,7 +1,12 @@
//! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`];
use nix::libc;
use super::IoEngineKind;
use std::{os::fd::OwnedFd, path::Path};
use std::{
os::{fd::OwnedFd, unix::fs::OpenOptionsExt},
path::Path,
};
#[derive(Debug, Clone)]
pub enum OpenOptions {
@@ -92,6 +97,18 @@ impl OpenOptions {
self
}
/// Passes `custom_flags` through to the engine-specific options builder
/// that this enum currently wraps.
///
/// The value is forwarded unchanged to the wrapped builder's own
/// `custom_flags` method. Returns `self` so that calls can be chained.
pub fn custom_flags(&mut self, custom_flags: libc::c_int) -> &mut OpenOptions {
    match self {
        OpenOptions::StdFs(std_opts) => {
            // Whatever the inner call returns is deliberately discarded;
            // callers only ever get the outer wrapper back.
            let _ = std_opts.custom_flags(custom_flags);
        }
        OpenOptions::TokioEpollUring(uring_opts) => {
            let _ = uring_opts.custom_flags(custom_flags);
        }
    }
    self
}
pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
match self {
OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()),