REVIEW: undo the mutable->tail rename to minimize conflicts with next commit

Changes to be committed:
	modified:   pageserver/src/tenant/ephemeral_file.rs
	modified:   pageserver/src/virtual_file/owned_buffers_io/write.rs
This commit is contained in:
Christian Schwarz
2025-04-10 09:02:22 +02:00
parent dd3178836d
commit 6f25c976f6
2 changed files with 34 additions and 42 deletions

View File

@@ -183,10 +183,10 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
dst: tokio_epoll_uring::Slice<B>,
ctx: &RequestContext,
) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
let submitted_offset = self.buffered_writer.submit_offset();
let submitted_offset = self.buffered_writer.bytes_submitted();
let tail = self.buffered_writer.inspect_tail();
let tail = &tail[0..tail.pending()];
let mutable = self.buffered_writer.inspect_mutable();
let mutable = &mutable[0..mutable.pending()];
let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();
@@ -216,7 +216,7 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
let (written_range, maybe_flushed_range) = {
if maybe_flushed.is_some() {
// [ written ][ maybe_flushed ][ tail ]
// [ written ][ maybe_flushed ][ mutable ]
// <- TAIL_SZ -><- TAIL_SZ ->
// ^
// `submitted_offset`
@@ -232,7 +232,7 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
),
)
} else {
// [ written ][ tail ]
// [ written ][ mutable ]
// <- TAIL_SZ ->
// ^
// `submitted_offset`
@@ -290,7 +290,7 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
.unwrap()
.into_usize();
let to_copy =
&tail[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
&mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
let bounds = dst.bounds();
let mut view = dst.slice({
let start =
@@ -403,9 +403,9 @@ mod tests {
.await
.unwrap();
let tail = file.buffered_writer.inspect_tail();
let cap = tail.capacity();
let align = tail.align();
let mutable = file.buffered_writer.inspect_mutable();
let cap = mutable.capacity();
let align = mutable.align();
let write_nbytes = cap * 2 + cap / 2;
@@ -446,8 +446,8 @@ mod tests {
let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);
let tail_buffer_contents = file.buffered_writer.inspect_tail();
assert_eq!(tail_buffer_contents, &content[cap * 2..write_nbytes]);
let mutable_buffer_contents = file.buffered_writer.inspect_mutable();
assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
}
#[tokio::test]
@@ -461,7 +461,7 @@ mod tests {
.unwrap();
// mutable buffer and maybe_flushed buffer each has `cap` bytes.
let cap = file.buffered_writer.inspect_tail().capacity();
let cap = file.buffered_writer.inspect_mutable().capacity();
let content: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
@@ -486,7 +486,7 @@ mod tests {
&content[cap..cap * 2]
);
assert_eq!(
&file.buffered_writer.inspect_tail()[0..cap / 2],
&file.buffered_writer.inspect_mutable()[0..cap / 2],
&content[cap * 2..cap * 2 + cap / 2]
);
}
@@ -507,7 +507,7 @@ mod tests {
.await
.unwrap();
let mutable = file.buffered_writer.inspect_tail();
let mutable = file.buffered_writer.inspect_mutable();
let cap = mutable.capacity();
let align = mutable.align();
let content: Vec<u8> = rand::thread_rng()

View File

@@ -47,11 +47,11 @@ pub struct BufferedWriter<B: Buffer, W> {
/// - after an IO error => stays `None` forever
///
/// In these exceptional cases, it's `None`.
tail: Option<B>,
mutable: Option<B>,
/// A handle to the background flush task for writting data to disk.
flush_handle: FlushHandle<B::IoBuf, W>,
/// The next offset to be submitted to the background task.
submit_offset: u64,
/// The number of bytes submitted to the background task.
bytes_submitted: u64,
}
impl<B, Buf, W> BufferedWriter<B, W>
@@ -73,7 +73,7 @@ where
) -> Self {
Self {
writer: writer.clone(),
tail: Some(buf_new()),
mutable: Some(buf_new()),
flush_handle: FlushHandle::spawn_new(
writer,
buf_new(),
@@ -81,7 +81,7 @@ where
ctx.attached_child(),
flush_task_span,
),
submit_offset: start_offset,
bytes_submitted: start_offset,
}
}
@@ -90,13 +90,13 @@ where
}
/// Returns the number of bytes submitted to the background flush task.
pub fn submit_offset(&self) -> u64 {
self.submit_offset
pub fn bytes_submitted(&self) -> u64 {
self.bytes_submitted
}
/// Panics if used after any of the write paths returned an error
pub fn inspect_tail(&self) -> &B {
self.tail()
pub fn inspect_mutable(&self) -> &B {
self.mutable()
}
/// Gets a reference to the maybe flushed read-only buffer.
@@ -111,10 +111,10 @@ where
mut handle_tail: impl FnMut(B) -> Option<B>,
) -> std::io::Result<(u64, W)> {
let Self {
tail,
mutable: tail,
writer,
mut flush_handle,
submit_offset,
bytes_submitted: submit_offset,
} = self;
let ctx = flush_handle.shutdown().await?;
@@ -129,22 +129,14 @@ where
Ok((bytes_amount, writer))
}
/// Gets a immutable reference to the tail in-memory buffer.
/// Gets a reference to the mutable in-memory buffer.
#[inline(always)]
fn tail(&self) -> &B {
self.tail
fn mutable(&self) -> &B {
self.mutable
.as_ref()
.expect("must not use after we returned an error")
}
/// Gets a mutable reference to the tail in-memory buffer.
#[inline(always)]
fn tail_mut(&mut self) -> &mut B {
self.tail
.as_mut()
.expect("must not use after we returned an error")
}
#[cfg_attr(target_os = "macos", allow(dead_code))]
pub async fn write_buffered_borrowed(
&mut self,
@@ -167,7 +159,7 @@ where
let chunk_len = chunk.len();
let mut control: Option<FlushControl> = None;
while !chunk.is_empty() {
let buf = self.tail_mut();
let buf = self.mutable.as_mut().expect("must not use after an error");
let need = buf.cap() - buf.pending();
let have = chunk.len();
let n = std::cmp::min(need, have);
@@ -186,15 +178,15 @@ where
#[must_use = "caller must explcitly check the flush control"]
async fn flush(&mut self, _ctx: &RequestContext) -> std::io::Result<Option<FlushControl>> {
let buf = self.tail.take().expect("must not use after an error");
let buf = self.mutable.take().expect("must not use after an error");
let buf_len = buf.pending();
if buf_len == 0 {
self.tail = Some(buf);
self.mutable = Some(buf);
return Ok(None);
}
let (recycled, flush_control) = self.flush_handle.flush(buf, self.submit_offset).await?;
self.submit_offset += u64::try_from(buf_len).unwrap();
self.tail = Some(recycled);
let (recycled, flush_control) = self.flush_handle.flush(buf, self.bytes_submitted).await?;
self.bytes_submitted += u64::try_from(buf_len).unwrap();
self.mutable = Some(recycled);
Ok(Some(flush_control))
}
}