diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index a847589279..7ed9c3cc11 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -183,10 +183,10 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral dst: tokio_epoll_uring::Slice, ctx: &RequestContext, ) -> std::io::Result<(tokio_epoll_uring::Slice, usize)> { - let submitted_offset = self.buffered_writer.submit_offset(); + let submitted_offset = self.buffered_writer.bytes_submitted(); - let tail = self.buffered_writer.inspect_tail(); - let tail = &tail[0..tail.pending()]; + let mutable = self.buffered_writer.inspect_mutable(); + let mutable = &mutable[0..mutable.pending()]; let maybe_flushed = self.buffered_writer.inspect_maybe_flushed(); @@ -216,7 +216,7 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral let (written_range, maybe_flushed_range) = { if maybe_flushed.is_some() { - // [ written ][ maybe_flushed ][ tail ] + // [ written ][ maybe_flushed ][ mutable ] // <- TAIL_SZ -><- TAIL_SZ -> // ^ // `submitted_offset` @@ -232,7 +232,7 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral ), ) } else { - // [ written ][ tail ] + // [ written ][ mutable ] // <- TAIL_SZ -> // ^ // `submitted_offset` @@ -290,7 +290,7 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral .unwrap() .into_usize(); let to_copy = - &tail[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())]; + &mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())]; let bounds = dst.bounds(); let mut view = dst.slice({ let start = @@ -403,9 +403,9 @@ mod tests { .await .unwrap(); - let tail = file.buffered_writer.inspect_tail(); - let cap = tail.capacity(); - let align = tail.align(); + let mutable = file.buffered_writer.inspect_mutable(); + let cap = mutable.capacity(); + let align = mutable.align(); let write_nbytes = cap * 2 + cap / 2; @@ -446,8 +446,8 @@ mod tests { let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap(); assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]); - let tail_buffer_contents = file.buffered_writer.inspect_tail(); - assert_eq!(tail_buffer_contents, &content[cap * 2..write_nbytes]); + let mutable_buffer_contents = file.buffered_writer.inspect_mutable(); + assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]); } #[tokio::test] @@ -461,7 +461,7 @@ mod tests { .unwrap(); // mutable buffer and maybe_flushed buffer each has `cap` bytes. - let cap = file.buffered_writer.inspect_tail().capacity(); + let cap = file.buffered_writer.inspect_mutable().capacity(); let content: Vec = rand::thread_rng() .sample_iter(rand::distributions::Standard) @@ -486,7 +486,7 @@ mod tests { &content[cap..cap * 2] ); assert_eq!( - &file.buffered_writer.inspect_tail()[0..cap / 2], + &file.buffered_writer.inspect_mutable()[0..cap / 2], &content[cap * 2..cap * 2 + cap / 2] ); } @@ -507,7 +507,7 @@ mod tests { .await .unwrap(); - let mutable = file.buffered_writer.inspect_tail(); + let mutable = file.buffered_writer.inspect_mutable(); let cap = mutable.capacity(); let align = mutable.align(); let content: Vec = rand::thread_rng() diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index 102e3d0036..af408c97c3 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -47,11 +47,11 @@ pub struct BufferedWriter { /// - after an IO error => stays `None` forever /// /// In these exceptional cases, it's `None`. - tail: Option, + mutable: Option, /// A handle to the background flush task for writting data to disk. flush_handle: FlushHandle, - /// The next offset to be submitted to the background task. - submit_offset: u64, + /// The number of bytes submitted to the background task. + bytes_submitted: u64, } impl BufferedWriter @@ -73,7 +73,7 @@ where ) -> Self { Self { writer: writer.clone(), - tail: Some(buf_new()), + mutable: Some(buf_new()), flush_handle: FlushHandle::spawn_new( writer, buf_new(), @@ -81,7 +81,7 @@ where ctx.attached_child(), flush_task_span, ), - submit_offset: start_offset, + bytes_submitted: start_offset, } } @@ -90,13 +90,13 @@ where } /// Returns the number of bytes submitted to the background flush task. - pub fn submit_offset(&self) -> u64 { - self.submit_offset + pub fn bytes_submitted(&self) -> u64 { + self.bytes_submitted } /// Panics if used after any of the write paths returned an error - pub fn inspect_tail(&self) -> &B { - self.tail() + pub fn inspect_mutable(&self) -> &B { + self.mutable() } /// Gets a reference to the maybe flushed read-only buffer. @@ -111,10 +111,10 @@ where mut handle_tail: impl FnMut(B) -> Option, ) -> std::io::Result<(u64, W)> { let Self { - tail, + mutable: tail, writer, mut flush_handle, - submit_offset, + bytes_submitted: submit_offset, } = self; let ctx = flush_handle.shutdown().await?; @@ -129,22 +129,14 @@ where Ok((bytes_amount, writer)) } - /// Gets a immutable reference to the tail in-memory buffer. + /// Gets a reference to the mutable in-memory buffer. #[inline(always)] - fn tail(&self) -> &B { - self.tail + fn mutable(&self) -> &B { + self.mutable .as_ref() .expect("must not use after we returned an error") } - /// Gets a mutable reference to the tail in-memory buffer. - #[inline(always)] - fn tail_mut(&mut self) -> &mut B { - self.tail - .as_mut() - .expect("must not use after we returned an error") - } - #[cfg_attr(target_os = "macos", allow(dead_code))] pub async fn write_buffered_borrowed( &mut self, @@ -167,7 +159,7 @@ where let chunk_len = chunk.len(); let mut control: Option = None; while !chunk.is_empty() { - let buf = self.tail_mut(); + let buf = self.mutable.as_mut().expect("must not use after an error"); let need = buf.cap() - buf.pending(); let have = chunk.len(); let n = std::cmp::min(need, have); @@ -186,15 +178,15 @@ where #[must_use = "caller must explcitly check the flush control"] async fn flush(&mut self, _ctx: &RequestContext) -> std::io::Result> { - let buf = self.tail.take().expect("must not use after an error"); + let buf = self.mutable.take().expect("must not use after an error"); let buf_len = buf.pending(); if buf_len == 0 { - self.tail = Some(buf); + self.mutable = Some(buf); return Ok(None); } - let (recycled, flush_control) = self.flush_handle.flush(buf, self.submit_offset).await?; - self.submit_offset += u64::try_from(buf_len).unwrap(); - self.tail = Some(recycled); + let (recycled, flush_control) = self.flush_handle.flush(buf, self.bytes_submitted).await?; + self.bytes_submitted += u64::try_from(buf_len).unwrap(); + self.mutable = Some(recycled); Ok(Some(flush_control)) } }