Revert "experiment: for create_delta_layer _write path_, use StdFs io engine in a spawn_blocking thread single-threaded runtime"

This reverts commit 4a8e7f8716.
This commit is contained in:
Christian Schwarz
2024-03-13 15:09:06 +00:00
parent 1339834297
commit c9d1f51a93
7 changed files with 31 additions and 41 deletions

View File

@@ -17,7 +17,6 @@ use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::io_engine::IoEngine;
use crate::virtual_file::VirtualFile;
use std::cmp::min;
use std::io::{Error, ErrorKind};
@@ -131,9 +130,8 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
src_buf: B,
engine: IoEngine,
) -> (B::Buf, Result<(), Error>) {
let (src_buf, res) = self.inner.write_all(src_buf, engine).await;
let (src_buf, res) = self.inner.write_all(src_buf).await;
let nbytes = match res {
Ok(nbytes) => nbytes,
Err(e) => return (src_buf, Err(e)),
@@ -144,9 +142,9 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
#[inline(always)]
/// Flushes the internal buffer to the underlying `VirtualFile`.
pub async fn flush_buffer(&mut self, engine: IoEngine) -> Result<(), Error> {
pub async fn flush_buffer(&mut self) -> Result<(), Error> {
let buf = std::mem::take(&mut self.buf);
let (mut buf, res) = self.inner.write_all(buf, engine).await;
let (mut buf, res) = self.inner.write_all(buf).await;
res?;
buf.clear();
self.buf = buf;
@@ -167,11 +165,10 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
src_buf: B,
engine: IoEngine,
) -> (B::Buf, Result<(), Error>) {
if !BUFFERED {
assert!(self.buf.is_empty());
return self.write_all_unbuffered(src_buf, engine).await;
return self.write_all_unbuffered(src_buf).await;
}
let remaining = Self::CAPACITY - self.buf.len();
let src_buf_len = src_buf.bytes_init();
@@ -186,7 +183,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
}
// Then, if the buffer is full, flush it out
if self.buf.len() == Self::CAPACITY {
if let Err(e) = self.flush_buffer(engine).await {
if let Err(e) = self.flush_buffer().await {
return (Slice::into_inner(src_buf), Err(e));
}
}
@@ -202,7 +199,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
assert_eq!(copied, src_buf.len());
Slice::into_inner(src_buf)
} else {
let (src_buf, res) = self.write_all_unbuffered(src_buf, engine).await;
let (src_buf, res) = self.write_all_unbuffered(src_buf).await;
if let Err(e) = res {
return (src_buf, Err(e));
}
@@ -219,7 +216,6 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
srcbuf: B,
engine: IoEngine,
) -> (B::Buf, Result<u64, Error>) {
let offset = self.offset;
@@ -231,7 +227,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
if len < 128 {
// Short blob. Write a 1-byte length header
io_buf.put_u8(len as u8);
self.write_all(io_buf, engine).await
self.write_all(io_buf).await
} else {
// Write a 4-byte length header
if len > 0x7fff_ffff {
@@ -246,7 +242,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
let mut len_buf = (len as u32).to_be_bytes();
len_buf[0] |= 0x80;
io_buf.extend_from_slice(&len_buf[..]);
self.write_all(io_buf, engine).await
self.write_all(io_buf).await
}
}
.await;
@@ -255,7 +251,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
Ok(_) => (),
Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
}
let (srcbuf, res) = self.write_all(srcbuf, engine).await;
let (srcbuf, res) = self.write_all(srcbuf).await;
(srcbuf, res.map(|_| offset))
}
}
@@ -265,8 +261,8 @@ impl BlobWriter<true> {
///
/// This function flushes the internal buffer before giving access
/// to the underlying `VirtualFile`.
pub async fn into_inner(mut self, engine: IoEngine) -> Result<VirtualFile, Error> {
self.flush_buffer(engine).await?;
pub async fn into_inner(mut self) -> Result<VirtualFile, Error> {
self.flush_buffer().await?;
Ok(self.inner)
}

View File

@@ -5,7 +5,7 @@ use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
use crate::virtual_file::{self, io_engine, VirtualFile};
use crate::virtual_file::{self, VirtualFile};
use bytes::BytesMut;
use camino::Utf8PathBuf;
use pageserver_api::shard::TenantShardId;
@@ -160,7 +160,7 @@ impl EphemeralFile {
let (mutable_tail, res) = self
.ephemeral_file
.file
.write_all_at(mutable_tail, self.blknum as u64 * PAGE_SZ as u64, io_engine::get() /* TODO: experiment with StdFs here, did it in an earlier commit in this branch */)
.write_all_at(mutable_tail, self.blknum as u64 * PAGE_SZ as u64)
.await;
// TODO: If we panic before we can put the mutable_tail back, subsequent calls will fail.
// I.e., the IO isn't retryable if we panic.

View File

@@ -40,7 +40,6 @@ use crate::tenant::vectored_blob_io::{
BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::io_engine::{self, IoEngine};
use crate::virtual_file::{self, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
@@ -442,7 +441,7 @@ impl DeltaLayerWriterInner {
will_init: bool,
) -> (Vec<u8>, anyhow::Result<()>) {
assert!(self.lsn_range.start <= lsn);
let (val, res) = self.blob_writer.write_blob(val, IoEngine::StdFs).await;
let (val, res) = self.blob_writer.write_blob(val).await;
let off = match res {
Ok(off) => off,
Err(e) => return (val, Err(anyhow::anyhow!(e))),
@@ -466,14 +465,14 @@ impl DeltaLayerWriterInner {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
let mut file = self.blob_writer.into_inner(IoEngine::StdFs).await?;
let mut file = self.blob_writer.into_inner().await?;
// Write out the index
let (index_root_blk, block_buf) = self.tree.finish()?;
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
.await?;
for buf in block_buf.blocks {
let (_buf, res) = file.write_all(buf, IoEngine::StdFs).await;
let (_buf, res) = file.write_all(buf).await;
res?;
}
assert!(self.lsn_range.start < self.lsn_range.end);
@@ -493,7 +492,7 @@ impl DeltaLayerWriterInner {
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&summary, &mut buf)?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf, IoEngine::StdFs).await;
let (_buf, res) = file.write_all(buf).await;
res?;
let metadata = file
@@ -691,7 +690,7 @@ impl DeltaLayer {
// TODO: could use smallvec here, but it's a pain with Slice<T>
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf, io_engine::get()).await;
let (_buf, res) = file.write_all(buf).await;
res?;
Ok(())
}

View File

@@ -38,7 +38,7 @@ use crate::tenant::vectored_blob_io::{
BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::virtual_file::{self, io_engine, VirtualFile};
use crate::virtual_file::{self, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
@@ -356,7 +356,7 @@ impl ImageLayer {
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf, io_engine::get()).await;
let (_buf, res) = file.write_all(buf).await;
res?;
Ok(())
}
@@ -658,7 +658,7 @@ impl ImageLayerWriterInner {
///
async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
let (_img, res) = self.blob_writer.write_blob(img, io_engine::get()).await;
let (_img, res) = self.blob_writer.write_blob(img).await;
// TODO: re-use the buffer for `img` further upstack
let off = res?;
@@ -683,7 +683,7 @@ impl ImageLayerWriterInner {
.await?;
let (index_root_blk, block_buf) = self.tree.finish()?;
for buf in block_buf.blocks {
let (_buf, res) = file.write_all(buf, io_engine::get()).await;
let (_buf, res) = file.write_all(buf).await;
res?;
}
@@ -703,7 +703,7 @@ impl ImageLayerWriterInner {
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&summary, &mut buf)?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf, io_engine::get()).await;
let (_buf, res) = file.write_all(buf).await;
res?;
let metadata = file

View File

@@ -34,8 +34,6 @@ pub(crate) use io_engine::IoEngineKind;
pub(crate) use metadata::Metadata;
pub(crate) use open_options::*;
use self::io_engine::IoEngine;
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
/// the underlying file is closed if the system is low on file descriptors,
@@ -598,7 +596,6 @@ impl VirtualFile {
&self,
buf: B,
mut offset: u64,
engine: IoEngine,
) -> (B::Buf, Result<(), Error>) {
let buf_len = buf.bytes_init();
if buf_len == 0 {
@@ -607,7 +604,7 @@ impl VirtualFile {
let mut buf = buf.slice(0..buf_len);
while !buf.is_empty() {
let res;
(buf, res) = self.write_at(buf, offset, engine).await;
(buf, res) = self.write_at(buf, offset).await;
match res {
Ok(0) => {
return (
@@ -636,7 +633,6 @@ impl VirtualFile {
pub async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
engine: IoEngine,
) -> (B::Buf, Result<usize, Error>) {
let nbytes = buf.bytes_init();
if nbytes == 0 {
@@ -645,7 +641,7 @@ impl VirtualFile {
let mut buf = buf.slice(0..nbytes);
while !buf.is_empty() {
let res;
(buf, res) = self.write(buf, engine).await;
(buf, res) = self.write(buf).await;
match res {
Ok(0) => {
return (
@@ -669,10 +665,9 @@ impl VirtualFile {
async fn write<B: IoBuf + Send>(
&mut self,
buf: Slice<B>,
engine: IoEngine,
) -> (Slice<B>, Result<usize, std::io::Error>) {
let pos = self.pos;
let (buf, res) = self.write_at(buf, pos, engine).await;
let (buf, res) = self.write_at(buf, pos).await;
let n = match res {
Ok(n) => n,
Err(e) => return (buf, Err(e)),
@@ -710,14 +705,14 @@ impl VirtualFile {
&self,
buf: Slice<B>,
offset: u64,
engine: IoEngine,
) -> (Slice<B>, Result<usize, Error>) {
let file_guard = match self.lock_file().await {
Ok(file_guard) => file_guard,
Err(e) => return (buf, Err(e)),
};
observe_duration!(StorageIoOperation::Write, {
let ((_file_guard, buf), result) = engine.write_at(file_guard, offset, buf).await;
let ((_file_guard, buf), result) =
io_engine::get().write_at(file_guard, offset, buf).await;
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&[
@@ -1155,7 +1150,7 @@ mod tests {
) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => {
let (_buf, res) = file.write_all_at(buf, offset, io_engine::get()).await;
let (_buf, res) = file.write_all_at(buf, offset).await;
res
}
MaybeVirtualFile::File(file) => {

View File

@@ -13,7 +13,7 @@ use tracing::Instrument;
pub(crate) use super::api::IoEngineKind;
#[derive(Clone, Copy)]
#[repr(u8)]
pub enum IoEngine {
pub(crate) enum IoEngine {
NotSet,
StdFs,
#[cfg(target_os = "linux")]

View File

@@ -200,7 +200,7 @@ def wait_for_last_record_lsn(
lsn: Lsn,
) -> Lsn:
"""waits for pageserver to catch up to a certain lsn, returns the last observed lsn."""
for i in range(10000):
for i in range(100):
current_lsn = last_record_lsn(pageserver_http, tenant, timeline)
if current_lsn >= lsn:
return current_lsn