diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 0d33100ead..1c4322c0e3 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -17,6 +17,7 @@ use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; use crate::context::RequestContext; use crate::page_cache::PAGE_SZ; use crate::tenant::block_io::BlockCursor; +use crate::virtual_file::io_engine::IoEngine; use crate::virtual_file::VirtualFile; use std::cmp::min; use std::io::{Error, ErrorKind}; @@ -130,8 +131,9 @@ impl BlobWriter { async fn write_all_unbuffered, Buf: IoBuf + Send>( &mut self, src_buf: B, + engine: IoEngine, ) -> (B::Buf, Result<(), Error>) { - let (src_buf, res) = self.inner.write_all(src_buf).await; + let (src_buf, res) = self.inner.write_all(src_buf, engine).await; let nbytes = match res { Ok(nbytes) => nbytes, Err(e) => return (src_buf, Err(e)), @@ -142,9 +144,9 @@ impl BlobWriter { #[inline(always)] /// Flushes the internal buffer to the underlying `VirtualFile`. - pub async fn flush_buffer(&mut self) -> Result<(), Error> { + pub async fn flush_buffer(&mut self, engine: IoEngine) -> Result<(), Error> { let buf = std::mem::take(&mut self.buf); - let (mut buf, res) = self.inner.write_all(buf).await; + let (mut buf, res) = self.inner.write_all(buf, engine).await; res?; buf.clear(); self.buf = buf; @@ -165,10 +167,11 @@ impl BlobWriter { async fn write_all, Buf: IoBuf + Send>( &mut self, src_buf: B, + engine: IoEngine, ) -> (B::Buf, Result<(), Error>) { if !BUFFERED { assert!(self.buf.is_empty()); - return self.write_all_unbuffered(src_buf).await; + return self.write_all_unbuffered(src_buf, engine).await; } let remaining = Self::CAPACITY - self.buf.len(); let src_buf_len = src_buf.bytes_init(); @@ -183,7 +186,7 @@ impl BlobWriter { } // Then, if the buffer is full, flush it out if self.buf.len() == Self::CAPACITY { - if let Err(e) = self.flush_buffer().await { + if let Err(e) = self.flush_buffer(engine).await { return (Slice::into_inner(src_buf), Err(e)); } } @@ -199,7 +202,7 @@ impl BlobWriter { assert_eq!(copied, src_buf.len()); Slice::into_inner(src_buf) } else { - let (src_buf, res) = self.write_all_unbuffered(src_buf).await; + let (src_buf, res) = self.write_all_unbuffered(src_buf, engine).await; if let Err(e) = res { return (src_buf, Err(e)); } @@ -216,6 +219,7 @@ impl BlobWriter { pub async fn write_blob, Buf: IoBuf + Send>( &mut self, srcbuf: B, + engine: IoEngine, ) -> (B::Buf, Result) { let offset = self.offset; @@ -227,7 +231,7 @@ impl BlobWriter { if len < 128 { // Short blob. Write a 1-byte length header io_buf.put_u8(len as u8); - self.write_all(io_buf).await + self.write_all(io_buf, engine).await } else { // Write a 4-byte length header if len > 0x7fff_ffff { @@ -242,7 +246,7 @@ impl BlobWriter { let mut len_buf = (len as u32).to_be_bytes(); len_buf[0] |= 0x80; io_buf.extend_from_slice(&len_buf[..]); - self.write_all(io_buf).await + self.write_all(io_buf, engine).await } } .await; @@ -251,7 +255,7 @@ impl BlobWriter { Ok(_) => (), Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)), } - let (srcbuf, res) = self.write_all(srcbuf).await; + let (srcbuf, res) = self.write_all(srcbuf, engine).await; (srcbuf, res.map(|_| offset)) } } @@ -261,8 +265,8 @@ impl BlobWriter { /// /// This function flushes the internal buffer before giving access /// to the underlying `VirtualFile`. - pub async fn into_inner(mut self) -> Result { - self.flush_buffer().await?; + pub async fn into_inner(mut self, engine: IoEngine) -> Result { + self.flush_buffer(engine).await?; Ok(self.inner) } diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index e48b9e83bd..86642e69ab 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -5,7 +5,7 @@ use crate::config::PageServerConf; use crate::context::RequestContext; use crate::page_cache::{self, PAGE_SZ}; use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader}; -use crate::virtual_file::{self, VirtualFile}; +use crate::virtual_file::{self, io_engine, VirtualFile}; use bytes::BytesMut; use camino::Utf8PathBuf; use pageserver_api::shard::TenantShardId; @@ -160,7 +160,7 @@ impl EphemeralFile { let (mutable_tail, res) = self .ephemeral_file .file - .write_all_at(mutable_tail, self.blknum as u64 * PAGE_SZ as u64) + .write_all_at(mutable_tail, self.blknum as u64 * PAGE_SZ as u64, io_engine::get() /* TODO: experiment with StdFs here, did it in an earlier commit in this branch */) .await; // TODO: If we panic before we can put the mutable_tail back, subsequent calls will fail. // I.e., the IO isn't retryable if we panic. diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index b7132ee3bf..927a1d085d 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -40,6 +40,7 @@ use crate::tenant::vectored_blob_io::{ BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner, }; use crate::tenant::{PageReconstructError, Timeline}; +use crate::virtual_file::io_engine::{self, IoEngine}; use crate::virtual_file::{self, VirtualFile}; use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; @@ -441,7 +442,7 @@ impl DeltaLayerWriterInner { will_init: bool, ) -> (Vec, anyhow::Result<()>) { assert!(self.lsn_range.start <= lsn); - let (val, res) = self.blob_writer.write_blob(val).await; + let (val, res) = self.blob_writer.write_blob(val, IoEngine::StdFs).await; let off = match res { Ok(off) => off, Err(e) => return (val, Err(anyhow::anyhow!(e))), @@ -465,14 +466,14 @@ impl DeltaLayerWriterInner { let index_start_blk = ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; - let mut file = self.blob_writer.into_inner().await?; + let mut file = self.blob_writer.into_inner(IoEngine::StdFs).await?; // Write out the index let (index_root_blk, block_buf) = self.tree.finish()?; file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64)) .await?; for buf in block_buf.blocks { - let (_buf, res) = file.write_all(buf).await; + let (_buf, res) = file.write_all(buf, IoEngine::StdFs).await; res?; } assert!(self.lsn_range.start < self.lsn_range.end); @@ -492,7 +493,7 @@ impl DeltaLayerWriterInner { // TODO: could use smallvec here but it's a pain with Slice Summary::ser_into(&summary, &mut buf)?; file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf).await; + let (_buf, res) = file.write_all(buf, IoEngine::StdFs).await; res?; let metadata = file @@ -690,7 +691,7 @@ impl DeltaLayer { // TODO: could use smallvec here, but it's a pain with Slice Summary::ser_into(&new_summary, &mut buf).context("serialize")?; file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf).await; + let (_buf, res) = file.write_all(buf, io_engine::get()).await; res?; Ok(()) } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 14c79e413c..69fd23b880 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -38,7 +38,7 @@ use crate::tenant::vectored_blob_io::{ BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner, }; use crate::tenant::{PageReconstructError, Timeline}; -use crate::virtual_file::{self, VirtualFile}; +use crate::virtual_file::{self, io_engine, VirtualFile}; use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; use anyhow::{anyhow, bail, ensure, Context, Result}; use bytes::{Bytes, BytesMut}; @@ -356,7 +356,7 @@ impl ImageLayer { // TODO: could use smallvec here but it's a pain with Slice Summary::ser_into(&new_summary, &mut buf).context("serialize")?; file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf).await; + let (_buf, res) = file.write_all(buf, io_engine::get()).await; res?; Ok(()) } @@ -658,7 +658,7 @@ impl ImageLayerWriterInner { /// async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> { ensure!(self.key_range.contains(&key)); - let (_img, res) = self.blob_writer.write_blob(img).await; + let (_img, res) = self.blob_writer.write_blob(img, io_engine::get()).await; // TODO: re-use the buffer for `img` further upstack let off = res?; @@ -683,7 +683,7 @@ impl ImageLayerWriterInner { .await?; let (index_root_blk, block_buf) = self.tree.finish()?; for buf in block_buf.blocks { - let (_buf, res) = file.write_all(buf).await; + let (_buf, res) = file.write_all(buf, io_engine::get()).await; res?; } @@ -703,7 +703,7 @@ impl ImageLayerWriterInner { // TODO: could use smallvec here but it's a pain with Slice Summary::ser_into(&summary, &mut buf)?; file.seek(SeekFrom::Start(0)).await?; - let (_buf, res) = file.write_all(buf).await; + let (_buf, res) = file.write_all(buf, io_engine::get()).await; res?; let metadata = file diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 6d4774cf75..89ad432d5c 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -34,6 +34,8 @@ pub(crate) use io_engine::IoEngineKind; pub(crate) use metadata::Metadata; pub(crate) use open_options::*; +use self::io_engine::IoEngine; + /// /// A virtual file descriptor. You can use this just like std::fs::File, but internally /// the underlying file is closed if the system is low on file descriptors, @@ -596,6 +598,7 @@ impl VirtualFile { &self, buf: B, mut offset: u64, + engine: IoEngine, ) -> (B::Buf, Result<(), Error>) { let buf_len = buf.bytes_init(); if buf_len == 0 { @@ -604,7 +607,7 @@ impl VirtualFile { let mut buf = buf.slice(0..buf_len); while !buf.is_empty() { let res; - (buf, res) = self.write_at(buf, offset).await; + (buf, res) = self.write_at(buf, offset, engine).await; match res { Ok(0) => { return ( @@ -633,6 +636,7 @@ impl VirtualFile { pub async fn write_all, Buf: IoBuf + Send>( &mut self, buf: B, + engine: IoEngine, ) -> (B::Buf, Result) { let nbytes = buf.bytes_init(); if nbytes == 0 { @@ -641,7 +645,7 @@ impl VirtualFile { let mut buf = buf.slice(0..nbytes); while !buf.is_empty() { let res; - (buf, res) = self.write(buf).await; + (buf, res) = self.write(buf, engine).await; match res { Ok(0) => { return ( @@ -665,9 +669,10 @@ impl VirtualFile { async fn write( &mut self, buf: Slice, + engine: IoEngine, ) -> (Slice, Result) { let pos = self.pos; - let (buf, res) = self.write_at(buf, pos).await; + let (buf, res) = self.write_at(buf, pos, engine).await; let n = match res { Ok(n) => n, Err(e) => return (buf, Err(e)), @@ -705,14 +710,14 @@ impl VirtualFile { &self, buf: Slice, offset: u64, + engine: IoEngine, ) -> (Slice, Result) { let file_guard = match self.lock_file().await { Ok(file_guard) => file_guard, Err(e) => return (buf, Err(e)), }; observe_duration!(StorageIoOperation::Write, { - let ((_file_guard, buf), result) = - io_engine::get().write_at(file_guard, offset, buf).await; + let ((_file_guard, buf), result) = engine.write_at(file_guard, offset, buf).await; if let Ok(size) = result { STORAGE_IO_SIZE .with_label_values(&[ @@ -1150,7 +1155,7 @@ mod tests { ) -> Result<(), Error> { match self { MaybeVirtualFile::VirtualFile(file) => { - let (_buf, res) = file.write_all_at(buf, offset).await; + let (_buf, res) = file.write_all_at(buf, offset, io_engine::get()).await; res } MaybeVirtualFile::File(file) => { diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs index e369d28711..94451a79a2 100644 --- a/pageserver/src/virtual_file/io_engine.rs +++ b/pageserver/src/virtual_file/io_engine.rs @@ -13,7 +13,7 @@ use tracing::Instrument; pub(crate) use super::api::IoEngineKind; #[derive(Clone, Copy)] #[repr(u8)] -pub(crate) enum IoEngine { +pub enum IoEngine { NotSet, StdFs, #[cfg(target_os = "linux")] diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index cf64c86821..9452ee611c 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -200,7 +200,7 @@ def wait_for_last_record_lsn( lsn: Lsn, ) -> Lsn: """waits for pageserver to catch up to a certain lsn, returns the last observed lsn.""" - for i in range(100): + for i in range(10000): current_lsn = last_record_lsn(pageserver_http, tenant, timeline) if current_lsn >= lsn: return current_lsn