mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
This commit is contained in:
@@ -167,7 +167,7 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
dst: tokio_epoll_uring::Slice<B>,
|
||||
ctx: &'a RequestContext,
|
||||
) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
|
||||
let flushed_offset = self.buffered_writer.bytes_written();
|
||||
let submitted_offset = self.buffered_writer.bytes_submitted();
|
||||
|
||||
let mutable = self.buffered_writer.inspect_mutable();
|
||||
let mutable = &mutable[0..mutable.pending()];
|
||||
@@ -208,23 +208,23 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
(
|
||||
Range(
|
||||
start,
|
||||
std::cmp::min(end, flushed_offset.saturating_sub(TAIL_SZ as u64)),
|
||||
std::cmp::min(end, submitted_offset.saturating_sub(TAIL_SZ as u64)),
|
||||
),
|
||||
Range(
|
||||
std::cmp::max(start, flushed_offset.saturating_sub(TAIL_SZ as u64)),
|
||||
std::cmp::min(end, flushed_offset),
|
||||
std::cmp::max(start, submitted_offset.saturating_sub(TAIL_SZ as u64)),
|
||||
std::cmp::min(end, submitted_offset),
|
||||
),
|
||||
)
|
||||
} else {
|
||||
(
|
||||
Range(start, std::cmp::min(end, flushed_offset)),
|
||||
Range(start, std::cmp::min(end, submitted_offset)),
|
||||
// zero len
|
||||
Range(flushed_offset, u64::MIN),
|
||||
Range(submitted_offset, u64::MIN),
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
let mutable_range = Range(std::cmp::max(start, flushed_offset), end);
|
||||
let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
|
||||
|
||||
let dst = if written_range.len() > 0 {
|
||||
let file: &VirtualFile = self.buffered_writer.as_inner();
|
||||
@@ -240,7 +240,7 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
let dst = if maybe_flushed_range.len() > 0 {
|
||||
let offset_in_buffer = maybe_flushed_range
|
||||
.0
|
||||
.checked_sub(flushed_offset.saturating_sub(TAIL_SZ as u64))
|
||||
.checked_sub(submitted_offset.saturating_sub(TAIL_SZ as u64))
|
||||
.unwrap()
|
||||
.into_usize();
|
||||
// Checked previously the buffer is Some.
|
||||
@@ -265,7 +265,7 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
let dst = if mutable_range.len() > 0 {
|
||||
let offset_in_buffer = mutable_range
|
||||
.0
|
||||
.checked_sub(flushed_offset)
|
||||
.checked_sub(submitted_offset)
|
||||
.unwrap()
|
||||
.into_usize();
|
||||
let to_copy =
|
||||
|
||||
@@ -53,7 +53,7 @@ pub struct BufferedWriter<B: Buffer, W> {
|
||||
/// A handle to the background flush task for writting data to disk.
|
||||
flush_handle: FlushHandle<B::IoBuf, W>,
|
||||
/// The number of bytes submitted to the background task.
|
||||
bytes_amount: u64,
|
||||
bytes_submitted: u64,
|
||||
}
|
||||
|
||||
impl<B, Buf, W> BufferedWriter<B, W>
|
||||
@@ -70,7 +70,7 @@ where
|
||||
writer: writer.clone(),
|
||||
mutable: Some(buf_new()),
|
||||
flush_handle: FlushHandle::spawn_new(writer, buf_new(), ctx.attached_child()),
|
||||
bytes_amount: 0,
|
||||
bytes_submitted: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,8 +79,8 @@ where
|
||||
}
|
||||
|
||||
/// Returns the number of bytes submitted to the background flush task.
|
||||
pub fn bytes_written(&self) -> u64 {
|
||||
self.bytes_amount
|
||||
pub fn bytes_submitted(&self) -> u64 {
|
||||
self.bytes_submitted
|
||||
}
|
||||
|
||||
/// Panics if used after any of the write paths returned an error
|
||||
@@ -105,7 +105,7 @@ where
|
||||
mutable: buf,
|
||||
writer,
|
||||
mut flush_handle,
|
||||
bytes_amount,
|
||||
bytes_submitted: bytes_amount,
|
||||
} = self;
|
||||
flush_handle.shutdown().await?;
|
||||
assert!(buf.is_some());
|
||||
@@ -145,11 +145,11 @@ where
|
||||
let chunk = OwnedAsyncWriter::write_all_at(
|
||||
self.writer.as_ref(),
|
||||
FullSlice::must_new(chunk),
|
||||
self.bytes_amount,
|
||||
self.bytes_submitted,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
self.bytes_amount += u64::try_from(chunk_len).unwrap();
|
||||
self.bytes_submitted += u64::try_from(chunk_len).unwrap();
|
||||
return Ok((chunk_len, chunk));
|
||||
}
|
||||
// in-memory copy the < BUFFER_SIZED tail of the chunk
|
||||
@@ -210,9 +210,9 @@ where
|
||||
}
|
||||
let recycled = self
|
||||
.flush_handle
|
||||
.flush(buf, self.bytes_amount, save_buf_for_read)
|
||||
.flush(buf, self.bytes_submitted, save_buf_for_read)
|
||||
.await?;
|
||||
self.bytes_amount += u64::try_from(buf_len).unwrap();
|
||||
self.bytes_submitted += u64::try_from(buf_len).unwrap();
|
||||
self.mutable = Some(recycled);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user