mirror of https://github.com/neondatabase/neon.git (synced 2026-01-15 09:22:55 +00:00)

Revert "undo all changes except gate,cancel,context propagation"

This reverts commit f25f71bc98.
@@ -1289,6 +1289,7 @@ pub(crate) enum StorageIoOperation {
Seek,
Fsync,
Metadata,
SetLen,
}

impl StorageIoOperation {
@@ -1303,6 +1304,7 @@ impl StorageIoOperation {
StorageIoOperation::Seek => "seek",
StorageIoOperation::Fsync => "fsync",
StorageIoOperation::Metadata => "metadata",
StorageIoOperation::SetLen => "set_len",
}
}
}

@@ -15,21 +15,23 @@
//! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use std::cmp::min;
use std::io::Error;
use std::sync::Arc;

use async_compression::Level;
use bytes::{BufMut, BytesMut};
use pageserver_api::models::ImageCompressionAlgorithm;
use tokio::io::AsyncWriteExt;
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
use tokio_epoll_uring::IoBuf;
use tokio_util::sync::CancellationToken;
use tracing::warn;

use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::VirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::owned_buffers_io::write::{BufferedWriter, FlushTaskError};

#[derive(Copy, Clone, Debug)]
pub struct CompressionInfo {
@@ -37,6 +39,14 @@ pub struct CompressionInfo {
pub compressed_size: Option<usize>,
}

#[derive(Debug, thiserror::Error)]
pub enum WriteBlobError {
#[error(transparent)]
Flush(FlushTaskError),
#[error("blob too large ({len} bytes)")]
BlobTooLarge { len: usize },
}

impl BlockCursor<'_> {
/// Read a blob into a new buffer.
pub async fn read_blob(
@@ -158,141 +168,62 @@ pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
/// A wrapper of `VirtualFile` that allows users to write blobs.
///
/// If a `BlobWriter` is dropped, the internal buffer will be
/// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
/// discarded. You need to call [`Self::into_inner`]
/// manually before dropping.
pub struct BlobWriter<const BUFFERED: bool> {
inner: VirtualFile,
offset: u64,
/// A buffer to save on write calls, only used if BUFFERED=true
buf: Vec<u8>,
pub struct BlobWriter {
/// We do tiny writes for the length headers; they need to be in an owned buffer;
io_buf: Option<BytesMut>,
writer: BufferedWriter<IoBufferMut, VirtualFile>,
offset: u64,
}

impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
impl BlobWriter {
pub fn new(
inner: VirtualFile,
file: Arc<VirtualFile>,
start_offset: u64,
_gate: &utils::sync::gate::Gate,
_cancel: CancellationToken,
_ctx: &RequestContext,
) -> Self {
Self {
inner,
offset: start_offset,
buf: Vec::with_capacity(Self::CAPACITY),
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
flush_task_span: tracing::Span,
) -> anyhow::Result<Self> {
Ok(Self {
io_buf: Some(BytesMut::new()),
}
writer: BufferedWriter::new(
file,
start_offset,
|| IoBufferMut::with_capacity(Self::CAPACITY),
gate.enter()?,
cancel,
ctx,
flush_task_span,
),
offset: start_offset,
})
}

pub fn size(&self) -> u64 {
self.offset
}

const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };
const CAPACITY: usize = 64 * 1024;

/// Writes the given buffer directly to the underlying `VirtualFile`.
/// You need to make sure that the internal buffer is empty, otherwise
/// data will be written in wrong order.
#[inline(always)]
async fn write_all_unbuffered<Buf: IoBuf + Send>(
&mut self,
src_buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), Error>) {
let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
let nbytes = match res {
Ok(nbytes) => nbytes,
Err(e) => return (src_buf, Err(e)),
};
self.offset += nbytes as u64;
(src_buf, Ok(()))
}

#[inline(always)]
/// Flushes the internal buffer to the underlying `VirtualFile`.
pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
let buf = std::mem::take(&mut self.buf);
let (slice, res) = self.inner.write_all(buf.slice_len(), ctx).await;
res?;
let mut buf = slice.into_raw_slice().into_inner();
buf.clear();
self.buf = buf;
Ok(())
}

#[inline(always)]
/// Writes as much of `src_buf` into the internal buffer as it fits
fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
let remaining = Self::CAPACITY - self.buf.len();
let to_copy = src_buf.len().min(remaining);
self.buf.extend_from_slice(&src_buf[..to_copy]);
self.offset += to_copy as u64;
to_copy
}

/// Internal, possibly buffered, write function
/// Writes `src_buf` to the file at the current offset.
async fn write_all<Buf: IoBuf + Send>(
&mut self,
src_buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), Error>) {
let src_buf = src_buf.into_raw_slice();
let src_buf_bounds = src_buf.bounds();
let restore = move |src_buf_slice: Slice<_>| {
FullSlice::must_new(Slice::from_buf_bounds(
src_buf_slice.into_inner(),
src_buf_bounds,
))
};
) -> (FullSlice<Buf>, Result<(), FlushTaskError>) {
let res = self
.writer
// TODO: why are we taking a FullSlice if we're going to pass a borrow downstack?
// Can remove all the complexity around owned buffers upstack
.write_buffered_borrowed(&src_buf, ctx)
.await
.map(|len| {
self.offset += len as u64;
});

if !BUFFERED {
assert!(self.buf.is_empty());
return self
.write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
.await;
}
let remaining = Self::CAPACITY - self.buf.len();
let src_buf_len = src_buf.bytes_init();
if src_buf_len == 0 {
return (restore(src_buf), Ok(()));
}
let mut src_buf = src_buf.slice(0..src_buf_len);
// First try to copy as much as we can into the buffer
if remaining > 0 {
let copied = self.write_into_buffer(&src_buf);
src_buf = src_buf.slice(copied..);
}
// Then, if the buffer is full, flush it out
if self.buf.len() == Self::CAPACITY {
if let Err(e) = self.flush_buffer(ctx).await {
return (restore(src_buf), Err(e));
}
}
// Finally, write the tail of src_buf:
// If it wholly fits into the buffer without
// completely filling it, then put it there.
// If not, write it out directly.
let src_buf = if !src_buf.is_empty() {
assert_eq!(self.buf.len(), 0);
if src_buf.len() < Self::CAPACITY {
let copied = self.write_into_buffer(&src_buf);
// We just verified above that src_buf fits into our internal buffer.
assert_eq!(copied, src_buf.len());
restore(src_buf)
} else {
let (src_buf, res) = self
.write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
.await;
if let Err(e) = res {
return (src_buf, Err(e));
}
src_buf
}
} else {
restore(src_buf)
};
(src_buf, Ok(()))
(src_buf, res)
}

/// Write a blob of data. Returns the offset that it was written to,
@@ -301,7 +232,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
&mut self,
srcbuf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<u64, Error>) {
) -> (FullSlice<Buf>, Result<u64, WriteBlobError>) {
let (buf, res) = self
.write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
.await;
@@ -315,7 +246,10 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
srcbuf: FullSlice<Buf>,
ctx: &RequestContext,
algorithm: ImageCompressionAlgorithm,
) -> (FullSlice<Buf>, Result<(u64, CompressionInfo), Error>) {
) -> (
FullSlice<Buf>,
Result<(u64, CompressionInfo), WriteBlobError>,
) {
let offset = self.offset;
let mut compression_info = CompressionInfo {
written_compressed: false,
@@ -331,14 +265,16 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
if len < 128 {
// Short blob. Write a 1-byte length header
io_buf.put_u8(len as u8);
(self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await;
let res = res.map_err(WriteBlobError::Flush);
((slice, res), srcbuf)
} else {
// Write a 4-byte length header
if len > MAX_SUPPORTED_BLOB_LEN {
return (
(
io_buf.slice_len(),
Err(Error::other(format!("blob too large ({len} bytes)"))),
Err(WriteBlobError::BlobTooLarge { len }),
),
srcbuf,
);
@@ -372,7 +308,9 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
assert_eq!(len_buf[0] & 0xf0, 0);
len_buf[0] |= high_bit_mask;
io_buf.extend_from_slice(&len_buf[..]);
(self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await;
let res = res.map_err(WriteBlobError::Flush);
((slice, res), srcbuf)
}
}
.await;
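
[Editor's note] A minimal standalone sketch of the length-header encoding handled in the hunks above, reconstructed from the `1CCCXXXX ...` format comment: lengths below 128 get a 1-byte header, larger blobs a 4-byte big-endian header whose top nibble carries the compression bits. `BYTE_UNCOMPRESSED = 0x80` and `MAX_SUPPORTED_BLOB_LEN = 0x0fff_ffff` are assumptions for illustration, not necessarily the crate's exact constants.

const BYTE_UNCOMPRESSED: u8 = 0x80; // assumed: high bit marks a 4-byte header
const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10; // matches the constant in the hunk header above
const MAX_SUPPORTED_BLOB_LEN: usize = 0x0fff_ffff; // assumed: 28 usable length bits

fn encode_len_header(len: usize, zstd: bool) -> Vec<u8> {
    if len < 128 {
        vec![len as u8] // 0XXXXXXX: 1-byte header
    } else {
        assert!(len <= MAX_SUPPORTED_BLOB_LEN, "blob too large ({len} bytes)");
        let mut len_buf = (len as u32).to_be_bytes();
        assert_eq!(len_buf[0] & 0xf0, 0); // top nibble is free for the 1CCC bits
        len_buf[0] |= if zstd { BYTE_ZSTD } else { BYTE_UNCOMPRESSED };
        len_buf.to_vec()
    }
}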
@@ -387,33 +325,23 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
} else {
self.write_all(srcbuf, ctx).await
};
let res = res.map_err(WriteBlobError::Flush);
(srcbuf, res.map(|_| (offset, compression_info)))
}
}

impl BlobWriter<true> {
/// Access the underlying `VirtualFile`.
///
/// This function flushes the internal buffer before giving access
/// to the underlying `VirtualFile`.
pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
self.flush_buffer(ctx).await?;
Ok(self.inner)
}

/// Access the underlying `VirtualFile`.
///
/// Unlike [`into_inner`](Self::into_inner), this doesn't flush
/// the internal buffer before giving access.
pub fn into_inner_no_flush(self) -> VirtualFile {
self.inner
}
}

impl BlobWriter<false> {
/// Access the underlying `VirtualFile`.
pub fn into_inner(self) -> VirtualFile {
self.inner
/// The caller can use the `handle_tail` function to change the tail of the buffer before flushing it to disk.
/// The buffer will not be flushed to disk if handle_tail returns `None`.
pub async fn into_inner(
self,
handle_tail: impl FnMut(IoBufferMut) -> Option<IoBufferMut>,
) -> Result<VirtualFile, FlushTaskError> {
let (_, file) = self.writer.shutdown(handle_tail).await?;
Ok(file)
}
}
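
[Editor's note] A usage sketch of the new `into_inner(handle_tail)` API, assembled from the call sites later in this diff (crate-internal types assumed, so this only compiles inside the crate):

// Pad the buffered tail to the next PAGE_SZ multiple before the final flush,
// as the delta/image layer writers below do:
use crate::virtual_file::owned_buffers_io::write::Buffer;

let file = blob_writer
    .into_inner(|mut buf| {
        let len = buf.pending();
        let cap = buf.cap();
        let count = len.next_multiple_of(PAGE_SZ).min(cap) - len;
        buf.extend_with(0, count); // zero padding
        Some(buf) // returning None drops the tail without flushing it
    })
    .await?;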
@@ -422,21 +350,22 @@ pub(crate) mod tests {
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use rand::{Rng, SeedableRng};
use tracing::info_span;

use super::*;
use crate::context::DownloadBehavior;
use crate::task_mgr::TaskKind;
use crate::tenant::block_io::BlockReaderRef;

async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
round_trip_test_compressed::<BUFFERED>(blobs, false).await
async fn round_trip_test(blobs: &[Vec<u8>]) -> anyhow::Result<()> {
round_trip_test_compressed(blobs, false).await
}

pub(crate) async fn write_maybe_compressed<const BUFFERED: bool>(
pub(crate) async fn write_maybe_compressed(
blobs: &[Vec<u8>],
compression: bool,
ctx: &RequestContext,
) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
) -> anyhow::Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>)> {
let temp_dir = camino_tempfile::tempdir()?;
let pathbuf = temp_dir.path().join("file");
let gate = utils::sync::gate::Gate::default();
@@ -445,8 +374,9 @@ pub(crate) mod tests {
// Write part (in block to drop the file)
let mut offsets = Vec::new();
{
let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0, &gate, cancel.clone(), ctx);
let file = Arc::new(VirtualFile::create_v2(pathbuf.as_path(), ctx).await?);
let mut wtr =
BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test")).unwrap();
for blob in blobs.iter() {
let (_, res) = if compression {
let res = wtr
@@ -463,26 +393,37 @@ pub(crate) mod tests {
let offs = res?;
offsets.push(offs);
}
// Write out one page worth of zeros so that we can
// read again with read_blk
let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await;
let offs = res?;
println!("Writing final blob at offs={offs}");
wtr.flush_buffer(ctx).await?;
wtr.into_inner(|mut buf| {
use crate::virtual_file::owned_buffers_io::write::Buffer;

let len = buf.pending();
let cap = buf.cap();

// pad zeros to the next io alignment requirement.
// TODO: this is actually padding to next PAGE_SZ multiple, but only if the buffer capacity is larger than that.
// We can't let the fact that we do direct IO, or the buffer capacity, dictate the on-disk format we write here.
// Need to find a better API that allows writing the format we intend to.
let count = len.next_multiple_of(PAGE_SZ).min(cap) - len;
buf.extend_with(0, count);

Some(buf)
})
.await?; // TODO: this here is the problem with the tests: we're dropping the tail end
}
Ok((temp_dir, pathbuf, offsets))
}
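
[Editor's note] To make the padding arithmetic above concrete: with the 8 kB `PAGE_SZ` used by this crate and the 64 KiB buffer capacity used in this file, a pending tail of 5000 bytes is padded up to exactly one page:

let (len, cap, page_sz) = (5000usize, 64 * 1024, 8192);
let count = len.next_multiple_of(page_sz).min(cap) - len;
assert_eq!(count, 3192); // 5000 + 3192 == 8192, one full page
// The min(cap) clamp is why the TODO above notes this only pads to a
// PAGE_SZ multiple when the buffer capacity is at least that large.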

async fn round_trip_test_compressed<const BUFFERED: bool>(
async fn round_trip_test_compressed(
blobs: &[Vec<u8>],
compression: bool,
) -> Result<(), Error> {
) -> anyhow::Result<()> {
let ctx =
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
let (_temp_dir, pathbuf, offsets) =
write_maybe_compressed::<BUFFERED>(blobs, compression, &ctx).await?;
write_maybe_compressed(blobs, compression, &ctx).await?;

let file = VirtualFile::open(pathbuf, &ctx).await?;
println!("Done writing!");
let file = VirtualFile::open_v2(pathbuf, &ctx).await?;
let rdr = BlockReaderRef::VirtualFile(&file);
let rdr = BlockCursor::new_with_compression(rdr, compression);
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
@@ -501,30 +442,27 @@ pub(crate) mod tests {
}

#[tokio::test]
async fn test_one() -> Result<(), Error> {
async fn test_one() -> anyhow::Result<()> {
let blobs = &[vec![12, 21, 22]];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test(blobs).await?;
Ok(())
}

#[tokio::test]
async fn test_hello_simple() -> Result<(), Error> {
async fn test_hello_simple() -> anyhow::Result<()> {
let blobs = &[
vec![0, 1, 2, 3],
b"Hello, World!".to_vec(),
Vec::new(),
b"foobar".to_vec(),
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test_compressed::<false>(blobs, true).await?;
round_trip_test_compressed::<true>(blobs, true).await?;
round_trip_test(blobs).await?;
round_trip_test_compressed(blobs, true).await?;
Ok(())
}

#[tokio::test]
async fn test_really_big_array() -> Result<(), Error> {
async fn test_really_big_array() -> anyhow::Result<()> {
let blobs = &[
b"test".to_vec(),
random_array(10 * PAGE_SZ),
@@ -533,25 +471,22 @@ pub(crate) mod tests {
vec![0xf3; 24 * PAGE_SZ],
b"foobar".to_vec(),
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test_compressed::<false>(blobs, true).await?;
round_trip_test_compressed::<true>(blobs, true).await?;
round_trip_test(blobs).await?;
round_trip_test_compressed(blobs, true).await?;
Ok(())
}

#[tokio::test]
async fn test_arrays_inc() -> Result<(), Error> {
async fn test_arrays_inc() -> anyhow::Result<()> {
let blobs = (0..PAGE_SZ / 8)
.map(|v| random_array(v * 16))
.collect::<Vec<_>>();
round_trip_test::<false>(&blobs).await?;
round_trip_test::<true>(&blobs).await?;
round_trip_test(&blobs).await?;
Ok(())
}

#[tokio::test]
async fn test_arrays_random_size() -> Result<(), Error> {
async fn test_arrays_random_size() -> anyhow::Result<()> {
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
let blobs = (0..1024)
.map(|_| {
@@ -563,20 +498,18 @@ pub(crate) mod tests {
random_array(sz.into())
})
.collect::<Vec<_>>();
round_trip_test::<false>(&blobs).await?;
round_trip_test::<true>(&blobs).await?;
round_trip_test(&blobs).await?;
Ok(())
}

#[tokio::test]
async fn test_arrays_page_boundary() -> Result<(), Error> {
async fn test_arrays_page_boundary() -> anyhow::Result<()> {
let blobs = &[
random_array(PAGE_SZ - 4),
random_array(PAGE_SZ - 4),
random_array(PAGE_SZ - 4),
];
round_trip_test::<false>(blobs).await?;
round_trip_test::<true>(blobs).await?;
round_trip_test(blobs).await?;
Ok(())
}
}

@@ -4,14 +4,12 @@

use std::ops::Deref;

use bytes::Bytes;

use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::context::RequestContext;
use crate::page_cache::{self, FileId, PAGE_SZ, PageReadGuard, PageWriteGuard, ReadBufResult};
#[cfg(test)]
use crate::virtual_file::IoBufferMut;
use crate::virtual_file::VirtualFile;
use crate::virtual_file::{IoBuffer, VirtualFile};

/// This is implemented by anything that can read 8 kB (PAGE_SZ)
/// blocks, using the page cache
@@ -247,17 +245,17 @@ pub trait BlockWriter {
/// 'buf' must be of size PAGE_SZ. Returns the block number the page was
/// written to.
///
fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error>;
fn write_blk(&mut self, buf: IoBuffer) -> Result<u32, std::io::Error>;
}

///
/// A simple in-memory buffer of blocks.
///
pub struct BlockBuf {
pub blocks: Vec<Bytes>,
pub blocks: Vec<IoBuffer>,
}
impl BlockWriter for BlockBuf {
fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error> {
fn write_blk(&mut self, buf: IoBuffer) -> Result<u32, std::io::Error> {
assert!(buf.len() == PAGE_SZ);
let blknum = self.blocks.len();
self.blocks.push(buf);

@@ -25,7 +25,7 @@ use std::{io, result};

use async_stream::try_stream;
use byteorder::{BE, ReadBytesExt};
use bytes::{BufMut, Bytes, BytesMut};
use bytes::BufMut;
use either::Either;
use futures::{Stream, StreamExt};
use hex;
@@ -34,6 +34,7 @@ use tracing::error;

use crate::context::RequestContext;
use crate::tenant::block_io::{BlockReader, BlockWriter};
use crate::virtual_file::{IoBuffer, IoBufferMut, owned_buffers_io::write::Buffer};

// The maximum size of a value stored in the B-tree. 5 bytes is enough currently.
pub const VALUE_SZ: usize = 5;
@@ -787,12 +788,12 @@ impl<const L: usize> BuildNode<L> {
///
/// Serialize the node to on-disk format.
///
fn pack(&self) -> Bytes {
fn pack(&self) -> IoBuffer {
assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
assert!(self.num_children > 0);

let mut buf = BytesMut::new();
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);

buf.put_u16(self.num_children);
buf.put_u8(self.level);
@@ -805,7 +806,7 @@ impl<const L: usize> BuildNode<L> {
assert!(buf.len() == self.size);

assert!(buf.len() <= PAGE_SZ);
buf.resize(PAGE_SZ, 0);
buf.extend_with(0, PAGE_SZ - buf.len());
buf.freeze()
}

@@ -839,7 +840,7 @@ pub(crate) mod tests {

#[derive(Clone, Default)]
pub(crate) struct TestDisk {
blocks: Vec<Bytes>,
blocks: Vec<IoBuffer>,
}
impl TestDisk {
fn new() -> Self {
@@ -857,7 +858,7 @@ pub(crate) mod tests {
}
}
impl BlockWriter for &mut TestDisk {
fn write_blk(&mut self, buf: Bytes) -> io::Result<u32> {
fn write_blk(&mut self, buf: IoBuffer) -> io::Result<u32> {
let blknum = self.blocks.len();
self.blocks.push(buf);
Ok(blknum as u32)

@@ -75,6 +75,7 @@ impl EphemeralFile {
bytes_written: 0,
buffered_writer: owned_buffers_io::write::BufferedWriter::new(
file,
0,
|| IoBufferMut::with_capacity(TAIL_SZ),
gate.enter()?,
cancel.child_token(),

@@ -32,12 +32,14 @@ use super::{
remote_tenant_manifest_prefix, remote_tenant_path,
};
use crate::TEMP_FILE_SUFFIX;
use crate::assert_u64_eq_usize::UsizeIsU64;
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_id,
};
use crate::tenant::Generation;
use crate::tenant::disk_btree::PAGE_SZ;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerName;
use crate::virtual_file::{MaybeFatalIo, VirtualFile, on_fatal_io_error};
@@ -227,6 +229,7 @@ async fn download_object(

let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
destination_file,
0,
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
gate.enter().map_err(|_| DownloadError::Cancelled)?,
cancel.child_token(),
@@ -251,13 +254,41 @@ async fn download_object(
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
}
let inner = buffered
.flush_and_into_inner(ctx)
let mut pad_amount = None;
let (bytes_amount, destination_file) = buffered
.shutdown(|mut buf| {
use crate::virtual_file::owned_buffers_io::write::Buffer;

let len = buf.pending();
let cap = buf.cap();

// pad zeros to the next io alignment requirement.
// TODO: this is actually padding to next PAGE_SZ multiple, but only if the buffer capacity is larger than that.
// We can't let the fact that we do direct IO, or the buffer capacity, dictate the on-disk format we write here.
// Need to find a better API that allows writing the format we intend to.
let count = len.next_multiple_of(PAGE_SZ).min(cap) - len;
pad_amount = Some(count);
buf.extend_with(0, count);

Some(buf)
})
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
Ok(inner)

let pad_amount = pad_amount.expect("shutdown always invokes the closure").into_u64();
let set_len_arg = bytes_amount - pad_amount;
destination_file
.set_len(set_len_arg)
.await
.maybe_fatal_err("download_object set_len")
.with_context(|| {
format!("set len for file at {dst_path}: 0x{set_len_arg:x} = 0x{bytes_amount:x} - 0x{pad_amount:x}")
})
.map_err(DownloadError::Other)?;

Ok((set_len_arg, destination_file))
}
.await?;
@@ -1521,12 +1521,11 @@ async fn load_heatmap(
path: &Utf8PathBuf,
ctx: &RequestContext,
) -> Result<Option<HeatMapTenant>, anyhow::Error> {
let mut file = match VirtualFile::open(path, ctx).await {
let st = match VirtualFile::read_to_string(path, ctx).await {
Ok(file) => file,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(e) => Err(e)?,
};
let st = file.read_to_string(ctx).await?;
let htm = serde_json::from_str(&st)?;
Ok(Some(htm))
}

@@ -29,7 +29,6 @@
//!
use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::str::FromStr;
@@ -52,6 +51,7 @@ use tokio_epoll_uring::IoBuf;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::bin_ser::BeSer;
use utils::bin_ser::SerializeError;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;

@@ -74,7 +74,8 @@ use crate::tenant::vectored_blob_io::{
VectoredReadPlanner,
};
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::virtual_file::owned_buffers_io::write::Buffer;
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};

///
@@ -112,6 +113,15 @@ impl From<&DeltaLayer> for Summary {
}

impl Summary {
/// Serializes the summary header into an aligned buffer of length `PAGE_SZ`.
pub fn ser_into_page(&self) -> Result<IoBuffer, SerializeError> {
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
Self::ser_into(self, &mut buf)?;
// Pad zeroes to the buffer so the length is a multiple of the alignment.
buf.extend_with(0, buf.capacity() - buf.len());
Ok(buf.freeze())
}

pub(super) fn expected(
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -392,10 +402,12 @@ struct DeltaLayerWriterInner {

tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,

blob_writer: BlobWriter<true>,
blob_writer: BlobWriter,

// Number of key-lsns in the layer.
num_keys: usize,

_gate_guard: utils::sync::gate::GateGuard,
}

impl DeltaLayerWriterInner {
@@ -422,10 +434,17 @@ impl DeltaLayerWriterInner {
let path =
DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);

let mut file = VirtualFile::create(&path, ctx).await?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
let file = Arc::new(VirtualFile::create_v2(&path, ctx).await?);

// Start at PAGE_SZ, make room for the header block
let blob_writer = BlobWriter::new(
file,
PAGE_SZ as u64,
gate,
cancel,
ctx,
info_span!(parent: None, "delta_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
)?;

// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -440,6 +459,7 @@ impl DeltaLayerWriterInner {
tree: tree_builder,
blob_writer,
num_keys: 0,
_gate_guard: gate.enter()?,
})
}

@@ -535,15 +555,33 @@ impl DeltaLayerWriterInner {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;

let mut file = self.blob_writer.into_inner(ctx).await?;
let file = self
.blob_writer
.into_inner(|mut buf| {
let len = buf.pending();
let cap = buf.cap();

// pad zeros to the next io alignment requirement.
// TODO: this is actually padding to next PAGE_SZ multiple, but only if the buffer capacity is larger than that.
// We can't let the fact that we do direct IO, or the buffer capacity, dictate the on-disk format we write here.
// Need to find a better API that allows writing the format we intend to.
let count = len.next_multiple_of(PAGE_SZ).min(cap) - len;
buf.extend_with(0, count);

Some(buf)
})
.await?;

// Write out the index
let (index_root_blk, block_buf) = self.tree.finish()?;
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
.await?;
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;

// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
// Should we just replace BlockBuf::blocks with one big buffer
for buf in block_buf.blocks {
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
res?;
offset += PAGE_SZ as u64;
}
assert!(self.lsn_range.start < self.lsn_range.end);
// Fill in the summary on blk 0
@@ -558,11 +596,9 @@ impl DeltaLayerWriterInner {
index_root_blk,
};

let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&summary, &mut buf)?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
// Writes summary at the first block (offset 0).
let buf = summary.ser_into_page()?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;

let metadata = file
@@ -729,12 +765,33 @@ impl DeltaLayerWriter {

impl Drop for DeltaLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
// We want to remove the virtual file here, so it's fine to not
// have completely flushed unwritten data.
let vfile = inner.blob_writer.into_inner_no_flush();
vfile.remove();
}
let Some(inner) = self.inner.take() else {
return;
};

tokio::spawn(async move {
let DeltaLayerWriterInner {
blob_writer,
_gate_guard,
..
} = inner;

let vfile = match blob_writer.into_inner(|_| None).await {
Ok(vfile) => vfile,
Err(e) => {
error!(err=%e, "failed to remove delta layer writer file");
drop(_gate_guard);
return;
}
};

if let Err(e) = std::fs::remove_file(vfile.path())
.maybe_fatal_err("failed to remove the virtual file")
{
error!(err=%e, path=%vfile.path(), "failed to remove delta layer writer file");
}
drop(_gate_guard);
});
}
}

@@ -761,7 +818,7 @@ impl DeltaLayer {
where
F: Fn(Summary) -> Summary,
{
let mut file = VirtualFile::open_with_options(
let file = VirtualFile::open_with_options_v2(
path,
virtual_file::OpenOptions::new().read(true).write(true),
ctx,
@@ -778,11 +835,8 @@ impl DeltaLayer {

let new_summary = rewrite(actual_summary);

let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here, but it's a pain with Slice<T>
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
let buf = new_summary.ser_into_page().context("serialize")?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;
Ok(())
}

@@ -27,7 +27,6 @@
//! actual page images are stored in the "values" part.
use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::str::FromStr;
@@ -50,6 +49,7 @@ use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::bin_ser::BeSer;
use utils::bin_ser::SerializeError;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;

@@ -72,7 +72,8 @@ use crate::tenant::vectored_blob_io::{
VectoredReadPlanner,
};
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::virtual_file::owned_buffers_io::write::Buffer;
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};

///
@@ -111,6 +112,15 @@ impl From<&ImageLayer> for Summary {
}

impl Summary {
/// Serializes the summary header into an aligned buffer of length `PAGE_SZ`.
pub fn ser_into_page(&self) -> Result<IoBuffer, SerializeError> {
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
Self::ser_into(self, &mut buf)?;
// Pad zeroes to the buffer so the length is a multiple of the alignment.
buf.extend_with(0, buf.capacity() - buf.len());
Ok(buf.freeze())
}

pub(super) fn expected(
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -353,7 +363,7 @@ impl ImageLayer {
where
F: Fn(Summary) -> Summary,
{
let mut file = VirtualFile::open_with_options(
let file = VirtualFile::open_with_options_v2(
path,
virtual_file::OpenOptions::new().read(true).write(true),
ctx,
@@ -370,11 +380,8 @@ impl ImageLayer {

let new_summary = rewrite(actual_summary);

let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
let buf = new_summary.ser_into_page().context("serialize")?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;
Ok(())
}
@@ -742,9 +749,11 @@ struct ImageLayerWriterInner {
// Number of keys in the layer.
num_keys: usize,

blob_writer: BlobWriter<false>,
blob_writer: BlobWriter,
tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,

_gate_guard: utils::sync::gate::GateGuard,

#[cfg(feature = "testing")]
last_written_key: Key,
}
@@ -776,19 +785,28 @@ impl ImageLayerWriterInner {
},
);
trace!("creating image layer {}", path);
let mut file = {
VirtualFile::open_with_options(
&path,
virtual_file::OpenOptions::new()
.write(true)
.create_new(true),
ctx,
let file = {
Arc::new(
VirtualFile::open_with_options_v2(
&path,
virtual_file::OpenOptions::new()
.write(true)
.create_new(true),
ctx,
)
.await?,
)
.await?
};
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);

// Start at `PAGE_SZ` to make room for the header block.
let blob_writer = BlobWriter::new(
file,
PAGE_SZ as u64,
gate,
cancel,
ctx,
info_span!(parent: None, "image_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
)?;

// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
@@ -809,6 +827,8 @@ impl ImageLayerWriterInner {
num_keys: 0,
#[cfg(feature = "testing")]
last_written_key: Key::MIN,

_gate_guard: gate.enter()?,
};

Ok(writer)
@@ -894,15 +914,30 @@ impl ImageLayerWriterInner {
crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);

let mut file = self.blob_writer.into_inner();
let file = self
.blob_writer
.into_inner(|mut buf| {
let len = buf.pending();
let cap = buf.cap();

// pad zeros to the next io alignment requirement.
let count = len.next_multiple_of(PAGE_SZ).min(cap) - len;
buf.extend_with(0, count);

Some(buf)
})
.await?;

// Write out the index
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
.await?;
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
let (index_root_blk, block_buf) = self.tree.finish()?;

// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
// Should we just replace BlockBuf::blocks with one big buffer?
for buf in block_buf.blocks {
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
res?;
offset += PAGE_SZ as u64;
}

let final_key_range = if let Some(end_key) = end_key {
@@ -923,11 +958,9 @@ impl ImageLayerWriterInner {
index_root_blk,
};

let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
Summary::ser_into(&summary, &mut buf)?;
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
// Writes summary at the first block (offset 0).
let buf = summary.ser_into_page()?;
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
res?;

let metadata = file
@@ -1070,9 +1103,33 @@ impl ImageLayerWriter {

impl Drop for ImageLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
inner.blob_writer.into_inner().remove();
}
let Some(inner) = self.inner.take() else {
return;
};

tokio::spawn(async move {
let ImageLayerWriterInner {
blob_writer,
_gate_guard,
..
} = inner;

let vfile = match blob_writer.into_inner(|_| None).await {
Ok(vfile) => vfile,
Err(e) => {
error!(err=%e, "failed to remove image layer writer file");
drop(_gate_guard);
return;
}
};

if let Err(e) = std::fs::remove_file(vfile.path())
.maybe_fatal_err("failed to remove the virtual file")
{
error!(err=%e, path=%vfile.path(), "failed to remove image layer writer file");
}
drop(_gate_guard);
});
}
}

@@ -677,7 +677,6 @@ impl StreamingVectoredReadPlanner {

#[cfg(test)]
mod tests {
use anyhow::Error;

use super::super::blob_io::tests::{random_array, write_maybe_compressed};
use super::*;
@@ -960,13 +959,16 @@ mod tests {
}
}

async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
async fn round_trip_test_compressed(
blobs: &[Vec<u8>],
compression: bool,
) -> anyhow::Result<()> {
let ctx =
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
let (_temp_dir, pathbuf, offsets) =
write_maybe_compressed::<true>(blobs, compression, &ctx).await?;
write_maybe_compressed(blobs, compression, &ctx).await?;

let file = VirtualFile::open(&pathbuf, &ctx).await?;
let file = VirtualFile::open_v2(&pathbuf, &ctx).await?;
let file_len = std::fs::metadata(&pathbuf)?.len();

// Multiply by two (compressed data might need more space), and add a few bytes for the header
@@ -1003,7 +1005,7 @@ mod tests {
}

#[tokio::test]
async fn test_really_big_array() -> Result<(), Error> {
async fn test_really_big_array() -> anyhow::Result<()> {
let blobs = &[
b"test".to_vec(),
random_array(10 * PAGE_SZ),
@@ -1018,7 +1020,7 @@ mod tests {
}

#[tokio::test]
async fn test_arrays_inc() -> Result<(), Error> {
async fn test_arrays_inc() -> anyhow::Result<()> {
let blobs = (0..PAGE_SZ / 8)
.map(|v| random_array(v * 16))
.collect::<Vec<_>>();

@@ -12,7 +12,7 @@
//! src/backend/storage/file/fd.c
//!
use std::fs::File;
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::io::{Error, ErrorKind};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
#[cfg(target_os = "linux")]
use std::os::unix::fs::OpenOptionsExt;
@@ -185,18 +185,14 @@ impl VirtualFile {
self.inner.sync_data().await
}

pub async fn set_len(&self, len: u64) -> Result<(), Error> {
self.inner.set_len(len).await
}

pub async fn metadata(&self) -> Result<Metadata, Error> {
self.inner.metadata().await
}

pub fn remove(self) {
self.inner.remove();
}

pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
self.inner.seek(pos).await
}

pub async fn read_exact_at<Buf>(
&self,
slice: Slice<Buf>,
@@ -227,25 +223,31 @@ impl VirtualFile {
self.inner.write_all_at(buf, offset, ctx).await
}

pub async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: FullSlice<Buf>,
pub(crate) async fn read_to_string<P: AsRef<Utf8Path>>(
path: P,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<usize, Error>) {
self.inner.write_all(buf, ctx).await
}

async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
self.inner.read_to_end(buf, ctx).await
}

pub(crate) async fn read_to_string(
&mut self,
ctx: &RequestContext,
) -> Result<String, anyhow::Error> {
) -> std::io::Result<String> {
let file = VirtualFile::open(path, ctx).await?; // TODO: open_v2
let mut buf = Vec::new();
self.read_to_end(&mut buf, ctx).await?;
Ok(String::from_utf8(buf)?)
let mut tmp = vec![0; 128];
let mut pos: u64 = 0;
loop {
let slice = tmp.slice(..128);
let (slice, res) = file.inner.read_at(slice, pos, ctx).await;
match res {
Ok(0) => break,
Ok(n) => {
pos += n as u64;
buf.extend_from_slice(&slice[..n]);
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
tmp = slice.into_inner();
}
String::from_utf8(buf).map_err(|_| {
std::io::Error::new(ErrorKind::InvalidData, "file contents are not valid UTF-8")
})
}
}
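
[Editor's note] The old method read from the file's implicit position; the new associated function opens the path itself and reads at explicit offsets in 128-byte chunks. A usage sketch mirroring the `load_heatmap` hunk earlier in this diff (crate-internal types assumed):

let st = VirtualFile::read_to_string(&heatmap_path, ctx).await?; // whole file as UTF-8
let htm: HeatMapTenant = serde_json::from_str(&st)?;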
@@ -292,9 +294,6 @@ pub struct VirtualFileInner {
/// belongs to a different VirtualFile.
handle: RwLock<SlotHandle>,

/// Current file position
pos: u64,

/// File path and options to use to open it.
///
/// Note: this only contains the options needed to re-open it. For example,
@@ -608,7 +607,6 @@ impl VirtualFileInner {

let vfile = VirtualFileInner {
handle: RwLock::new(handle),
pos: 0,
path: path.to_owned(),
open_options: reopen_options,
};
@@ -675,6 +673,13 @@ impl VirtualFileInner {
})
}

pub async fn set_len(&self, len: u64) -> Result<(), Error> {
with_file!(self, StorageIoOperation::SetLen, |file_guard| {
let (_file_guard, res) = io_engine::get().set_len(file_guard, len).await;
res.maybe_fatal_err("set_len")
})
}

/// Helper function internal to `VirtualFile` that looks up the underlying File,
/// opens it and evicts some other File if necessary. The passed parameter is
/// assumed to be a function available for the physical `File`.
@@ -742,38 +747,6 @@ impl VirtualFileInner {
})
}

pub fn remove(self) {
let path = self.path.clone();
drop(self);
std::fs::remove_file(path).expect("failed to remove the virtual file");
}

pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match pos {
SeekFrom::Start(offset) => {
self.pos = offset;
}
SeekFrom::End(offset) => {
self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard
.with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))?
}
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
if pos < 0 {
return Err(Error::new(
ErrorKind::InvalidInput,
"offset would be negative",
));
}
if pos > u64::MAX as i128 {
return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
}
self.pos = pos as u64;
}
}
Ok(self.pos)
}

/// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`.
///
/// The returned `Slice<Buf>` is equivalent to the input `slice`, i.e., it's the same view into the same buffer.
@@ -857,59 +830,7 @@ impl VirtualFileInner {
(restore(buf), Ok(()))
}

/// Writes `buf` to the file at the current offset.
///
/// Panics if there is an uninitialized range in `buf`, as that is most likely a bug in the caller.
pub async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<usize, Error>) {
let buf = buf.into_raw_slice();
let bounds = buf.bounds();
let restore =
|buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds));
let nbytes = buf.len();
let mut buf = buf;
while !buf.is_empty() {
let (tmp, res) = self.write(FullSlice::must_new(buf), ctx).await;
buf = tmp.into_raw_slice();
match res {
Ok(0) => {
return (
restore(buf),
Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
)),
);
}
Ok(n) => {
buf = buf.slice(n..);
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return (restore(buf), Err(e)),
}
}
(restore(buf), Ok(nbytes))
}

async fn write<B: IoBuf + Send>(
&mut self,
buf: FullSlice<B>,
ctx: &RequestContext,
) -> (FullSlice<B>, Result<usize, std::io::Error>) {
let pos = self.pos;
let (buf, res) = self.write_at(buf, pos, ctx).await;
let n = match res {
Ok(n) => n,
Err(e) => return (buf, Err(e)),
};
self.pos += n as u64;
(buf, Ok(n))
}

pub(crate) async fn read_at<Buf>(
pub(super) async fn read_at<Buf>(
&self,
buf: tokio_epoll_uring::Slice<Buf>,
offset: u64,
@@ -937,23 +858,11 @@ impl VirtualFileInner {
})
}

/// The function aborts the process if the error is fatal.
async fn write_at<B: IoBuf + Send>(
&self,
buf: FullSlice<B>,
offset: u64,
ctx: &RequestContext,
) -> (FullSlice<B>, Result<usize, Error>) {
let (slice, result) = self.write_at_inner(buf, offset, ctx).await;
let result = result.maybe_fatal_err("write_at");
(slice, result)
}

async fn write_at_inner<B: IoBuf + Send>(
&self,
buf: FullSlice<B>,
offset: u64,
ctx: &RequestContext,
) -> (FullSlice<B>, Result<usize, Error>) {
let file_guard = match self.lock_file().await {
Ok(file_guard) => file_guard,
@@ -962,30 +871,13 @@ impl VirtualFileInner {
observe_duration!(StorageIoOperation::Write, {
let ((_file_guard, buf), result) =
io_engine::get().write_at(file_guard, offset, buf).await;
let result = result.maybe_fatal_err("write_at");
if let Ok(size) = result {
ctx.io_size_metrics().write.add(size.into_u64());
}
(buf, result)
})
}

async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
let mut tmp = vec![0; 128];
loop {
let slice = tmp.slice(..128);
let (slice, res) = self.read_at(slice, self.pos, ctx).await;
match res {
Ok(0) => return Ok(()),
Ok(n) => {
self.pos += n as u64;
buf.extend_from_slice(&slice[..n]);
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
tmp = slice.into_inner();
}
}
}

// Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
@@ -1200,19 +1092,6 @@ impl FileGuard {
let _ = file.into_raw_fd();
res
}
/// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
fn with_std_file_mut<F, R>(&mut self, with: F) -> R
where
F: FnOnce(&mut File) -> R,
{
// SAFETY:
// - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
// - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
let res = with(&mut file);
let _ = file.into_raw_fd();
res
}
}

impl tokio_epoll_uring::IoFd for FileGuard {
@@ -1380,7 +1259,6 @@ static SYNC_MODE: AtomicU8 = AtomicU8::new(SyncMode::Sync as u8);

#[cfg(test)]
mod tests {
use std::io::Write;
use std::os::unix::fs::FileExt;
use std::sync::Arc;

@@ -1433,43 +1311,6 @@ mod tests {
MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset),
}
}
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
MaybeVirtualFile::File(file) => file.seek(pos),
}
}
async fn write_all<Buf: IoBuf + Send>(
&mut self,
buf: FullSlice<Buf>,
ctx: &RequestContext,
) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => {
let (_buf, res) = file.write_all(buf, ctx).await;
res.map(|_| ())
}
MaybeVirtualFile::File(file) => file.write_all(&buf[..]),
}
}

// Helper function to slurp contents of a file, starting at the current position,
// into a string
async fn read_string(&mut self, ctx: &RequestContext) -> Result<String, Error> {
use std::io::Read;
let mut buf = String::new();
match self {
MaybeVirtualFile::VirtualFile(file) => {
let mut buf = Vec::new();
file.read_to_end(&mut buf, ctx).await?;
return Ok(String::from_utf8(buf).unwrap());
}
MaybeVirtualFile::File(file) => {
file.read_to_string(&mut buf)?;
}
}
Ok(buf)
}

// Helper function to slurp a portion of a file into a string
async fn read_string_at(
@@ -1565,48 +1406,23 @@ mod tests {
.await?;

file_a
.write_all(b"foobar".to_vec().slice_len(), &ctx)
.write_all_at(IoBuffer::from(b"foobar").slice_len(), 0, &ctx)
.await?;

// cannot read from a file opened in write-only mode
let _ = file_a.read_string(&ctx).await.unwrap_err();
let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err();

// Close the file and re-open for reading
let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;

// cannot write to a file opened in read-only mode
let _ = file_a
.write_all(b"bar".to_vec().slice_len(), &ctx)
.write_all_at(IoBuffer::from(b"bar").slice_len(), 0, &ctx)
.await
.unwrap_err();

// Try simple read
assert_eq!("foobar", file_a.read_string(&ctx).await?);

// It's positioned at the EOF now.
assert_eq!("", file_a.read_string(&ctx).await?);

// Test seeks.
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
assert_eq!("oobar", file_a.read_string(&ctx).await?);

assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4);
assert_eq!("ar", file_a.read_string(&ctx).await?);

assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3);
assert_eq!("bar", file_a.read_string(&ctx).await?);

assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1);
assert_eq!("oobar", file_a.read_string(&ctx).await?);

// Test erroneous seeks to before byte 0
file_a.seek(SeekFrom::End(-7)).await.unwrap_err();
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
file_a.seek(SeekFrom::Current(-2)).await.unwrap_err();

// the erroneous seek should have left the position unchanged
assert_eq!("oobar", file_a.read_string(&ctx).await?);
assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?);

// Create another test file, and try FileExt functions on it.
let path_b = testdir.join("file_b");
@@ -1632,9 +1448,6 @@ mod tests {

// Open a lot of files, enough to cause some evictions. (Or to be precise,
// open the same file many times. The effect is the same.)
//
// leave file_a positioned at offset 1 before we start
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);

let mut vfiles = Vec::new();
for _ in 0..100 {
@@ -1644,7 +1457,7 @@ mod tests {
&ctx,
)
.await?;
assert_eq!("FOOBAR", vfile.read_string(&ctx).await?);
assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?);
vfiles.push(vfile);
}

@@ -1652,8 +1465,8 @@ mod tests {
        assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);

        // The underlying file descriptor for 'file_a' should be closed now. Try to read
        // from it again. We left the file positioned at offset 1 above.
        assert_eq!("oobar", file_a.read_string(&ctx).await?);
        // from it again.
        assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?);

        // Check that all the other FDs still work too. Use them in random order for
        // good measure.
@@ -1747,7 +1560,7 @@ mod tests {
            .await
            .unwrap();
        let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
        let post = file.read_string(&ctx).await.unwrap();
        let post = file.read_string_at(0, 3, &ctx).await.unwrap();
        assert_eq!(post, "foo");
        assert!(!tmp_path.exists());
        drop(file);
@@ -1756,7 +1569,7 @@ mod tests {
            .await
            .unwrap();
        let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
        let post = file.read_string(&ctx).await.unwrap();
        let post = file.read_string_at(0, 3, &ctx).await.unwrap();
        assert_eq!(post, "bar");
        assert!(!tmp_path.exists());
        drop(file);
@@ -1781,7 +1594,7 @@ mod tests {
            .unwrap();

        let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
        let post = file.read_string(&ctx).await.unwrap();
        let post = file.read_string_at(0, 3, &ctx).await.unwrap();
        assert_eq!(post, "foo");
        assert!(!tmp_path.exists());
        drop(file);

@@ -209,6 +209,22 @@ impl IoEngine {
            }
        }
    }

    pub(super) async fn set_len(
        &self,
        file_guard: FileGuard,
        len: u64,
    ) -> (FileGuard, std::io::Result<()>) {
        match self {
            IoEngine::NotSet => panic!("not initialized"),
            // TODO: ftruncate op for tokio-epoll-uring
            IoEngine::StdFs | IoEngine::TokioEpollUring => {
                let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
                (file_guard, res)
            }
        }
    }

    pub(super) async fn write_at<B: IoBuf + Send>(
        &self,
        file_guard: FileGuard,

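A minimal sketch of the dispatch pattern this hunk adds, with plain std types standing in for the real `IoEngine`/`FileGuard` machinery (the `Engine` enum and temp-file below are illustrative assumptions, not the actual API):

use std::fs::File;

// Stand-in for IoEngine; the real type carries more state and a NotSet arm.
enum Engine {
    StdFs,
    TokioEpollUring,
}

fn set_len(engine: &Engine, file: &File, len: u64) -> std::io::Result<()> {
    match engine {
        // Until tokio-epoll-uring grows an ftruncate op, both engines fall
        // back to the blocking std call, mirroring the hunk above.
        Engine::StdFs | Engine::TokioEpollUring => file.set_len(len),
    }
}

fn main() -> std::io::Result<()> {
    let file = File::create("example.tmp")?;
    set_len(&Engine::StdFs, &file, 4096)?;
    assert_eq!(file.metadata()?.len(), 4096); // extended with zeros
    std::fs::remove_file("example.tmp")
}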
@@ -282,6 +282,17 @@ unsafe impl<A: Alignment> tokio_epoll_uring::IoBufMut for AlignedBufferMut<A> {
    }
}

impl<A: Alignment> std::io::Write for AlignedBufferMut<A> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

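The point of implementing `std::io::Write` for the aligned buffer is that it can then be handed to any writer-generic code, e.g. `write!` formatting or a compression encoder. A self-contained sketch of the same pattern, with a plain `Vec<u8>` wrapper standing in for `AlignedBufferMut`:

use std::io::Write;

// Any growable byte buffer can forward `write` to `extend_from_slice` and
// make `flush` a no-op, exactly like the impl above.
#[derive(Default)]
struct Buf(Vec<u8>);

impl Write for Buf {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.0.extend_from_slice(buf);
        Ok(buf.len()) // the whole slice is always accepted
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(()) // nothing is buffered beyond the Vec itself
    }
}

fn main() -> std::io::Result<()> {
    let mut buf = Buf::default();
    write!(buf, "blob len = {}", 42)?; // anything expecting `impl Write` works
    assert_eq!(buf.0, b"blob len = 42");
    Ok(())
}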
#[cfg(test)]
mod tests {

@@ -1,9 +1,11 @@
use tokio_epoll_uring::{IoBuf, IoBufMut};

use crate::virtual_file::{IoBuffer, IoBufferMut, PageWriteGuardBuf};
use crate::virtual_file::{self, IoBuffer, IoBufferMut, PageWriteGuardBuf};

/// A marker trait for a mutable aligned buffer type.
pub trait IoBufAlignedMut: IoBufMut {}
pub trait IoBufAlignedMut: IoBufMut {
    const ALIGN: usize = virtual_file::get_io_buffer_alignment();
}

/// A marker trait for an aligned buffer type.
pub trait IoBufAligned: IoBuf {}

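The new `ALIGN` associated const gives generic code compile-time access to the I/O buffer alignment. A stripped-down sketch of the idea (the `IoBufMut` supertrait is omitted for self-containment, and 512 is an assumed placeholder for what `get_io_buffer_alignment()` returns):

// An associated const with a default lets callers compute aligned sizes
// without a method call or a runtime lookup.
pub trait IoBufAlignedMut {
    const ALIGN: usize = 512;
}

struct AlignedBuf;
impl IoBufAlignedMut for AlignedBuf {}

fn padded_len<B: IoBufAlignedMut>(len: usize) -> usize {
    len.next_multiple_of(B::ALIGN)
}

fn main() {
    assert_eq!(padded_len::<AlignedBuf>(100), 512);
}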
@@ -1,6 +1,7 @@
mod flush;
use std::sync::Arc;

use bytes::BufMut;
pub(crate) use flush::FlushControl;
use flush::FlushHandle;
pub(crate) use flush::FlushTaskError;
@@ -8,6 +9,7 @@ use tokio_epoll_uring::IoBuf;
use tokio_util::sync::CancellationToken;

use super::io_buf_aligned::IoBufAligned;
use super::io_buf_aligned::IoBufAlignedMut;
use super::io_buf_ext::{FullSlice, IoBufExt};
use crate::context::RequestContext;
use crate::virtual_file::{IoBuffer, IoBufferMut};
@@ -64,7 +66,7 @@ pub struct BufferedWriter<B: Buffer, W> {

impl<B, Buf, W> BufferedWriter<B, W>
where
    B: Buffer<IoBuf = Buf> + Send + 'static,
    B: IoBufAlignedMut + Buffer<IoBuf = Buf> + Send + 'static,
    Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
    W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
{
@@ -73,6 +75,7 @@ where
    /// The `buf_new` function provides a way to initialize the owned buffers used by this writer.
    pub fn new(
        writer: Arc<W>,
        start_offset: u64,
        buf_new: impl Fn() -> B,
        gate_guard: utils::sync::gate::GateGuard,
        cancel: CancellationToken,
@@ -91,7 +94,7 @@ where
                ctx.attached_child(),
                flush_task_span,
            ),
            bytes_submitted: 0,
            bytes_submitted: start_offset,
        }
    }

@@ -116,21 +119,29 @@ where
    }

    #[cfg_attr(target_os = "macos", allow(dead_code))]
    pub async fn flush_and_into_inner(
        mut self,
        ctx: &RequestContext,
    ) -> Result<(u64, Arc<W>), FlushTaskError> {
        self.flush(ctx).await?;

    pub async fn shutdown(
        self,
        mut handle_tail: impl FnMut(B) -> Option<B>,
    ) -> Result<(u64, W), FlushTaskError> {
        let Self {
            mutable: buf,
            mutable: tail,
            maybe_flushed: _,
            writer,
            mut flush_handle,
            bytes_submitted: bytes_amount,
            bytes_submitted: submit_offset,
        } = self;
        flush_handle.shutdown().await?;
        assert!(buf.is_some());

        let ctx = flush_handle.shutdown().await?;
        let buf = tail.expect("must not use after an error");
        let writer = Arc::into_inner(writer).expect("writer is the only strong reference");
        let mut bytes_amount = submit_offset;
        if let Some(buf) = handle_tail(buf) {
            bytes_amount += buf.pending() as u64;
            // TODO: infinite retries + maybe_fatal_err like we do in the flush loop; can we just send this
            // as work into the flush loop, as part of flush_handle.shutdown()
            let (_, res) = writer.write_all_at(buf.flush(), submit_offset, &ctx).await;
            let _: () = res.unwrap(); // DO NOT MERGE, THIS CAN FAIL E.G. on ENOSPC
        }
        Ok((bytes_amount, writer))
    }

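The `handle_tail` closure decides the fate of the final, partially filled buffer: `Some` keeps it as-is (as the test further down does via `writer.shutdown(Some)`), returning `None` discards it, and a padding closure could zero-fill it first. A self-contained sketch of that control flow, with `Vec<u8>` standing in for the buffer and an in-memory mock for the underlying file (all names here are illustrative):

// Mock of the writer state at shutdown: `submitted` plays the role of
// bytes_submitted, `file` records (offset, bytes) pairs like write_all_at.
struct MiniWriter {
    submitted: u64,
    file: Vec<(u64, Vec<u8>)>,
}

impl MiniWriter {
    fn shutdown(
        mut self,
        tail: Vec<u8>,
        mut handle_tail: impl FnMut(Vec<u8>) -> Option<Vec<u8>>,
    ) -> (u64, Vec<(u64, Vec<u8>)>) {
        let mut bytes_amount = self.submitted;
        // If the caller keeps (and possibly pads) the tail, it is written at
        // the submit offset; returning None drops it instead.
        if let Some(buf) = handle_tail(tail) {
            bytes_amount += buf.len() as u64;
            self.file.push((self.submitted, buf));
        }
        (bytes_amount, self.file)
    }
}

fn main() {
    let w = MiniWriter { submitted: 8192, file: Vec::new() };
    // `Some` == keep the tail unchanged, as in `writer.shutdown(Some)`.
    let (total, file) = w.shutdown(b"tail".to_vec(), Some);
    assert_eq!(total, 8192 + 4);
    assert_eq!(file, vec![(8192, b"tail".to_vec())]);
}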
@@ -235,6 +246,10 @@ pub trait Buffer {
    /// panics if `other.len() > self.cap() - self.pending()`.
    fn extend_from_slice(&mut self, other: &[u8]);

    /// Add `count` bytes `val` into `self`.
    /// Panics if `count > self.cap() - self.pending()`.
    fn extend_with(&mut self, val: u8, count: usize);

    /// Number of bytes in the buffer.
    fn pending(&self) -> usize;

@@ -262,6 +277,14 @@ impl Buffer for IoBufferMut {
        IoBufferMut::extend_from_slice(self, other);
    }

    fn extend_with(&mut self, val: u8, count: usize) {
        if self.len() + count > self.cap() {
            panic!("Buffer capacity exceeded");
        }

        IoBufferMut::put_bytes(self, val, count);
    }

    fn pending(&self) -> usize {
        self.len()
    }
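One intended use of `extend_with` is zero-padding the mutable buffer up to an aligned boundary before it is handed to direct I/O. An illustrative sketch, with `Vec<u8>` standing in for `IoBufferMut` and 512 as an assumed alignment:

// Mirrors Buffer::extend_with(0, count): append `count` copies of one byte
// so the buffer length hits the next aligned boundary.
fn pad_to_alignment(buf: &mut Vec<u8>, align: usize) {
    let padded = buf.len().next_multiple_of(align);
    buf.resize(padded, 0);
}

fn main() {
    let mut buf = b"short blob".to_vec();
    pad_to_alignment(&mut buf, 512);
    assert_eq!(buf.len(), 512);
    assert!(buf[10..].iter().all(|&b| b == 0));
}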
@@ -334,6 +357,7 @@ mod tests {
        let cancel = CancellationToken::new();
        let mut writer = BufferedWriter::<_, RecorderWriter>::new(
            recorder,
            0,
            || IoBufferMut::with_capacity(2),
            gate.enter()?,
            cancel,
@@ -350,7 +374,7 @@ mod tests {
        writer.write_buffered_borrowed(b"j", ctx).await?;
        writer.write_buffered_borrowed(b"klmno", ctx).await?;

        let (_, recorder) = writer.flush_and_into_inner(ctx).await?;
        let (_, recorder) = writer.shutdown(Some).await?;
        assert_eq!(
            recorder.get_writes(),
            {

@@ -1,5 +1,5 @@
use std::ops::ControlFlow;
use std::sync::Arc;
use std::{marker::PhantomData, sync::Arc};

use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info, info_span, warn};
@@ -21,7 +21,10 @@ pub struct FlushHandleInner<Buf, W> {
    /// and receives recycled buffers.
    channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
    /// Join handle for the background flush task.
    join_handle: tokio::task::JoinHandle<Result<Arc<W>, FlushTaskError>>,
    join_handle: tokio::task::JoinHandle<Result<RequestContext, FlushTaskError>>,

    // TODO: get rid of the type parameter?
    _phantom: PhantomData<W>,
}

struct FlushRequest<Buf> {
@@ -144,6 +147,7 @@ where
            inner: Some(FlushHandleInner {
                channel: front,
                join_handle,
                _phantom: PhantomData,
            }),
        }
    }
@@ -179,12 +183,13 @@ where
        Err(self
            .shutdown()
            .await
            .expect_err("flush task only disconnects duplex if it exits with an error"))
            .err()
            .expect("flush task only disconnects duplex if it exits with an error"))
    }

    /// Cleans up the channel and joins the flush task.
    pub async fn shutdown(&mut self) -> Result<Arc<W>, FlushTaskError> {
        let handle = self
    pub async fn shutdown(&mut self) -> Result<RequestContext, FlushTaskError> {
        let handle: FlushHandleInner<Buf, W> = self
            .inner
            .take()
            .expect("must not use after we returned an error");
@@ -243,7 +248,7 @@ where
    }

    /// Runs the background flush task.
    async fn run(mut self) -> Result<Arc<W>, FlushTaskError> {
    async fn run(mut self) -> Result<RequestContext, FlushTaskError> {
        // Exit condition: channel is closed and there is no remaining buffer to be flushed
        while let Some(request) = self.channel.recv().await {
            #[cfg(test)]
@@ -313,8 +318,7 @@ where
                    continue;
                }
            }

        Ok(self.writer)
        Ok(self.ctx)
    }
}

@@ -349,7 +353,7 @@ impl FlushNotStarted {
impl FlushInProgress {
    /// Waits until background flush is done.
    pub async fn wait_until_flush_is_done(self) -> FlushDone {
        self.done_flush_rx.await.unwrap();
        let _ = self.done_flush_rx.await;
        FlushDone
    }
}

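The switch from `.unwrap()` to `let _ =` makes the wait tolerate the sender side being dropped, e.g. when the flush task exits with an error, instead of panicking. A minimal sketch of that failure mode, assuming a tokio oneshot channel like `done_flush_rx`:

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<()>();
    // Simulate the flush task dying before it signals completion:
    drop(tx);
    // `rx.await` now yields Err(RecvError). Ignoring it, as the diff does,
    // avoids a panic here; the failure surfaces via the flush task's result.
    let _ = rx.await;
}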