mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 05:52:55 +00:00
pageserver: use direct IO for delta and image layer reads (#9326)
Part of #8130 ## Problem Pageserver previously goes through the kernel page cache for all the IOs. The kernel page cache makes light-loaded pageserver have deceptive fast performance. Using direct IO would offer predictable latencies of our virtual file IO operations. In particular for reads, the data pages also have an extremely low temporal locality because the most frequently accessed pages are cached on the compute side. ## Summary of changes This PR enables pageserver to use direct IO for delta layer and image layer reads. We can ship them separately because these layers are write-once, read-many, so we will not be mixing buffered IO with direct IO. - implement `IoBufferMut`, an buffer type with aligned allocation (currently set to 512). - use `IoBufferMut` at all places we are doing reads on image + delta layers. - leverage Rust type system and use `IoBufAlignedMut` marker trait to guarantee that the input buffers for the IO operations are aligned. - page cache allocation is also made aligned. _* in-memory layer reads and the write path will be shipped separately._ ## Testing Integration test suite run with O_DIRECT enabled: https://github.com/neondatabase/neon/pull/9350 ## Performance We evaluated performance based on the `get-page-at-latest-lsn` benchmark. The results demonstrate a decrease in the number of IOps, no sigificant change in the latency mean, and an slight improvement on the p99.9 and p99.99 latencies. [Benchmark](https://www.notion.so/neondatabase/Benchmark-O_DIRECT-for-image-and-delta-layers-2024-10-01-112f189e00478092a195ea5a0137e706?pvs=4) ## Rollout We will add `virtual_file_io_mode=direct` region by region to enable direct IO on image + delta layers. Signed-off-by: Yuchen Liang <yuchen@neon.tech>
This commit is contained in:
@@ -18,6 +18,9 @@ use crate::page_cache::{PageWriteGuard, PAGE_SZ};
|
||||
use crate::tenant::TENANTS_SEGMENT_NAME;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use once_cell::sync::OnceCell;
|
||||
use owned_buffers_io::aligned_buffer::buffer::AlignedBuffer;
|
||||
use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlign};
|
||||
use owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
|
||||
use owned_buffers_io::io_buf_ext::FullSlice;
|
||||
use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -55,6 +58,8 @@ pub(crate) mod owned_buffers_io {
|
||||
//! but for the time being we're proving out the primitives in the neon.git repo
|
||||
//! for faster iteration.
|
||||
|
||||
pub(crate) mod aligned_buffer;
|
||||
pub(crate) mod io_buf_aligned;
|
||||
pub(crate) mod io_buf_ext;
|
||||
pub(crate) mod slice;
|
||||
pub(crate) mod write;
|
||||
@@ -196,7 +201,7 @@ impl VirtualFile {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Slice<Buf>, Error>
|
||||
where
|
||||
Buf: IoBufMut + Send,
|
||||
Buf: IoBufAlignedMut + Send,
|
||||
{
|
||||
self.inner.read_exact_at(slice, offset, ctx).await
|
||||
}
|
||||
@@ -771,7 +776,7 @@ impl VirtualFileInner {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Slice<Buf>, Error>
|
||||
where
|
||||
Buf: IoBufMut + Send,
|
||||
Buf: IoBufAlignedMut + Send,
|
||||
{
|
||||
let assert_we_return_original_bounds = if cfg!(debug_assertions) {
|
||||
Some((slice.stable_ptr() as usize, slice.bytes_total()))
|
||||
@@ -1222,12 +1227,14 @@ impl VirtualFileInner {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
let slice = Vec::with_capacity(PAGE_SZ).slice_full();
|
||||
let slice = IoBufferMut::with_capacity(PAGE_SZ).slice_full();
|
||||
assert_eq!(slice.bytes_total(), PAGE_SZ);
|
||||
let slice = self
|
||||
.read_exact_at(slice, blknum as u64 * (PAGE_SZ as u64), ctx)
|
||||
.await?;
|
||||
Ok(crate::tenant::block_io::BlockLease::Vec(slice.into_inner()))
|
||||
Ok(crate::tenant::block_io::BlockLease::IoBufferMut(
|
||||
slice.into_inner(),
|
||||
))
|
||||
}
|
||||
|
||||
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
|
||||
@@ -1325,10 +1332,11 @@ impl OpenFiles {
|
||||
/// server startup.
|
||||
///
|
||||
#[cfg(not(test))]
|
||||
pub fn init(num_slots: usize, engine: IoEngineKind) {
|
||||
pub fn init(num_slots: usize, engine: IoEngineKind, mode: IoMode) {
|
||||
if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
|
||||
panic!("virtual_file::init called twice");
|
||||
}
|
||||
set_io_mode(mode);
|
||||
io_engine::init(engine);
|
||||
crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
|
||||
}
|
||||
@@ -1357,6 +1365,11 @@ pub(crate) const fn get_io_buffer_alignment() -> usize {
|
||||
DEFAULT_IO_BUFFER_ALIGNMENT
|
||||
}
|
||||
|
||||
pub(crate) type IoBufferMut = AlignedBufferMut<ConstAlign<{ get_io_buffer_alignment() }>>;
|
||||
pub(crate) type IoBuffer = AlignedBuffer<ConstAlign<{ get_io_buffer_alignment() }>>;
|
||||
pub(crate) type IoPageSlice<'a> =
|
||||
AlignedSlice<'a, PAGE_SZ, ConstAlign<{ get_io_buffer_alignment() }>>;
|
||||
|
||||
static IO_MODE: AtomicU8 = AtomicU8::new(IoMode::preferred() as u8);
|
||||
|
||||
pub(crate) fn set_io_mode(mode: IoMode) {
|
||||
@@ -1395,10 +1408,10 @@ mod tests {
|
||||
impl MaybeVirtualFile {
|
||||
async fn read_exact_at(
|
||||
&self,
|
||||
mut slice: tokio_epoll_uring::Slice<Vec<u8>>,
|
||||
mut slice: tokio_epoll_uring::Slice<IoBufferMut>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<tokio_epoll_uring::Slice<Vec<u8>>, Error> {
|
||||
) -> Result<tokio_epoll_uring::Slice<IoBufferMut>, Error> {
|
||||
match self {
|
||||
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await,
|
||||
MaybeVirtualFile::File(file) => {
|
||||
@@ -1466,12 +1479,13 @@ mod tests {
|
||||
len: usize,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<String, Error> {
|
||||
let slice = Vec::with_capacity(len).slice_full();
|
||||
let slice = IoBufferMut::with_capacity(len).slice_full();
|
||||
assert_eq!(slice.bytes_total(), len);
|
||||
let slice = self.read_exact_at(slice, pos, ctx).await?;
|
||||
let vec = slice.into_inner();
|
||||
assert_eq!(vec.len(), len);
|
||||
Ok(String::from_utf8(vec).unwrap())
|
||||
let buf = slice.into_inner();
|
||||
assert_eq!(buf.len(), len);
|
||||
|
||||
Ok(String::from_utf8(buf.to_vec()).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1695,7 +1709,7 @@ mod tests {
|
||||
let files = files.clone();
|
||||
let ctx = ctx.detached_child(TaskKind::UnitTest, DownloadBehavior::Error);
|
||||
let hdl = rt.spawn(async move {
|
||||
let mut buf = vec![0u8; SIZE];
|
||||
let mut buf = IoBufferMut::with_capacity_zeroed(SIZE);
|
||||
let mut rng = rand::rngs::OsRng;
|
||||
for _ in 1..1000 {
|
||||
let f = &files[rng.gen_range(0..files.len())];
|
||||
@@ -1704,7 +1718,7 @@ mod tests {
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner();
|
||||
assert!(buf == SAMPLE);
|
||||
assert!(buf[..] == SAMPLE);
|
||||
}
|
||||
});
|
||||
hdls.push(hdl);
|
||||
|
||||
Reference in New Issue
Block a user