mirror of https://github.com/neondatabase/neon.git
remove write_buffered; add notes for bypass-aligned-part-of-write
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
@@ -122,72 +122,8 @@ where
             .expect("must not use after we returned an error")
     }
 
-    /// Guarantees that if Ok() is returned, all bytes in `chunk` have been accepted.
-    #[allow(dead_code)]
-    pub async fn write_buffered<S: IoBufAligned + Send>(
-        &mut self,
-        chunk: FullSlice<S>,
-        ctx: &RequestContext,
-    ) -> std::io::Result<(usize, FullSlice<S>)> {
-        let chunk = chunk.into_raw_slice();
-
-        let chunk_len = chunk.len();
-        // avoid memcpy for the middle of the chunk
-        if chunk.len() >= self.mutable().cap() {
-            // TODO(yuchen): do we still want to keep the bypass path?
-            let control = self.flush(false, ctx).await?;
-            if let Some(control) = control {
-                control.release().await;
-            }
-            // do a big write, bypassing `buf`
-            assert_eq!(
-                self.mutable
-                    .as_ref()
-                    .expect("must not use after an error")
-                    .pending(),
-                0
-            );
-            let chunk = OwnedAsyncWriter::write_all_at(
-                self.writer.as_ref(),
-                FullSlice::must_new(chunk),
-                self.bytes_submitted,
-                ctx,
-            )
-            .await?;
-            self.bytes_submitted += u64::try_from(chunk_len).unwrap();
-            return Ok((chunk_len, chunk));
-        }
-        // in-memory copy the < BUFFER_SIZED tail of the chunk
-        assert!(chunk.len() < self.mutable().cap());
-        let mut slice = &chunk[..];
-        let mut control: Option<FlushControl> = None;
-        while !slice.is_empty() {
-            if let Some(control) = control.take() {
-                control.release().await;
-            }
-            let buf = self.mutable.as_mut().expect("must not use after an error");
-            let need = buf.cap() - buf.pending();
-            let have = slice.len();
-            let n = std::cmp::min(need, have);
-            buf.extend_from_slice(&slice[..n]);
-            slice = &slice[n..];
-            if buf.pending() >= buf.cap() {
-                assert_eq!(buf.pending(), buf.cap());
-                control = self.flush(true, ctx).await?;
-            }
-        }
-        if let Some(control) = control.take() {
-            control.release().await;
-        }
-        assert!(slice.is_empty(), "by now we should have drained the chunk");
-        Ok((chunk_len, FullSlice::must_new(chunk)))
-    }
-
-    /// Strictly less performant variant of [`Self::write_buffered`] that allows writing borrowed data.
-    ///
-    /// It is less performant because we always have to copy the borrowed data into the internal buffer
-    /// before we can do the IO. The [`Self::write_buffered`] can avoid this, which is more performant
-    /// for large writes.
+    /// TODO(yuchen): For large write, it is possible to implement buffer bypass for aligned parts of the write so that
+    /// we could avoid copying majority of the data into the internal buffer.
     pub async fn write_buffered_borrowed(
         &mut self,
         chunk: &[u8],
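The TODO added in this hunk describes an optimization without implementing it: for a large borrowed write, only the unaligned head and tail need to pass through the internal buffer, while the aligned middle could be handed to the underlying writer directly. Below is a minimal, hypothetical sketch of the split; `ALIGN` and `split_for_bypass` are invented for illustration and are not part of the neon codebase.

```rust
// Hypothetical sketch of the bypass-aligned-part idea; `ALIGN` and
// `split_for_bypass` are invented for illustration, not neon code.
const ALIGN: usize = 512;

/// Split `chunk` into (head, middle, tail): `head` tops the file position
/// up to the next aligned offset, `middle` is a whole number of aligned
/// blocks that could be written directly (bypassing the buffer), and
/// `tail` is the remainder. Only `head` and `tail` need to be copied
/// into the internal buffer.
fn split_for_bypass(file_pos: u64, chunk: &[u8]) -> (&[u8], &[u8], &[u8]) {
    let align = ALIGN as u64;
    // bytes needed to bring `file_pos` up to the next aligned offset
    let head_len = (((align - file_pos % align) % align) as usize).min(chunk.len());
    let rest = &chunk[head_len..];
    // largest prefix of `rest` that is a whole number of aligned blocks
    let middle_len = rest.len() / ALIGN * ALIGN;
    (&chunk[..head_len], &rest[..middle_len], &rest[middle_len..])
}

fn main() {
    // e.g. a 2000-byte write landing at (unaligned) file offset 100
    let data = vec![0u8; 2000];
    let (head, middle, tail) = split_for_bypass(100, &data);
    assert_eq!((head.len(), middle.len(), tail.len()), (412, 1536, 52));
    // at most 2 * ALIGN - 2 bytes get memcpy'd, no matter how large `data` is
    assert!(head.len() + tail.len() < 2 * ALIGN);
}
```

With such a split, the copied portion is bounded by the alignment, independent of chunk size, which is the "avoid copying majority of the data" that the TODO refers to.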
@@ -373,68 +309,6 @@ mod tests {
         RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
     }
 
-    macro_rules! write {
-        ($writer:ident, $data:literal) => {{
-            let mut buf = crate::virtual_file::IoBufferMut::with_capacity(2);
-            buf.extend_from_slice($data);
-            $writer
-                .write_buffered(buf.freeze().slice_len(), &test_ctx())
-                .await?;
-        }};
-    }
-
-    #[tokio::test]
-    async fn test_buffered_writes_only() -> std::io::Result<()> {
-        let recorder = Arc::new(RecorderWriter::default());
-        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
-        let mut writer = BufferedWriter::new(recorder, || IoBufferMut::with_capacity(2), &ctx);
-        write!(writer, b"a");
-        write!(writer, b"b");
-        write!(writer, b"c");
-        write!(writer, b"d");
-        write!(writer, b"e");
-        let (_, recorder) = writer.flush_and_into_inner(&test_ctx()).await?;
-        assert_eq!(
-            recorder.get_writes(),
-            vec![Vec::from(b"ab"), Vec::from(b"cd"), Vec::from(b"e")]
-        );
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_passthrough_writes_only() -> std::io::Result<()> {
-        let recorder = Arc::new(RecorderWriter::default());
-        let ctx = test_ctx();
-        let mut writer = BufferedWriter::new(recorder, || IoBufferMut::with_capacity(2), &ctx);
-        write!(writer, b"abc");
-        write!(writer, b"de");
-        write!(writer, b"");
-        write!(writer, b"fghijk");
-        let (_, recorder) = writer.flush_and_into_inner(&test_ctx()).await?;
-        assert_eq!(
-            recorder.get_writes(),
-            vec![Vec::from(b"abc"), Vec::from(b"de"), Vec::from(b"fghijk")]
-        );
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_passthrough_write_with_nonempty_buffer() -> std::io::Result<()> {
-        let recorder = Arc::new(RecorderWriter::default());
-        let ctx = test_ctx();
-        let mut writer = BufferedWriter::new(recorder, || IoBufferMut::with_capacity(2), &ctx);
-        write!(writer, b"a");
-        write!(writer, b"bc");
-        write!(writer, b"d");
-        write!(writer, b"e");
-        let (_, recorder) = writer.flush_and_into_inner(&test_ctx()).await?;
-        assert_eq!(
-            recorder.get_writes(),
-            vec![Vec::from(b"a"), Vec::from(b"bc"), Vec::from(b"de")]
-        );
-        Ok(())
-    }
-
     #[tokio::test]
     async fn test_write_all_borrowed_always_goes_through_buffer() -> std::io::Result<()> {
         let ctx = test_ctx();
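The three deleted tests pinned down both code paths of the removed `write_buffered`: chunks smaller than the buffer capacity were copied in and flushed at capacity boundaries, while chunks at least as large as the capacity flushed any pending bytes and then bypassed the buffer unsplit. The following simplified, synchronous model (an illustration only, not the real `BufferedWriter`, which is async and works with `FullSlice`/`FlushControl`) reproduces the recorder outputs those tests asserted.

```rust
// Simplified synchronous stand-in for the removed write_buffered semantics.
struct Model {
    cap: usize,
    buf: Vec<u8>,
    writes: Vec<Vec<u8>>, // what RecorderWriter would observe
}

impl Model {
    fn new(cap: usize) -> Self {
        Self { cap, buf: Vec::new(), writes: Vec::new() }
    }

    fn write_buffered(&mut self, chunk: &[u8]) {
        if chunk.len() >= self.cap {
            // passthrough: flush buffered bytes, then write the chunk unsplit
            if !self.buf.is_empty() {
                self.writes.push(std::mem::take(&mut self.buf));
            }
            self.writes.push(chunk.to_vec());
            return;
        }
        // buffered: copy in, flushing whenever the buffer fills up
        let mut slice = chunk;
        while !slice.is_empty() {
            let n = (self.cap - self.buf.len()).min(slice.len());
            self.buf.extend_from_slice(&slice[..n]);
            slice = &slice[n..];
            if self.buf.len() == self.cap {
                self.writes.push(std::mem::take(&mut self.buf));
            }
        }
    }

    fn flush_and_into_inner(mut self) -> Vec<Vec<u8>> {
        if !self.buf.is_empty() {
            self.writes.push(std::mem::take(&mut self.buf)); // final partial flush
        }
        self.writes
    }
}

fn main() {
    // test_buffered_writes_only: small writes coalesce at capacity boundaries
    let mut w = Model::new(2);
    for c in [b"a".as_slice(), b"b", b"c", b"d", b"e"] {
        w.write_buffered(c);
    }
    assert_eq!(w.flush_and_into_inner(), [b"ab".to_vec(), b"cd".to_vec(), b"e".to_vec()]);

    // test_passthrough_writes_only: chunks >= capacity arrive unsplit
    let mut w = Model::new(2);
    for c in [b"abc".as_slice(), b"de", b"", b"fghijk"] {
        w.write_buffered(c);
    }
    assert_eq!(
        w.flush_and_into_inner(),
        [b"abc".to_vec(), b"de".to_vec(), b"fghijk".to_vec()]
    );
}
```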
@@ -447,6 +321,7 @@ mod tests {
         );
 
         writer.write_buffered_borrowed(b"abc", ctx).await?;
+        writer.write_buffered_borrowed(b"", ctx).await?;
         writer.write_buffered_borrowed(b"d", ctx).await?;
         writer.write_buffered_borrowed(b"e", ctx).await?;
         writer.write_buffered_borrowed(b"fg", ctx).await?;
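The assertion for this test lies outside the hunk, so the following is an inference rather than the source: assuming the writer is constructed over a capacity-2 `IoBufferMut` like the tests above, every borrowed write is copied through the buffer, so the recorded writes are simply the concatenation of the inputs chunked at the buffer capacity.

```rust
// Not the elided assertion from the real test; just the grouping predicted
// under the assumed capacity-2 buffer.
fn main() {
    let concat: Vec<u8> = [b"abc".as_slice(), b"", b"d", b"e", b"fg"].concat();
    let flushes: Vec<Vec<u8>> = concat.chunks(2).map(<[u8]>::to_vec).collect();
    assert_eq!(
        flushes,
        [b"ab".to_vec(), b"cd".to_vec(), b"ef".to_vec(), b"g".to_vec()]
    );
}
```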