mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 09:52:54 +00:00
use O_DIRECT for VirtualFile reads
This commit is contained in:
@@ -2,18 +2,27 @@ use std::cell::RefCell;
|
||||
|
||||
use crate::tenant::disk_btree::PAGE_SZ;
|
||||
|
||||
pub struct Buffer(Option<Box<[u8; PAGE_SZ]>>);
|
||||
#[repr(C, align(8192))]
|
||||
struct BufferContent([u8; PAGE_SZ]);
|
||||
|
||||
impl BufferContent {
|
||||
fn empty() -> Self {
|
||||
BufferContent(std::array::from_fn(|_| 0))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Buffer(Option<Box<BufferContent>>);
|
||||
|
||||
// Thread-local list of re-usable buffers.
|
||||
thread_local! {
|
||||
static POOL: RefCell<Vec<Box<[u8; PAGE_SZ]>>> = RefCell::new(Vec::new());
|
||||
static POOL: RefCell<Vec<Box<BufferContent>>> = RefCell::new(Vec::new());
|
||||
}
|
||||
|
||||
pub(crate) fn get() -> Buffer {
|
||||
let maybe = POOL.with(|rc| rc.borrow_mut().pop());
|
||||
match maybe {
|
||||
Some(buf) => Buffer(Some(buf)),
|
||||
None => Buffer(Some(Box::new([0; PAGE_SZ]))),
|
||||
None => Buffer(Some(Box::new(BufferContent::empty()))),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,13 +37,13 @@ impl std::ops::Deref for Buffer {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.0.as_ref().unwrap().as_ref()
|
||||
&self.0.as_ref().unwrap().as_ref().0
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::DerefMut for Buffer {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.0.as_mut().unwrap().as_mut()
|
||||
&mut self.0.as_mut().unwrap().as_mut().0
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -165,7 +165,17 @@ struct SlotInner {
|
||||
key: Option<CacheKey>,
|
||||
// for `coalesce_readers_permit`
|
||||
permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
|
||||
buf: &'static mut [u8; PAGE_SZ],
|
||||
buf: &'static mut SlotContents,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
#[repr(C, align(8192))]
|
||||
struct SlotContents([u8; PAGE_SZ]);
|
||||
|
||||
impl SlotContents {
|
||||
fn empty() -> Self {
|
||||
Self(std::array::from_fn(|_| 0))
|
||||
}
|
||||
}
|
||||
|
||||
impl Slot {
|
||||
@@ -253,13 +263,13 @@ impl std::ops::Deref for PageReadGuard<'_> {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.slot_guard.buf
|
||||
&self.slot_guard.buf.0
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
|
||||
fn as_ref(&self) -> &[u8; PAGE_SZ] {
|
||||
self.slot_guard.buf
|
||||
&self.slot_guard.buf.0
|
||||
}
|
||||
}
|
||||
|
||||
@@ -285,7 +295,7 @@ enum PageWriteGuardState<'i> {
|
||||
impl std::ops::DerefMut for PageWriteGuard<'_> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
match &mut self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
|
||||
PageWriteGuardState::Invalid { inner, _permit } => &mut inner.buf.0,
|
||||
PageWriteGuardState::Downgraded => unreachable!(),
|
||||
}
|
||||
}
|
||||
@@ -296,7 +306,7 @@ impl std::ops::Deref for PageWriteGuard<'_> {
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
match &self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
|
||||
PageWriteGuardState::Invalid { inner, _permit } => &inner.buf.0,
|
||||
PageWriteGuardState::Downgraded => unreachable!(),
|
||||
}
|
||||
}
|
||||
@@ -672,29 +682,22 @@ impl PageCache {
|
||||
fn new(num_pages: usize) -> Self {
|
||||
assert!(num_pages > 0, "page cache size must be > 0");
|
||||
|
||||
// We could use Vec::leak here, but that potentially also leaks
|
||||
// uninitialized reserved capacity. With into_boxed_slice and Box::leak
|
||||
// this is avoided.
|
||||
let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
|
||||
let slot_contents = Box::leak(vec![SlotContents::empty(); num_pages].into_boxed_slice());
|
||||
|
||||
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
|
||||
size_metrics.max_bytes.set_page_sz(num_pages);
|
||||
size_metrics.current_bytes_immutable.set_page_sz(0);
|
||||
size_metrics.current_bytes_materialized_page.set_page_sz(0);
|
||||
|
||||
let slots = page_buffer
|
||||
.chunks_exact_mut(PAGE_SZ)
|
||||
.map(|chunk| {
|
||||
let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
|
||||
|
||||
Slot {
|
||||
inner: tokio::sync::RwLock::new(SlotInner {
|
||||
key: None,
|
||||
buf,
|
||||
permit: std::sync::Mutex::new(Weak::new()),
|
||||
}),
|
||||
usage_count: AtomicU8::new(0),
|
||||
}
|
||||
let slots = slot_contents
|
||||
.into_iter()
|
||||
.map(|slot_contents| Slot {
|
||||
inner: tokio::sync::RwLock::new(SlotInner {
|
||||
key: None,
|
||||
buf: slot_contents,
|
||||
permit: std::sync::Mutex::new(Weak::new()),
|
||||
}),
|
||||
usage_count: AtomicU8::new(0),
|
||||
})
|
||||
.collect();
|
||||
|
||||
|
||||
@@ -688,7 +688,14 @@ impl DeltaLayerInner {
|
||||
summary: Option<Summary>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
|
||||
let file = match VirtualFile::open(path).await {
|
||||
let file = match VirtualFile::open_with_options(
|
||||
path,
|
||||
virtual_file::OpenOptions::new()
|
||||
.read(true)
|
||||
.custom_flags(nix::libc::O_DIRECT),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(file) => file,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
|
||||
};
|
||||
|
||||
@@ -367,7 +367,14 @@ impl ImageLayerInner {
|
||||
summary: Option<Summary>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
|
||||
let file = match VirtualFile::open(path).await {
|
||||
let file = match VirtualFile::open_with_options(
|
||||
path,
|
||||
virtual_file::OpenOptions::new()
|
||||
.read(true)
|
||||
.custom_flags(nix::libc::O_DIRECT),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(file) => file,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
|
||||
};
|
||||
|
||||
@@ -1,7 +1,12 @@
|
||||
//! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`];
|
||||
|
||||
use nix::libc;
|
||||
|
||||
use super::IoEngineKind;
|
||||
use std::{os::fd::OwnedFd, path::Path};
|
||||
use std::{
|
||||
os::{fd::OwnedFd, unix::fs::OpenOptionsExt},
|
||||
path::Path,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum OpenOptions {
|
||||
@@ -92,6 +97,18 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn custom_flags(&mut self, custom_flags: libc::c_int) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.custom_flags(custom_flags);
|
||||
}
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.custom_flags(custom_flags);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()),
|
||||
|
||||
Reference in New Issue
Block a user