mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 14:40:37 +00:00
rename end mutable buffer tail
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
This commit is contained in:
@@ -183,8 +183,8 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
|
||||
let submitted_offset = self.buffered_writer.submit_offset();
|
||||
|
||||
let mutable = self.buffered_writer.inspect_mutable();
|
||||
let mutable = &mutable[0..mutable.pending()];
|
||||
let tail = self.buffered_writer.inspect_tail();
|
||||
let tail = &tail[0..tail.pending()];
|
||||
|
||||
let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();
|
||||
|
||||
@@ -214,7 +214,7 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
|
||||
let (written_range, maybe_flushed_range) = {
|
||||
if maybe_flushed.is_some() {
|
||||
// [ written ][ maybe_flushed ][ mutable ]
|
||||
// [ written ][ maybe_flushed ][ tail ]
|
||||
// <- TAIL_SZ -><- TAIL_SZ ->
|
||||
// ^
|
||||
// `submitted_offset`
|
||||
@@ -230,7 +230,7 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
),
|
||||
)
|
||||
} else {
|
||||
// [ written ][ mutable ]
|
||||
// [ written ][ tail ]
|
||||
// <- TAIL_SZ ->
|
||||
// ^
|
||||
// `submitted_offset`
|
||||
@@ -288,7 +288,7 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
.unwrap()
|
||||
.into_usize();
|
||||
let to_copy =
|
||||
&mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
|
||||
&tail[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
|
||||
let bounds = dst.bounds();
|
||||
let mut view = dst.slice({
|
||||
let start =
|
||||
@@ -399,9 +399,9 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mutable = file.buffered_writer.inspect_mutable();
|
||||
let cap = mutable.capacity();
|
||||
let align = mutable.align();
|
||||
let tail = file.buffered_writer.inspect_tail();
|
||||
let cap = tail.capacity();
|
||||
let align = tail.align();
|
||||
|
||||
let write_nbytes = cap * 2 + cap / 2;
|
||||
|
||||
@@ -442,8 +442,8 @@ mod tests {
|
||||
let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
|
||||
assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);
|
||||
|
||||
let mutable_buffer_contents = file.buffered_writer.inspect_mutable();
|
||||
assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
|
||||
let tail_buffer_contents = file.buffered_writer.inspect_tail();
|
||||
assert_eq!(tail_buffer_contents, &content[cap * 2..write_nbytes]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -457,7 +457,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
// mutable buffer and maybe_flushed buffer each has `cap` bytes.
|
||||
let cap = file.buffered_writer.inspect_mutable().capacity();
|
||||
let cap = file.buffered_writer.inspect_tail().capacity();
|
||||
|
||||
let content: Vec<u8> = rand::thread_rng()
|
||||
.sample_iter(rand::distributions::Standard)
|
||||
@@ -482,7 +482,7 @@ mod tests {
|
||||
&content[cap..cap * 2]
|
||||
);
|
||||
assert_eq!(
|
||||
&file.buffered_writer.inspect_mutable()[0..cap / 2],
|
||||
&file.buffered_writer.inspect_tail()[0..cap / 2],
|
||||
&content[cap * 2..cap * 2 + cap / 2]
|
||||
);
|
||||
}
|
||||
@@ -503,7 +503,7 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mutable = file.buffered_writer.inspect_mutable();
|
||||
let mutable = file.buffered_writer.inspect_tail();
|
||||
let cap = mutable.capacity();
|
||||
let align = mutable.align();
|
||||
let content: Vec<u8> = rand::thread_rng()
|
||||
|
||||
@@ -52,7 +52,7 @@ pub struct BufferedWriter<B: Buffer, W> {
|
||||
/// - after an IO error => stays `None` forever
|
||||
///
|
||||
/// In these exceptional cases, it's `None`.
|
||||
mutable: Option<B>,
|
||||
tail: Option<B>,
|
||||
/// A handle to the background flush task for writting data to disk.
|
||||
flush_handle: FlushHandle<B::IoBuf, W>,
|
||||
/// The next offset to be submitted to the background task.
|
||||
@@ -77,7 +77,7 @@ where
|
||||
) -> Self {
|
||||
Self {
|
||||
writer: writer.clone(),
|
||||
mutable: Some(buf_new()),
|
||||
tail: Some(buf_new()),
|
||||
flush_handle: FlushHandle::spawn_new(
|
||||
writer,
|
||||
buf_new(),
|
||||
@@ -98,8 +98,8 @@ where
|
||||
}
|
||||
|
||||
/// Panics if used after any of the write paths returned an error
|
||||
pub fn inspect_mutable(&self) -> &B {
|
||||
self.mutable()
|
||||
pub fn inspect_tail(&self) -> &B {
|
||||
self.tail()
|
||||
}
|
||||
|
||||
/// Gets a reference to the maybe flushed read-only buffer.
|
||||
@@ -110,7 +110,7 @@ where
|
||||
|
||||
#[cfg_attr(target_os = "macos", allow(dead_code))]
|
||||
pub async fn shutdown(mut self, ctx: &RequestContext) -> std::io::Result<(u64, W)> {
|
||||
let buf = self.mutable_mut();
|
||||
let buf = self.tail_mut();
|
||||
let len = buf.pending();
|
||||
let cap = buf.cap();
|
||||
if len < cap {
|
||||
@@ -123,7 +123,7 @@ where
|
||||
}
|
||||
|
||||
let Self {
|
||||
mutable: buf,
|
||||
tail: buf,
|
||||
writer,
|
||||
mut flush_handle,
|
||||
submit_offset: bytes_amount,
|
||||
@@ -137,7 +137,7 @@ where
|
||||
#[cfg_attr(target_os = "macos", allow(dead_code))]
|
||||
pub fn shutdown_no_flush(self) -> Arc<W> {
|
||||
let Self {
|
||||
mutable: _,
|
||||
tail: _,
|
||||
writer,
|
||||
flush_handle,
|
||||
submit_offset: _,
|
||||
@@ -146,18 +146,18 @@ where
|
||||
writer
|
||||
}
|
||||
|
||||
/// Gets a immutable reference to the mutable in-memory buffer.
|
||||
/// Gets a immutable reference to the tail in-memory buffer.
|
||||
#[inline(always)]
|
||||
fn mutable(&self) -> &B {
|
||||
self.mutable
|
||||
fn tail(&self) -> &B {
|
||||
self.tail
|
||||
.as_ref()
|
||||
.expect("must not use after we returned an error")
|
||||
}
|
||||
|
||||
/// Gets a mutable reference to the mutable in-memory buffer.
|
||||
/// Gets a mutable reference to the tail in-memory buffer.
|
||||
#[inline(always)]
|
||||
fn mutable_mut(&mut self) -> &mut B {
|
||||
self.mutable
|
||||
fn tail_mut(&mut self) -> &mut B {
|
||||
self.tail
|
||||
.as_mut()
|
||||
.expect("must not use after we returned an error")
|
||||
}
|
||||
@@ -184,7 +184,7 @@ where
|
||||
let chunk_len = chunk.len();
|
||||
let mut control: Option<FlushControl> = None;
|
||||
while !chunk.is_empty() {
|
||||
let buf = self.mutable_mut();
|
||||
let buf = self.tail_mut();
|
||||
let need = buf.cap() - buf.pending();
|
||||
let have = chunk.len();
|
||||
let n = std::cmp::min(need, have);
|
||||
@@ -203,15 +203,15 @@ where
|
||||
|
||||
#[must_use = "caller must explcitly check the flush control"]
|
||||
async fn flush(&mut self, _ctx: &RequestContext) -> std::io::Result<Option<FlushControl>> {
|
||||
let buf = self.mutable.take().expect("must not use after an error");
|
||||
let buf = self.tail.take().expect("must not use after an error");
|
||||
let buf_len = buf.pending();
|
||||
if buf_len == 0 {
|
||||
self.mutable = Some(buf);
|
||||
self.tail = Some(buf);
|
||||
return Ok(None);
|
||||
}
|
||||
let (recycled, flush_control) = self.flush_handle.flush(buf, self.submit_offset).await?;
|
||||
self.submit_offset += u64::try_from(buf_len).unwrap();
|
||||
self.mutable = Some(recycled);
|
||||
self.tail = Some(recycled);
|
||||
Ok(Some(flush_control))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user