mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-24 16:40:38 +00:00
Compare commits
4 Commits
problame/w
...
problame/w
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8b10701a0f | ||
|
|
b937432a04 | ||
|
|
a6d414d6e6 | ||
|
|
0f7f743f37 |
@@ -17,12 +17,12 @@ pub struct EphemeralFile {
|
||||
_tenant_shard_id: TenantShardId,
|
||||
_timeline_id: TimelineId,
|
||||
|
||||
sandwich: page_caching_sandwich::PageCachingSandwich,
|
||||
rw: page_caching::RW,
|
||||
}
|
||||
|
||||
mod page_caching_sandwich;
|
||||
mod sandwich;
|
||||
mod page_caching;
|
||||
mod zero_padded_buffer;
|
||||
mod zero_padded_read_write;
|
||||
|
||||
impl EphemeralFile {
|
||||
pub async fn create(
|
||||
@@ -52,16 +52,16 @@ impl EphemeralFile {
|
||||
Ok(EphemeralFile {
|
||||
_tenant_shard_id: tenant_shard_id,
|
||||
_timeline_id: timeline_id,
|
||||
sandwich: page_caching_sandwich::PageCachingSandwich::new(file),
|
||||
rw: page_caching::RW::new(file),
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn len(&self) -> u64 {
|
||||
self.sandwich.bytes_written()
|
||||
self.rw.bytes_written()
|
||||
}
|
||||
|
||||
pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
|
||||
self.sandwich.page_cache_file_id()
|
||||
self.rw.page_cache_file_id()
|
||||
}
|
||||
|
||||
pub(crate) async fn read_blk(
|
||||
@@ -69,7 +69,7 @@ impl EphemeralFile {
|
||||
blknum: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockLease, io::Error> {
|
||||
self.sandwich.read_blk(blknum, ctx).await
|
||||
self.rw.read_blk(blknum, ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn write_blob(
|
||||
@@ -77,22 +77,22 @@ impl EphemeralFile {
|
||||
srcbuf: &[u8],
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<u64, io::Error> {
|
||||
let pos = self.sandwich.bytes_written();
|
||||
let pos = self.rw.bytes_written();
|
||||
|
||||
// Write the length field
|
||||
if srcbuf.len() < 0x80 {
|
||||
// short one-byte length header
|
||||
let len_buf = [srcbuf.len() as u8];
|
||||
|
||||
self.sandwich.write_all_borrowed(&len_buf).await?;
|
||||
self.rw.write_all_borrowed(&len_buf).await?;
|
||||
} else {
|
||||
let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
|
||||
len_buf[0] |= 0x80;
|
||||
self.sandwich.write_all_borrowed(&len_buf).await?;
|
||||
self.rw.write_all_borrowed(&len_buf).await?;
|
||||
}
|
||||
|
||||
// Write the payload
|
||||
self.sandwich.write_all_borrowed(srcbuf).await?;
|
||||
self.rw.write_all_borrowed(srcbuf).await?;
|
||||
|
||||
Ok(pos)
|
||||
}
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
//! Wrapper around [`super::zero_padded_read_write::RW`] that uses the
|
||||
//! [`crate::page_cache`] to serve reads that need to go to the underlying [`VirtualFile`].
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::{self, PAGE_SZ};
|
||||
use crate::tenant::block_io::BlockLease;
|
||||
@@ -6,18 +9,19 @@ use crate::virtual_file::VirtualFile;
|
||||
use std::io;
|
||||
use tracing::*;
|
||||
|
||||
use super::sandwich;
|
||||
use super::zero_padded_read_write;
|
||||
|
||||
pub struct PageCachingSandwich {
|
||||
/// See module-level comment.
|
||||
pub struct RW {
|
||||
page_cache_file_id: page_cache::FileId,
|
||||
sandwich: super::sandwich::Sandwich,
|
||||
rw: super::zero_padded_read_write::RW,
|
||||
}
|
||||
|
||||
impl PageCachingSandwich {
|
||||
impl RW {
|
||||
pub fn new(file: VirtualFile) -> Self {
|
||||
Self {
|
||||
page_cache_file_id: page_cache::next_file_id(),
|
||||
sandwich: super::sandwich::Sandwich::new(file),
|
||||
rw: super::zero_padded_read_write::RW::new(file),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,11 +32,11 @@ impl PageCachingSandwich {
|
||||
pub(crate) async fn write_all_borrowed(&mut self, srcbuf: &[u8]) -> Result<usize, io::Error> {
|
||||
// It doesn't make sense to proactively fill the page cache on the Pageserver write path
|
||||
// because Compute is unlikely to access recently written data.
|
||||
self.sandwich.write_all_borrowed(srcbuf).await
|
||||
self.rw.write_all_borrowed(srcbuf).await
|
||||
}
|
||||
|
||||
pub(crate) fn bytes_written(&self) -> u64 {
|
||||
self.sandwich.bytes_written()
|
||||
self.rw.bytes_written()
|
||||
}
|
||||
|
||||
pub(crate) async fn read_blk(
|
||||
@@ -40,8 +44,8 @@ impl PageCachingSandwich {
|
||||
blknum: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockLease, io::Error> {
|
||||
match self.sandwich.read_blk(blknum).await? {
|
||||
sandwich::ReadResult::NeedsReadFromVirtualFile { virtual_file } => {
|
||||
match self.rw.read_blk(blknum).await? {
|
||||
zero_padded_read_write::ReadResult::NeedsReadFromVirtualFile { virtual_file } => {
|
||||
let cache = page_cache::get();
|
||||
match cache
|
||||
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
|
||||
@@ -53,7 +57,7 @@ impl PageCachingSandwich {
|
||||
format!(
|
||||
"ephemeral file: read immutable page #{}: {}: {:#}",
|
||||
blknum,
|
||||
self.sandwich.as_inner_virtual_file().path,
|
||||
self.rw.as_inner_virtual_file().path,
|
||||
e,
|
||||
),
|
||||
)
|
||||
@@ -70,20 +74,20 @@ impl PageCachingSandwich {
|
||||
}
|
||||
}
|
||||
}
|
||||
sandwich::ReadResult::ServedFromZeroPaddedMutableTail { buffer } => {
|
||||
zero_padded_read_write::ReadResult::ServedFromZeroPaddedMutableTail { buffer } => {
|
||||
Ok(BlockLease::EphemeralFileMutableTail(buffer))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PageCachingSandwich {
|
||||
impl Drop for RW {
|
||||
fn drop(&mut self) {
|
||||
// There might still be pages in the [`crate::page_cache`] for this file.
|
||||
// We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
|
||||
|
||||
// unlink the file
|
||||
let res = std::fs::remove_file(&self.sandwich.as_inner_virtual_file().path);
|
||||
let res = std::fs::remove_file(&self.rw.as_inner_virtual_file().path);
|
||||
if let Err(e) = res {
|
||||
if e.kind() != std::io::ErrorKind::NotFound {
|
||||
// just never log the not found errors, we cannot do anything for them; on detach
|
||||
@@ -92,7 +96,7 @@ impl Drop for PageCachingSandwich {
|
||||
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
|
||||
error!(
|
||||
"could not remove ephemeral file '{}': {}",
|
||||
self.sandwich.as_inner_virtual_file().path,
|
||||
self.rw.as_inner_virtual_file().path,
|
||||
e
|
||||
);
|
||||
}
|
||||
@@ -1,3 +1,23 @@
|
||||
//! The heart of how [`super::EphemeralFile`] does its reads and writes.
|
||||
//!
|
||||
//! # Writes
|
||||
//!
|
||||
//! [`super::EphemeralFile`] writes small, borrowed buffers using [`RW::write_all_borrowed`].
|
||||
//! The [`RW`] batches these into [`RW::TAIL_SZ`] bigger writes, using [`owned_buffers_io::write::BufferedWriter`].
|
||||
//!
|
||||
//! # Reads
|
||||
//!
|
||||
//! [`super::EphemeralFile`] always reads full [`PAGE_SZ`]ed blocks using [`RW::read_blk`].
|
||||
//!
|
||||
//! The [`RW`] serves these reads either from the buffered writer's in-memory buffer
|
||||
//! or redirects the caller to read from the underlying [`VirtualFile`]` if they have already
|
||||
//! been flushed.
|
||||
//!
|
||||
//! The current caller is [`super::page_caching::RW`]. In case it gets redirected to read from
|
||||
//! [`VirtualFile`], it consults the [`crate::page_cache`] first.
|
||||
|
||||
mod zero_padded_buffer;
|
||||
|
||||
use crate::{
|
||||
page_cache::PAGE_SZ,
|
||||
tenant::ephemeral_file::zero_padded_buffer,
|
||||
@@ -7,7 +27,8 @@ use crate::{
|
||||
},
|
||||
};
|
||||
|
||||
pub struct Sandwich {
|
||||
/// See module-level comment.
|
||||
pub struct RW {
|
||||
buffered_writer: owned_buffers_io::write::BufferedWriter<
|
||||
zero_padded_buffer::Buf<{ Self::TAIL_SZ }>,
|
||||
owned_buffers_io::util::size_tracking_writer::Writer<VirtualFile>,
|
||||
@@ -19,7 +40,7 @@ pub enum ReadResult<'a> {
|
||||
ServedFromZeroPaddedMutableTail { buffer: &'a [u8; PAGE_SZ] },
|
||||
}
|
||||
|
||||
impl Sandwich {
|
||||
impl RW {
|
||||
const TAIL_SZ: usize = 64 * 1024;
|
||||
|
||||
pub fn new(file: VirtualFile) -> Self {
|
||||
@@ -53,10 +74,14 @@ impl Sandwich {
|
||||
let buffered_offset = flushed_offset + u64::try_from(buffer.pending()).unwrap();
|
||||
let read_offset = (blknum as u64) * (PAGE_SZ as u64);
|
||||
|
||||
assert_eq!(
|
||||
flushed_offset % (Self::TAIL_SZ as u64), 0,
|
||||
"we only use write_buffered_borrowed to write to the buffered writer, so it's guaranteed that flushes happen buffer.cap()-sized chunks"
|
||||
);
|
||||
assert_eq!(
|
||||
flushed_offset % (PAGE_SZ as u64),
|
||||
0,
|
||||
"we need this in the logic below, because it assumes the page isn't spread across flushed part and in-memory buffer"
|
||||
"the logic below can't handle if the page is spread across the flushed part and the buffer"
|
||||
);
|
||||
|
||||
if read_offset < flushed_offset {
|
||||
@@ -60,11 +60,11 @@ impl<const N: usize> crate::virtual_file::owned_buffers_io::write::Buffer for Bu
|
||||
tokio_epoll_uring::BoundedBuf::slice(self, 0..written)
|
||||
}
|
||||
|
||||
fn reconstruct_after_flush(this: Self::IoBuf) -> Self {
|
||||
fn reuse_after_flush(iobuf: Self::IoBuf) -> Self {
|
||||
let Self {
|
||||
mut allocation,
|
||||
written,
|
||||
} = this;
|
||||
} = iobuf;
|
||||
allocation[0..written].fill(0);
|
||||
let new = Self {
|
||||
allocation,
|
||||
@@ -10,55 +10,14 @@ pub trait OwnedAsyncWriter {
|
||||
) -> std::io::Result<(usize, B::Buf)>;
|
||||
}
|
||||
|
||||
pub trait Buffer {
|
||||
type IoBuf: IoBuf;
|
||||
|
||||
fn cap(&self) -> usize;
|
||||
fn extend_from_slice(&mut self, other: &[u8]);
|
||||
fn pending(&self) -> usize;
|
||||
fn flush(self) -> Slice<Self::IoBuf>;
|
||||
fn reconstruct_after_flush(iobuf: Self::IoBuf) -> Self;
|
||||
}
|
||||
|
||||
impl Buffer for BytesMut {
|
||||
type IoBuf = BytesMut;
|
||||
|
||||
#[inline(always)]
|
||||
fn cap(&self) -> usize {
|
||||
self.capacity()
|
||||
}
|
||||
|
||||
fn extend_from_slice(&mut self, other: &[u8]) {
|
||||
BytesMut::extend_from_slice(self, other)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn pending(&self) -> usize {
|
||||
self.len()
|
||||
}
|
||||
|
||||
fn flush(self) -> Slice<BytesMut> {
|
||||
if self.is_empty() {
|
||||
return self.slice_full();
|
||||
}
|
||||
let len = self.len();
|
||||
self.slice(0..len)
|
||||
}
|
||||
|
||||
fn reconstruct_after_flush(mut iobuf: BytesMut) -> Self {
|
||||
iobuf.clear();
|
||||
iobuf
|
||||
}
|
||||
}
|
||||
|
||||
/// A wrapper aorund an [`OwnedAsyncWriter`] that batches smaller writers
|
||||
/// into `BUFFER_SIZE`-sized writes.
|
||||
/// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
|
||||
/// small writes into larger writes of size [`Buffer::cap`].
|
||||
///
|
||||
/// # Passthrough Of Large Writers
|
||||
///
|
||||
/// Buffered writes larger than the `BUFFER_SIZE` cause the internal
|
||||
/// buffer to be flushed, even if it is not full yet. Then, the large
|
||||
/// buffered write is passed through to the unerlying [`OwnedAsyncWriter`].
|
||||
/// Calls to [`BufferedWriter::write_buffered`] that are larger than [`Buffer::cap`]
|
||||
/// cause the internal buffer to be flushed prematurely so that the large
|
||||
/// buffered write is passed through to the underlying [`OwnedAsyncWriter`].
|
||||
///
|
||||
/// This pass-through is generally beneficial for throughput, but if
|
||||
/// the storage backend of the [`OwnedAsyncWriter`] is a shared resource,
|
||||
@@ -68,11 +27,10 @@ impl Buffer for BytesMut {
|
||||
/// may be preferable.
|
||||
pub struct BufferedWriter<B, W> {
|
||||
writer: W,
|
||||
// invariant: always remains Some(buf)
|
||||
// with buf.capacity() == BUFFER_SIZE except
|
||||
// - while IO is ongoing => goes back to Some() once the IO completed successfully
|
||||
// - after an IO error => stays `None` forever
|
||||
// In these exceptional cases, it's `None`.
|
||||
/// invariant: always remains Some(buf) except
|
||||
/// - while IO is ongoing => goes back to Some() once the IO completed successfully
|
||||
/// - after an IO error => stays `None` forever
|
||||
/// In these exceptional cases, it's `None`.
|
||||
buf: Option<B>,
|
||||
}
|
||||
|
||||
@@ -182,11 +140,66 @@ where
|
||||
}
|
||||
let (nwritten, io_buf) = self.writer.write_all(buf.flush()).await?;
|
||||
assert_eq!(nwritten, buf_len);
|
||||
self.buf = Some(Buffer::reconstruct_after_flush(io_buf));
|
||||
self.buf = Some(Buffer::reuse_after_flush(io_buf));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// A [`Buffer`] is used by [`BufferedWriter`] to batch smaller writes into larger ones.
|
||||
pub trait Buffer {
|
||||
type IoBuf: IoBuf;
|
||||
|
||||
/// Capacity of the buffer. Must not change over the lifetime `self`.`
|
||||
fn cap(&self) -> usize;
|
||||
|
||||
/// Add data to the buffer.
|
||||
/// Panics if there is not enough room to accomodate `other`'s content, i.e.,
|
||||
/// panics if `other.len() > self.cap() - self.pending()`.
|
||||
fn extend_from_slice(&mut self, other: &[u8]);
|
||||
|
||||
/// Number of bytes in the buffer.
|
||||
fn pending(&self) -> usize;
|
||||
|
||||
/// Turns `self` into a [`tokio_epoll_uring::Slice`] of the pending data
|
||||
/// so we can use [`tokio_epoll_uring`] to write it to disk.
|
||||
fn flush(self) -> Slice<Self::IoBuf>;
|
||||
|
||||
/// After the write to disk is done and we have gotten back the slice,
|
||||
/// [`BufferedWriter`] uses this method to re-use the io buffer.
|
||||
fn reuse_after_flush(iobuf: Self::IoBuf) -> Self;
|
||||
}
|
||||
|
||||
impl Buffer for BytesMut {
|
||||
type IoBuf = BytesMut;
|
||||
|
||||
#[inline(always)]
|
||||
fn cap(&self) -> usize {
|
||||
self.capacity()
|
||||
}
|
||||
|
||||
fn extend_from_slice(&mut self, other: &[u8]) {
|
||||
BytesMut::extend_from_slice(self, other)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn pending(&self) -> usize {
|
||||
self.len()
|
||||
}
|
||||
|
||||
fn flush(self) -> Slice<BytesMut> {
|
||||
if self.is_empty() {
|
||||
return self.slice_full();
|
||||
}
|
||||
let len = self.len();
|
||||
self.slice(0..len)
|
||||
}
|
||||
|
||||
fn reuse_after_flush(mut iobuf: BytesMut) -> Self {
|
||||
iobuf.clear();
|
||||
iobuf
|
||||
}
|
||||
}
|
||||
|
||||
impl OwnedAsyncWriter for Vec<u8> {
|
||||
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
|
||||
Reference in New Issue
Block a user