diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index 0f2e8a635c..eca5914945 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -122,72 +122,8 @@ where .expect("must not use after we returned an error") } - /// Guarantees that if Ok() is returned, all bytes in `chunk` have been accepted. - #[allow(dead_code)] - pub async fn write_buffered( - &mut self, - chunk: FullSlice, - ctx: &RequestContext, - ) -> std::io::Result<(usize, FullSlice)> { - let chunk = chunk.into_raw_slice(); - - let chunk_len = chunk.len(); - // avoid memcpy for the middle of the chunk - if chunk.len() >= self.mutable().cap() { - // TODO(yuchen): do we still want to keep the bypass path? - let control = self.flush(false, ctx).await?; - if let Some(control) = control { - control.release().await; - } - // do a big write, bypassing `buf` - assert_eq!( - self.mutable - .as_ref() - .expect("must not use after an error") - .pending(), - 0 - ); - let chunk = OwnedAsyncWriter::write_all_at( - self.writer.as_ref(), - FullSlice::must_new(chunk), - self.bytes_submitted, - ctx, - ) - .await?; - self.bytes_submitted += u64::try_from(chunk_len).unwrap(); - return Ok((chunk_len, chunk)); - } - // in-memory copy the < BUFFER_SIZED tail of the chunk - assert!(chunk.len() < self.mutable().cap()); - let mut slice = &chunk[..]; - let mut control: Option = None; - while !slice.is_empty() { - if let Some(control) = control.take() { - control.release().await; - } - let buf = self.mutable.as_mut().expect("must not use after an error"); - let need = buf.cap() - buf.pending(); - let have = slice.len(); - let n = std::cmp::min(need, have); - buf.extend_from_slice(&slice[..n]); - slice = &slice[n..]; - if buf.pending() >= buf.cap() { - assert_eq!(buf.pending(), buf.cap()); - control = self.flush(true, ctx).await?; - } - } - if let Some(control) = control.take() { - 
control.release().await; - } - assert!(slice.is_empty(), "by now we should have drained the chunk"); - Ok((chunk_len, FullSlice::must_new(chunk))) - } - - /// Strictly less performant variant of [`Self::write_buffered`] that allows writing borrowed data. - /// - /// It is less performant because we always have to copy the borrowed data into the internal buffer - /// before we can do the IO. The [`Self::write_buffered`] can avoid this, which is more performant - /// for large writes. + /// TODO(yuchen): For large writes, it is possible to implement a buffer bypass for the aligned parts of the write so that + /// we can avoid copying the majority of the data into the internal buffer. pub async fn write_buffered_borrowed( &mut self, chunk: &[u8], @@ -373,68 +309,6 @@ mod tests { RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error) } - macro_rules! write { - ($writer:ident, $data:literal) => {{ - let mut buf = crate::virtual_file::IoBufferMut::with_capacity(2); - buf.extend_from_slice($data); - $writer - .write_buffered(buf.freeze().slice_len(), &test_ctx()) - .await?; - }}; - } - - #[tokio::test] - async fn test_buffered_writes_only() -> std::io::Result<()> { - let recorder = Arc::new(RecorderWriter::default()); - let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); - let mut writer = BufferedWriter::new(recorder, || IoBufferMut::with_capacity(2), &ctx); - write!(writer, b"a"); - write!(writer, b"b"); - write!(writer, b"c"); - write!(writer, b"d"); - write!(writer, b"e"); - let (_, recorder) = writer.flush_and_into_inner(&test_ctx()).await?; - assert_eq!( - recorder.get_writes(), - vec![Vec::from(b"ab"), Vec::from(b"cd"), Vec::from(b"e")] - ); - Ok(()) - } - - #[tokio::test] - async fn test_passthrough_writes_only() -> std::io::Result<()> { - let recorder = Arc::new(RecorderWriter::default()); - let ctx = test_ctx(); - let mut writer = BufferedWriter::new(recorder, || IoBufferMut::with_capacity(2), &ctx); - write!(writer, b"abc"); - 
write!(writer, b"de"); - write!(writer, b""); - write!(writer, b"fghijk"); - let (_, recorder) = writer.flush_and_into_inner(&test_ctx()).await?; - assert_eq!( - recorder.get_writes(), - vec![Vec::from(b"abc"), Vec::from(b"de"), Vec::from(b"fghijk")] - ); - Ok(()) - } - - #[tokio::test] - async fn test_passthrough_write_with_nonempty_buffer() -> std::io::Result<()> { - let recorder = Arc::new(RecorderWriter::default()); - let ctx = test_ctx(); - let mut writer = BufferedWriter::new(recorder, || IoBufferMut::with_capacity(2), &ctx); - write!(writer, b"a"); - write!(writer, b"bc"); - write!(writer, b"d"); - write!(writer, b"e"); - let (_, recorder) = writer.flush_and_into_inner(&test_ctx()).await?; - assert_eq!( - recorder.get_writes(), - vec![Vec::from(b"a"), Vec::from(b"bc"), Vec::from(b"de")] - ); - Ok(()) - } - #[tokio::test] async fn test_write_all_borrowed_always_goes_through_buffer() -> std::io::Result<()> { let ctx = test_ctx(); @@ -447,6 +321,7 @@ mod tests { ); writer.write_buffered_borrowed(b"abc", ctx).await?; + writer.write_buffered_borrowed(b"", ctx).await?; writer.write_buffered_borrowed(b"d", ctx).await?; writer.write_buffered_borrowed(b"e", ctx).await?; writer.write_buffered_borrowed(b"fg", ctx).await?;